You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2017/10/04 22:37:53 UTC
[1/2] samza git commit: SAMZA-1056: Added wiring for High Level API
state stores, their serdes and changelogs.
Repository: samza
Updated Branches:
refs/heads/master ad80cf9f1 -> a671288e1
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
index 62304f3..7677826 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
@@ -54,30 +54,30 @@ public class TestTimeSeriesStoreImpl {
// read from time-range
List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 0L, 1L);
- Assert.assertEquals(values.size(), 0);
+ Assert.assertEquals(0, values.size());
// read from time-range [1,2) should return one entry
values = readStore(timeSeriesStore, "hello", 1L, 2L);
- Assert.assertEquals(values.size(), 1);
- Assert.assertEquals(new String(values.get(0).getValue()), "world-1");
+ Assert.assertEquals(1, values.size());
+ Assert.assertEquals("world-1", new String(values.get(0).getValue()));
// read from time-range [2,3) should return two entries
values = readStore(timeSeriesStore, "hello", 2L, 3L);
- Assert.assertEquals(values.size(), 2);
- Assert.assertEquals(new String(values.get(0).getValue()), "world-1");
- Assert.assertEquals(values.get(0).getTimestamp(), new Long(2));
+ Assert.assertEquals(2, values.size());
+ Assert.assertEquals("world-1", new String(values.get(0).getValue()));
+ Assert.assertEquals(2L, values.get(0).getTimestamp());
// read from time-range [0,3) should return three entries
values = readStore(timeSeriesStore, "hello", 0L, 3L);
- Assert.assertEquals(values.size(), 3);
+ Assert.assertEquals(3, values.size());
// read from time-range [2,999999) should return two entries
values = readStore(timeSeriesStore, "hello", 2L, 999999L);
- Assert.assertEquals(values.size(), 2);
+ Assert.assertEquals(2, values.size());
// read from time-range [3,4) should return no entries
values = readStore(timeSeriesStore, "hello", 3L, 4L);
- Assert.assertEquals(values.size(), 0);
+ Assert.assertEquals(0, values.size());
}
@Test
@@ -87,11 +87,11 @@ public class TestTimeSeriesStoreImpl {
// read from a non-existent key
List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "non-existent-key", 0, Integer.MAX_VALUE);
- Assert.assertEquals(values.size(), 0);
+ Assert.assertEquals(0, values.size());
// read from an existing key but out of range timestamp
values = readStore(timeSeriesStore, "hello", 2, Integer.MAX_VALUE);
- Assert.assertEquals(values.size(), 0);
+ Assert.assertEquals(0, values.size());
}
@Test
@@ -106,21 +106,21 @@ public class TestTimeSeriesStoreImpl {
// read from time-range [0,2) should return 100 entries
List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 0L, 2L);
- Assert.assertEquals(values.size(), 100);
+ Assert.assertEquals(100, values.size());
values.forEach(timeSeriesValue -> {
- Assert.assertEquals(new String(timeSeriesValue.getValue()), "world-1");
+ Assert.assertEquals("world-1", new String(timeSeriesValue.getValue()));
});
// read from time-range [2,4) should return 100 entries
values = readStore(timeSeriesStore, "hello", 2L, 4L);
- Assert.assertEquals(values.size(), 100);
+ Assert.assertEquals(100, values.size());
values.forEach(timeSeriesValue -> {
- Assert.assertEquals(new String(timeSeriesValue.getValue()), "world-2");
+ Assert.assertEquals("world-2", new String(timeSeriesValue.getValue()));
});
// read all entries in the store
values = readStore(timeSeriesStore, "hello", 0L, Integer.MAX_VALUE);
- Assert.assertEquals(values.size(), 200);
+ Assert.assertEquals(200, values.size());
}
@Test
@@ -135,30 +135,30 @@ public class TestTimeSeriesStoreImpl {
// read from time-range
List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 0L, 1L);
- Assert.assertEquals(values.size(), 0);
+ Assert.assertEquals(0, values.size());
// read from time-range [1,2) should return one entry
values = readStore(timeSeriesStore, "hello", 1L, 2L);
- Assert.assertEquals(values.size(), 1);
- Assert.assertEquals(new String(values.get(0).getValue()), "world-1");
+ Assert.assertEquals(1, values.size());
+ Assert.assertEquals("world-1", new String(values.get(0).getValue()));
// read from time-range [2,3) should return the most recent entry
values = readStore(timeSeriesStore, "hello", 2L, 3L);
- Assert.assertEquals(values.size(), 1);
- Assert.assertEquals(new String(values.get(0).getValue()), "world-2");
- Assert.assertEquals(values.get(0).getTimestamp(), new Long(2));
+ Assert.assertEquals(1, values.size());
+ Assert.assertEquals("world-2", new String(values.get(0).getValue()));
+ Assert.assertEquals(2L, values.get(0).getTimestamp());
// read from time-range [0,3) should return two entries
values = readStore(timeSeriesStore, "hello", 0L, 3L);
- Assert.assertEquals(values.size(), 2);
+ Assert.assertEquals(2, values.size());
// read from time-range [2,999999) should return one entry
values = readStore(timeSeriesStore, "hello", 2L, 999999L);
- Assert.assertEquals(values.size(), 1);
+ Assert.assertEquals(1, values.size());
// read from time-range [3,4) should return no entries
values = readStore(timeSeriesStore, "hello", 3L, 4L);
- Assert.assertEquals(values.size(), 0);
+ Assert.assertEquals(0, values.size());
}
@Test
@@ -172,11 +172,11 @@ public class TestTimeSeriesStoreImpl {
timeSeriesStore.put("hello", "world-2".getBytes(), 2L);
List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 1L, 3L);
- Assert.assertEquals(values.size(), 2);
+ Assert.assertEquals(2, values.size());
timeSeriesStore.remove("hello", 0L, 3L);
values = readStore(timeSeriesStore, "hello", 1L, 3L);
- Assert.assertEquals(values.size(), 0);
+ Assert.assertEquals(0, values.size());
}
private static <K, V> List<TimestampedValue<V>> readStore(TimeSeriesStore<K, V> store, K key, long startTimestamp, long endTimestamp) {
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java
new file mode 100644
index 0000000..40015ec
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl.store;
+
+import org.apache.samza.serializers.ByteSerde;
+import org.apache.samza.serializers.IntegerSerde;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link TimestampedValueSerde} when the value part of a record is empty.
+ *
+ * <p>Both tests work with an 8-byte array that holds only the timestamp, written with
+ * {@link ByteBuffer#putLong(long)}; this is what the tests expect the serialized form of a
+ * {@link TimestampedValue} with no value bytes to look like.
+ */
+public class TestTimestampedValueSerde {
+  private static final long TIMESTAMP = 1234L;
+
+  @Test
+  public void testEmptyValueDeserialization() {
+    TimestampedValueSerde<byte[]> timestampedValueSerde = new TimestampedValueSerde<>(new ByteSerde());
+    TimestampedValue<byte[]> timestampedValue = timestampedValueSerde.fromBytes(timestampOnlyBytes(TIMESTAMP));
+    assertEquals(TIMESTAMP, timestampedValue.getTimestamp());
+    // No bytes follow the timestamp, so the wrapped ByteSerde is expected to produce an empty array.
+    assertEquals(0, timestampedValue.getValue().length);
+  }
+
+  @Test
+  public void testEmptyValueSerialization() {
+    TimestampedValueSerde<Integer> timestampedValueSerde = new TimestampedValueSerde<>(new IntegerSerde());
+    TimestampedValue<Integer> timestampedValue = new TimestampedValue<>(null, TIMESTAMP);
+    byte[] actualBytes = timestampedValueSerde.toBytes(timestampedValue);
+    // Compare rendered contents so that a failure reports the actual bytes rather than a bare boolean.
+    assertEquals(Arrays.toString(timestampOnlyBytes(TIMESTAMP)), Arrays.toString(actualBytes));
+  }
+
+  /**
+   * Builds an array holding only {@code timestamp}.
+   *
+   * @param timestamp the timestamp to encode
+   * @return a {@code Long.BYTES}-long array written with {@link ByteBuffer#putLong(long)}
+   */
+  private static byte[] timestampOnlyBytes(long timestamp) {
+    byte[] bytes = new byte[Long.BYTES];
+    ByteBuffer.wrap(bytes).putLong(timestamp);
+    return bytes;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java b/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java
deleted file mode 100644
index 2a8f039..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.test.operator;
-
-
-class PageView {
- private String userId;
- private String country;
- private String url;
-
- public String getUserId() {
- return userId;
- }
-
- public String getCountry() {
- return country;
- }
-
- public String getUrl() {
- return url;
- }
-
- public void setUserId(String userId) {
- this.userId = userId;
- }
-
- public void setCountry(String country) {
- this.country = country;
- }
-
- public void setUrl(String url) {
- this.url = url;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
new file mode 100644
index 0000000..517d81f
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.operator;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.test.operator.data.AdClick;
+import org.apache.samza.test.operator.data.PageView;
+import org.apache.samza.test.operator.data.UserPageAdClick;
+
+import java.time.Duration;
+
+/**
+ * A {@link StreamApplication} that demonstrates a partitionBy, stream-stream join and a windowed count.
+ *
+ * <p>Page views and ad clicks are each re-partitioned by view id and joined on that id. The joined
+ * {@link UserPageAdClick}s are then re-partitioned by user id and counted per user in a keyed session
+ * window. Each output message maps a user id to that count, rendered as a string.
+ */
+public class RepartitionJoinWindowApp implements StreamApplication {
+  static final String PAGE_VIEWS = "page-views";
+  static final String AD_CLICKS = "ad-clicks";
+  static final String OUTPUT_TOPIC = "user-ad-click-counts";
+
+  /** TTL passed to the page-view / ad-click join. */
+  private static final Duration JOIN_TTL = Duration.ofMinutes(1);
+  /** Gap used by the per-user keyed session window. */
+  private static final Duration SESSION_GAP = Duration.ofSeconds(3);
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<PageView> pageViews = graph.getInputStream(PAGE_VIEWS, new JsonSerdeV2<>(PageView.class));
+    MessageStream<AdClick> adClicks = graph.getInputStream(AD_CLICKS, new JsonSerdeV2<>(AdClick.class));
+    OutputStream<KV<String, String>> outputStream =
+        graph.getOutputStream(OUTPUT_TOPIC, KVSerde.of(new StringSerde(), new StringSerde()));
+
+    // Re-key both inputs by view id (the key the joiner matches on), then unwrap the KV again.
+    MessageStream<PageView> pageViewsRepartitionedByViewId = pageViews
+        .partitionBy(PageView::getViewId, pv -> pv, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)))
+        .map(KV::getValue);
+
+    MessageStream<AdClick> adClicksRepartitionedByViewId = adClicks
+        .partitionBy(AdClick::getViewId, ac -> ac, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(AdClick.class)))
+        .map(KV::getValue);
+
+    MessageStream<UserPageAdClick> userPageAdClicks = pageViewsRepartitionedByViewId
+        .join(adClicksRepartitionedByViewId, new UserPageViewAdClicksJoiner(),
+            new StringSerde(), new JsonSerdeV2<>(PageView.class), new JsonSerdeV2<>(AdClick.class),
+            JOIN_TTL);
+
+    // Re-key the joined records by user id, then count them per user in a session window.
+    userPageAdClicks
+        .partitionBy(UserPageAdClick::getUserId, upac -> upac,
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)))
+        .map(KV::getValue)
+        .window(Windows.keyedSessionWindow(UserPageAdClick::getUserId, SESSION_GAP))
+        .map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size())))
+        .sendTo(outputStream);
+  }
+
+  /** Combines a page view and an ad click that share a view id into a {@link UserPageAdClick}. */
+  private static class UserPageViewAdClicksJoiner implements JoinFunction<String, PageView, AdClick, UserPageAdClick> {
+    @Override
+    public UserPageAdClick apply(PageView pv, AdClick ac) {
+      return new UserPageAdClick(pv.getUserId(), pv.getPageId(), ac.getAdId());
+    }
+
+    @Override
+    public String getFirstKey(PageView pv) {
+      return pv.getViewId();
+    }
+
+    @Override
+    public String getSecondKey(AdClick ac) {
+      return ac.getViewId();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
deleted file mode 100644
index 261b954..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.operator;
-
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.StringSerde;
-
-import java.time.Duration;
-
-/**
- * A {@link StreamApplication} that demonstrates a partitionBy followed by a windowed count.
- */
-public class RepartitionWindowApp implements StreamApplication {
- static final String INPUT_TOPIC = "page-views";
- static final String OUTPUT_TOPIC = "page-view-counts";
-
- @Override
- public void init(StreamGraph graph, Config config) {
- MessageStream<PageView> pageViews = graph.getInputStream(INPUT_TOPIC, new JsonSerdeV2<>(PageView.class));
-
- OutputStream<KV<String, String>> outputStream =
- graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new StringSerde()));
-
- pageViews
- .partitionBy(PageView::getUserId, pv -> pv, new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(PageView.class)))
- .window(Windows.keyedSessionWindow(KV::getKey, Duration.ofSeconds(3)))
- .map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size())))
- .sendTo(outputStream);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
index 4c83960..974cafc 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
@@ -30,6 +30,7 @@ import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.test.operator.data.PageView;
import java.time.Duration;
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
index 9bb66ad..db46982 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
@@ -213,7 +213,6 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration
* @param overriddenConfigs configs to override
*/
public void runApplication(StreamApplication streamApplication, String appName, Config overriddenConfigs) {
-
Map<String, String> configs = new HashMap<>();
configs.put("job.factory.class", "org.apache.samza.job.local.ThreadJobFactory");
configs.put("job.name", appName);
@@ -231,6 +230,17 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration
configs.put("job.coordinator.replication.factor", "1");
configs.put("task.window.ms", "1000");
+ // This is to prevent tests from taking a long time to stop after they're done. The issue is that
+ // tearDown currently doesn't call runner.kill(app), and shuts down the Kafka and ZK servers immediately.
+ // The test process then exits, triggering the SamzaContainer shutdown hook, which in turn tries to flush any
+ // store changelogs, which then get stuck trying to produce to the stopped Kafka server.
+ // Calling runner.kill doesn't work since RemoteApplicationRunner creates a new ThreadJob instance when
+ // kill is called. We can't use LocalApplicationRunner since ZkJobCoordinator doesn't currently create
+ // changelog streams. Hence we just force an unclean shutdown here instead. This _should be_ OK
+ // since the test method has already executed by the time the shutdown hook is called. The side effect is
+ // that buffered state (e.g. changelog contents) might not be flushed correctly after the test run.
+ configs.put("task.shutdown.ms", "1");
+
if (overriddenConfigs != null) {
configs.putAll(overriddenConfigs);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
new file mode 100644
index 0000000..117f97b
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.operator;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.samza.test.operator.RepartitionJoinWindowApp.AD_CLICKS;
+import static org.apache.samza.test.operator.RepartitionJoinWindowApp.PAGE_VIEWS;
+import static org.apache.samza.test.operator.RepartitionJoinWindowApp.OUTPUT_TOPIC;
+
+/**
+ * Integration test driver for {@link RepartitionJoinWindowApp}.
+ *
+ * <p>Produces page views and ad clicks for two users, runs the app, and checks that exactly one
+ * ad-click count is emitted for each user.
+ */
+public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTestHarness {
+  private static final String APP_NAME = "UserPageAdClickCounter";
+
+  @Test
+  public void testRepartitionJoinWindowApp() throws Exception {
+    // create topics
+    createTopic(PAGE_VIEWS, 2);
+    createTopic(AD_CLICKS, 2);
+    createTopic(OUTPUT_TOPIC, 1);
+
+    // create events for the following user activity.
+    // userId: (viewId, pageId, (adIds))
+    // u1: (v1, p1, (a1, a2)), (v2, p2, (a3))
+    // u2: (v3, p1, (a1, a2, a4)), (v4, p3, (a5))
+    produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}");
+    produceMessage(PAGE_VIEWS, 1, "p2", "{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}");
+    produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v3\",\"pageId\":\"p1\",\"userId\":\"u2\"}");
+    produceMessage(PAGE_VIEWS, 1, "p3", "{\"viewId\":\"v4\",\"pageId\":\"p3\",\"userId\":\"u2\"}");
+
+    produceMessage(AD_CLICKS, 0, "a1", "{\"viewId\":\"v1\",\"adId\":\"a1\"}");
+    produceMessage(AD_CLICKS, 1, "a2", "{\"viewId\":\"v1\",\"adId\":\"a2\"}");
+    produceMessage(AD_CLICKS, 0, "a3", "{\"viewId\":\"v2\",\"adId\":\"a3\"}");
+    produceMessage(AD_CLICKS, 0, "a1", "{\"viewId\":\"v3\",\"adId\":\"a1\"}");
+    produceMessage(AD_CLICKS, 1, "a2", "{\"viewId\":\"v3\",\"adId\":\"a2\"}");
+    produceMessage(AD_CLICKS, 1, "a4", "{\"viewId\":\"v3\",\"adId\":\"a4\"}");
+    produceMessage(AD_CLICKS, 0, "a5", "{\"viewId\":\"v4\",\"adId\":\"a5\"}");
+
+    // run the application
+    RepartitionJoinWindowApp app = new RepartitionJoinWindowApp();
+    runApplication(app, APP_NAME, null);
+
+    // consume and validate result
+    List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(OUTPUT_TOPIC), 2);
+    Assert.assertEquals(2, messages.size());
+
+    // Compare as a map so the test fails when a user is missing or reported twice, not only on a wrong count.
+    java.util.Map<String, String> countsByUser = new java.util.HashMap<>();
+    for (ConsumerRecord<String, String> message : messages) {
+      countsByUser.put(message.key(), message.value());
+    }
+    java.util.Map<String, String> expectedCountsByUser = new java.util.HashMap<>();
+    expectedCountsByUser.put("u1", "3"); // clicks a1, a2, a3
+    expectedCountsByUser.put("u2", "4"); // clicks a1, a2, a4, a5
+    Assert.assertEquals(expectedCountsByUser, countsByUser);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
deleted file mode 100644
index 3745541..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.test.operator;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.samza.test.operator.RepartitionWindowApp.INPUT_TOPIC;
-import static org.apache.samza.test.operator.RepartitionWindowApp.OUTPUT_TOPIC;
-
-/**
- * Test driver for {@link RepartitionWindowApp}.
- */
-public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHarness {
- private static final String APP_NAME = "RepartitionedSessionizer";
-
- @Test
- public void testRepartitionedSessionWindowCounter() throws Exception {
- // create topics
- createTopic(INPUT_TOPIC, 3);
- createTopic(OUTPUT_TOPIC, 1);
-
- // produce messages to different partitions.
- produceMessage(INPUT_TOPIC, 0, "userId1", "{\"userId\":\"userId1\", \"country\":\"india\",\"url\":\"5.com\"}");
- produceMessage(INPUT_TOPIC, 1, "userId2", "{\"userId\":\"userId2\", \"country\":\"china\",\"url\":\"4.com\"}");
- produceMessage(INPUT_TOPIC, 2, "userId1", "{\"userId\":\"userId1\", \"country\":\"india\",\"url\":\"1.com\"}");
- produceMessage(INPUT_TOPIC, 0, "userId1", "{\"userId\":\"userId1\", \"country\":\"india\",\"url\":\"2.com\"}");
- produceMessage(INPUT_TOPIC, 1, "userId1", "{\"userId\":\"userId1\", \"country\":\"india\",\"url\":\"3.com\"}");
-
- // run the application
- RepartitionWindowApp app = new RepartitionWindowApp();
- runApplication(app, APP_NAME, null);
-
- // consume and validate result
- List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(OUTPUT_TOPIC), 2);
- Assert.assertEquals(messages.size(), 2);
-
- for (ConsumerRecord<String, String> message : messages) {
- String key = message.key();
- String value = message.value();
- // Assert that there are 4 messages for userId1 and 1 message for userId2.
- Assert.assertTrue(key.equals("userId1") || key.equals("userId2"));
- if ("userId1".equals(key)) {
- Assert.assertEquals("4", value);
- } else {
- Assert.assertEquals("1", value);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
index 3f3e615..151c9d1 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
@@ -30,6 +30,7 @@ import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.test.operator.data.PageView;
import java.time.Duration;
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/data/AdClick.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/data/AdClick.java b/samza-test/src/test/java/org/apache/samza/test/operator/data/AdClick.java
new file mode 100644
index 0000000..ee699ae
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/data/AdClick.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.operator.data;
+
+
+/**
+ * Test data type for an ad click: the id of the page view the click belongs to and the id of the
+ * clicked ad. Kept as a mutable bean (implicit no-arg constructor plus setters); the JSON serde it is
+ * used with presumably relies on that shape, so confirm before making it immutable.
+ */
+public class AdClick {
+  private String viewId;
+  private String adId;
+
+  /** @return the id of the page view this click belongs to */
+  public String getViewId() {
+    return viewId;
+  }
+
+  /** @return the id of the clicked ad */
+  public String getAdId() {
+    return adId;
+  }
+
+  /** @param newViewId the id of the page view this click belongs to */
+  public void setViewId(String newViewId) {
+    viewId = newViewId;
+  }
+
+  /** @param newAdId the id of the clicked ad */
+  public void setAdId(String newAdId) {
+    adId = newAdId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java b/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java
new file mode 100644
index 0000000..d2cebf9
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.operator.data;
+
+/** Test data for a page view: {@code viewId}, {@code pageId}, {@code userId}. Mutable bean, implicit no-arg ctor. */
+public class PageView {
+ private String viewId;
+ private String pageId;
+ private String userId;
+
+ public String getViewId() {
+ return viewId;
+ }
+
+ public void setViewId(String viewId) {
+ this.viewId = viewId;
+ }
+
+ public String getPageId() {
+ return pageId;
+ }
+
+ public void setPageId(String pageId) {
+ this.pageId = pageId;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public void setUserId(String userId) {
+ this.userId = userId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/data/UserPageAdClick.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/data/UserPageAdClick.java b/samza-test/src/test/java/org/apache/samza/test/operator/data/UserPageAdClick.java
new file mode 100644
index 0000000..e5f7b53
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/data/UserPageAdClick.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.operator.data;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+/** Test data, presumably a joined page view + ad click; Jackson deserializes it via the @JsonCreator constructor. */
+public class UserPageAdClick {
+ private String userId;
+ private String pageId;
+ private String adId;
+
+ @JsonCreator
+ public UserPageAdClick(
+ @JsonProperty("userId") String userId,
+ @JsonProperty("pageId") String pageId,
+ @JsonProperty("adId") String adId) {
+ this.userId = userId;
+ this.pageId = pageId;
+ this.adId = adId;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public void setUserId(String userId) {
+ this.userId = userId;
+ }
+
+ public String getPageId() {
+ return pageId;
+ }
+
+ public void setPageId(String pageId) {
+ this.pageId = pageId;
+ }
+
+ public String getAdId() {
+ return adId;
+ }
+
+ public void setAdId(String adId) {
+ this.adId = adId;
+ }
+}
[2/2] samza git commit: SAMZA-1056: Added wiring for High Level API
state stores, their serdes and changelogs.
Posted by pm...@apache.org.
SAMZA-1056: Added wiring for High Level API state stores, their serdes and changelogs.
Provided join operator access to durable state stores.
Author: Prateek Maheshwari <pm...@linkedin.com>
Reviewers: Jagadish Venkatraman <ja...@apache.org>
Closes #309 from prateekm/operator-store-wiring
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a671288e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a671288e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a671288e
Branch: refs/heads/master
Commit: a671288e18dea3336fe1c625233bb7427ed8b5b2
Parents: ad80cf9
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Wed Oct 4 15:37:50 2017 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Wed Oct 4 15:37:50 2017 -0700
----------------------------------------------------------------------
.gitignore | 1 +
.../apache/samza/operators/MessageStream.java | 13 ++-
.../operators/functions/WatermarkFunction.java | 31 +++---
.../apache/samza/container/TaskContextImpl.java | 6 +-
.../coordinator/DistributedLockWithState.java | 4 +-
.../org/apache/samza/execution/JobNode.java | 63 ++++++++---
.../samza/operators/MessageStreamImpl.java | 16 +--
.../functions/PartialJoinFunction.java | 26 +----
.../samza/operators/impl/OperatorImplGraph.java | 38 +++++--
.../operators/impl/PartialJoinOperatorImpl.java | 72 ++++--------
.../operators/impl/store/TimeSeriesStore.java | 1 +
.../operators/impl/store/TimestampedValue.java | 16 +--
.../impl/store/TimestampedValueSerde.java | 55 +++++++++
.../samza/operators/spec/JoinOperatorSpec.java | 70 +++++++++---
.../samza/operators/spec/OperatorSpecs.java | 17 ++-
.../operators/spec/StatefulOperatorSpec.java | 37 +++++++
.../samza/operators/spec/StoreDescriptor.java | 81 ++++++++++++++
.../samza/runtime/LocalApplicationRunner.java | 2 +-
.../org/apache/samza/zk/ZkJobCoordinator.java | 5 +-
.../apache/samza/container/SamzaContainer.scala | 8 +-
.../samza/example/OrderShipmentJoinExample.java | 4 +-
.../samza/execution/TestExecutionPlanner.java | 42 ++++---
.../execution/TestJobGraphJsonGenerator.java | 19 +++-
.../org/apache/samza/execution/TestJobNode.java | 111 ++++++++++++++-----
.../samza/operators/TestJoinOperator.java | 12 +-
.../samza/operators/TestMessageStreamImpl.java | 3 +-
.../operators/impl/TestOperatorImplGraph.java | 31 ++++--
.../impl/store/TestTimeSeriesStoreImpl.java | 54 ++++-----
.../impl/store/TestTimestampedValueSerde.java | 52 +++++++++
.../apache/samza/test/operator/PageView.java | 50 ---------
.../test/operator/RepartitionJoinWindowApp.java | 92 +++++++++++++++
.../test/operator/RepartitionWindowApp.java | 55 ---------
.../samza/test/operator/SessionWindowApp.java | 1 +
...StreamApplicationIntegrationTestHarness.java | 12 +-
.../operator/TestRepartitionJoinWindowApp.java | 81 ++++++++++++++
.../test/operator/TestRepartitionWindowApp.java | 70 ------------
.../samza/test/operator/TumblingWindowApp.java | 1 +
.../samza/test/operator/data/AdClick.java | 41 +++++++
.../samza/test/operator/data/PageView.java | 50 +++++++++
.../test/operator/data/UserPageAdClick.java | 62 +++++++++++
40 files changed, 974 insertions(+), 431 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 7cbffe7..c2e9180 100644
--- a/.gitignore
+++ b/.gitignore
@@ -22,6 +22,7 @@ docs/_site
build
**/bin
samza-test/state
+state/
docs/learn/documentation/*/api/javadocs
docs/learn/documentation/*/rest/javadocs
.DS_Store
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index 2a1045d..c36fe1f 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -27,6 +27,7 @@ import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.windows.Window;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
import java.time.Duration;
import java.util.ArrayList;
@@ -128,13 +129,17 @@ public interface MessageStream<M> {
* @param otherStream the other {@link MessageStream} to be joined with
* @param joinFn the function to join messages from this and the other {@link MessageStream}
* @param ttl the ttl for messages in each stream
+ * @param keySerde the serde for the join key
+ * @param messageSerde the serde for messages in this stream
+ * @param otherMessageSerde the serde for messages in the other stream
* @param <K> the type of join key
- * @param <JM> the type of messages in the other stream
- * @param <OM> the type of messages resulting from the {@code joinFn}
+ * @param <OM> the type of messages in the other stream
+ * @param <JM> the type of messages resulting from the {@code joinFn}
* @return the joined {@link MessageStream}
*/
- <K, JM, OM> MessageStream<OM> join(MessageStream<JM> otherStream,
- JoinFunction<? extends K, ? super M, ? super JM, ? extends OM> joinFn, Duration ttl);
+ <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
+ JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFn,
+ Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, Duration ttl);
/**
* Merges all {@code otherStreams} with this {@link MessageStream}.
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
index 9c4b9bf..3be293e 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
@@ -20,14 +20,14 @@
package org.apache.samza.operators.functions;
/**
- * Allows user-specific handling of Watermark
+ * Allows handling of watermarks.
*/
public interface WatermarkFunction {
/**
* Processes the input watermark coming from upstream operators.
- * This allows user-defined watermark handling, such as trigger events
- * or propagate it to downstream.
+ * This allows custom watermark handling, such as triggering events or propagating it downstream.
+ *
* @param watermark input watermark
*/
void processWatermark(long watermark);
@@ -35,24 +35,19 @@ public interface WatermarkFunction {
/**
* Returns the output watermark. This function will be invoked immediately after either
* of the following events:
- *
* <ol>
- *
- * <li> Return of the transform function, e.g. {@link FlatMapFunction}.
- *
- * <li> Return of the processWatermark function.
- *
+ * <li> Return from the transform function, e.g. {@link FlatMapFunction}.
+ * <li> Return from the {@link #processWatermark} function.
* </ol>
+
+ * Note: If the transform function returns a collection of messages, the output watermark
+ * will be emitted after the output collection has been propagated to downstream operators.
+ * This might delay the watermark propagation, which will cause more buffering and might
+ * have a performance impact.
*
- *
- *
- * Note: If the transform function returns a collection of output, the output watermark
- * will be emitted after the output collection is propagated to downstream operators. So
- * it might delay the watermark propagation. The delay will cause more buffering and might
- * have performance impact.
- *
- * @return output watermark, or null if the output watermark should not be updated. Samza
- * guarantees that the same watermark value will be only emitted once.
+ * @return output watermark, or null if the output watermark should not be updated.
+ * Samza guarantees that the same watermark value will only be emitted once.
*/
Long getOutputWatermark();
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
index 7990d2b..aa622a3 100644
--- a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
@@ -23,8 +23,8 @@ import com.google.common.collect.ImmutableSet;
import org.apache.samza.checkpoint.OffsetManager;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.ReadableMetricsRegistry;
-import org.apache.samza.storage.StorageEngine;
import org.apache.samza.storage.TaskStorageManager;
+import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.TaskContext;
@@ -79,9 +79,9 @@ public class TaskContextImpl implements TaskContext {
}
@Override
- public StorageEngine getStore(String storeName) {
+ public KeyValueStore getStore(String storeName) {
if (storageManager != null) {
- return storageManager.apply(storeName);
+ return (KeyValueStore) storageManager.apply(storeName);
} else {
LOG.warn("No store found for name: {}", storeName);
return null;
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
index 0de7813..c8e9033 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
@@ -26,12 +26,12 @@ import java.util.concurrent.TimeoutException;
public interface DistributedLockWithState {
/**
- * Trie to acquire the lock, but first check if the state flag is set. If it is set, return false.
+ * Try to acquire the lock, but first check if the state flag is set. If it is set, return false.
* If the flag is not set, and lock is acquired - return true.
- * Throw TimeOutException if could not acquire the lock.
* @param timeout Duration of lock acquiring timeout.
* @param unit Time Unit of the timeout defined above.
* @return true if lock is acquired successfully, false if state is already set.
+ * @throws TimeoutException if could not acquire the lock.
*/
boolean lockIfNotSet(long timeout, TimeUnit unit) throws TimeoutException;
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 0368829..2e89292 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -33,6 +33,7 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.SerializerConfig;
+import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.operators.StreamGraphImpl;
@@ -40,6 +41,7 @@ import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.spec.StatefulOperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.util.MathUtils;
import org.apache.samza.serializers.Serde;
@@ -129,6 +131,14 @@ public class JobNode {
}
}
+ streamGraph.getAllOperatorSpecs().forEach(opSpec -> {
+ if (opSpec instanceof StatefulOperatorSpec) {
+ ((StatefulOperatorSpec) opSpec).getStoreDescriptors()
+ .forEach(sd -> configs.putAll(sd.getStorageConfigs()));
+ // store key and message serdes are configured separately in #addSerdeConfigs
+ }
+ });
+
configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);
// write input/output streams to configs
@@ -168,32 +178,42 @@ public class JobNode {
*
* @param configs the configs to add serialized serde instances and stream serde configs to
*/
- protected void addSerdeConfigs(Map<String, String> configs) {
+ void addSerdeConfigs(Map<String, String> configs) {
// collect all key and msg serde instances for streams
- Map<String, Serde> keySerdes = new HashMap<>();
- Map<String, Serde> msgSerdes = new HashMap<>();
+ Map<String, Serde> streamKeySerdes = new HashMap<>();
+ Map<String, Serde> streamMsgSerdes = new HashMap<>();
Map<StreamSpec, InputOperatorSpec> inputOperators = streamGraph.getInputOperators();
inEdges.forEach(edge -> {
String streamId = edge.getStreamSpec().getId();
InputOperatorSpec inputOperatorSpec = inputOperators.get(edge.getStreamSpec());
- Serde keySerde = inputOperatorSpec.getKeySerde();
- Serde valueSerde = inputOperatorSpec.getValueSerde();
- keySerdes.put(streamId, keySerde);
- msgSerdes.put(streamId, valueSerde);
+ streamKeySerdes.put(streamId, inputOperatorSpec.getKeySerde());
+ streamMsgSerdes.put(streamId, inputOperatorSpec.getValueSerde());
});
Map<StreamSpec, OutputStreamImpl> outputStreams = streamGraph.getOutputStreams();
outEdges.forEach(edge -> {
String streamId = edge.getStreamSpec().getId();
OutputStreamImpl outputStream = outputStreams.get(edge.getStreamSpec());
- Serde keySerde = outputStream.getKeySerde();
- Serde valueSerde = outputStream.getValueSerde();
- keySerdes.put(streamId, keySerde);
- msgSerdes.put(streamId, valueSerde);
+ streamKeySerdes.put(streamId, outputStream.getKeySerde());
+ streamMsgSerdes.put(streamId, outputStream.getValueSerde());
+ });
+
+ // collect all key and msg serde instances for stores
+ Map<String, Serde> storeKeySerdes = new HashMap<>();
+ Map<String, Serde> storeMsgSerdes = new HashMap<>();
+ streamGraph.getAllOperatorSpecs().forEach(opSpec -> {
+ if (opSpec instanceof StatefulOperatorSpec) {
+ ((StatefulOperatorSpec) opSpec).getStoreDescriptors().forEach(storeDescriptor -> {
+ storeKeySerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getKeySerde());
+ storeMsgSerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getMsgSerde());
+ });
+ }
});
- // for each unique serde instance, generate a unique name and serialize to config
- HashSet<Serde> serdes = new HashSet<>(keySerdes.values());
- serdes.addAll(msgSerdes.values());
+ // for each unique stream or store serde instance, generate a unique name and serialize to config
+ HashSet<Serde> serdes = new HashSet<>(streamKeySerdes.values());
+ serdes.addAll(streamMsgSerdes.values());
+ serdes.addAll(storeKeySerdes.values());
+ serdes.addAll(storeMsgSerdes.values());
SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
Base64.Encoder base64Encoder = Base64.getEncoder();
Map<Serde, String> serdeUUIDs = new HashMap<>();
@@ -205,17 +225,28 @@ public class JobNode {
});
// set key and msg serdes for streams to the serde names generated above
- keySerdes.forEach((streamId, serde) -> {
+ streamKeySerdes.forEach((streamId, serde) -> {
String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId);
String keySerdeConfigKey = streamIdPrefix + StreamConfig.KEY_SERDE();
configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
});
- msgSerdes.forEach((streamId, serde) -> {
+ streamMsgSerdes.forEach((streamId, serde) -> {
String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId);
String valueSerdeConfigKey = streamIdPrefix + StreamConfig.MSG_SERDE();
configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
});
+
+ // set key and msg serdes for stores to the serde names generated above
+ storeKeySerdes.forEach((storeName, serde) -> {
+ String keySerdeConfigKey = String.format(StorageConfig.KEY_SERDE(), storeName);
+ configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
+ });
+
+ storeMsgSerdes.forEach((storeName, serde) -> {
+ String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE(), storeName);
+ configs.put(msgSerdeConfigKey, serdeUUIDs.get(serde));
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 7b93a9e..8460ada 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -37,6 +37,7 @@ import org.apache.samza.operators.windows.Window;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
import java.time.Duration;
import java.util.Collection;
@@ -112,15 +113,16 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
}
@Override
- public <K, JM, TM> MessageStream<TM> join(MessageStream<JM> otherStream,
- JoinFunction<? extends K, ? super M, ? super JM, ? extends TM> joinFn, Duration ttl) {
- OperatorSpec<?, JM> otherOpSpec = ((MessageStreamImpl<JM>) otherStream).getOperatorSpec();
- JoinOperatorSpec<K, M, JM, TM> joinOpSpec =
- OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec,
- (JoinFunction<K, M, JM, TM>) joinFn, ttl.toMillis(), this.graph.getNextOpId());
+ public <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
+ JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFn,
+ Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, Duration ttl) {
+ OperatorSpec<?, OM> otherOpSpec = ((MessageStreamImpl<OM>) otherStream).getOperatorSpec();
+ JoinOperatorSpec<K, M, OM, JM> joinOpSpec =
+ OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec, (JoinFunction<K, M, OM, JM>) joinFn,
+ keySerde, messageSerde, otherMessageSerde, ttl.toMillis(), this.graph.getNextOpId());
this.operatorSpec.registerNextOperatorSpec(joinOpSpec);
- otherOpSpec.registerNextOperatorSpec((OperatorSpec<JM, ?>) joinOpSpec);
+ otherOpSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) joinOpSpec);
return new MessageStreamImpl<>(this.graph, joinOpSpec);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java b/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
index 9b7956a..5ede5e8 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
@@ -18,21 +18,22 @@
*/
package org.apache.samza.operators.functions;
+import org.apache.samza.operators.impl.store.TimestampedValue;
import org.apache.samza.storage.kv.KeyValueStore;
/**
* An internal function that maintains state and join logic for one side of a two-way join.
*/
-public interface PartialJoinFunction<K, M, JM, RM> extends InitableFunction, ClosableFunction {
+public interface PartialJoinFunction<K, M, OM, JM> extends InitableFunction, ClosableFunction {
/**
* Joins a message in this stream with a message from another stream.
*
* @param m message from this input stream
- * @param jm message from the other input stream
+ * @param om message from the other input stream
* @return the joined message in the output stream
*/
- RM apply(M m, JM jm);
+ JM apply(M m, OM om);
/**
* Gets the key for the input message.
@@ -47,23 +48,6 @@ public interface PartialJoinFunction<K, M, JM, RM> extends InitableFunction, Clo
*
* @return the key value store containing the state for this stream
*/
- KeyValueStore<K, PartialJoinMessage<M>> getState();
+ KeyValueStore<K, TimestampedValue<M>> getState();
- class PartialJoinMessage<M> {
- private final M message;
- private final long receivedTimeMs;
-
- public PartialJoinMessage(M message, long receivedTimeMs) {
- this.message = message;
- this.receivedTimeMs = receivedTimeMs;
- }
-
- public M getMessage() {
- return message;
- }
-
- public long getReceivedTimeMs() {
- return receivedTimeMs;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 1f86975..808ddbf 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -37,7 +37,7 @@ import org.apache.samza.operators.spec.PartitionByOperatorSpec;
import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.util.InternalInMemoryStore;
+import org.apache.samza.operators.impl.store.TimestampedValue;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.TaskContext;
@@ -219,16 +219,17 @@ public class OperatorImplGraph {
private KV<PartialJoinFunction, PartialJoinFunction> getOrCreatePartialJoinFunctions(JoinOperatorSpec joinOpSpec) {
return joinFunctions.computeIfAbsent(joinOpSpec.getOpId(),
- joinOpId -> KV.of(createLeftJoinFn(joinOpSpec.getJoinFn()), createRightJoinFn(joinOpSpec.getJoinFn())));
+ joinOpId -> KV.of(createLeftJoinFn(joinOpSpec), createRightJoinFn(joinOpSpec)));
}
- private PartialJoinFunction<Object, Object, Object, Object> createLeftJoinFn(JoinFunction joinFn) {
+ private PartialJoinFunction<Object, Object, Object, Object> createLeftJoinFn(JoinOperatorSpec joinOpSpec) {
return new PartialJoinFunction<Object, Object, Object, Object>() {
- private KeyValueStore<Object, PartialJoinMessage<Object>> leftStreamState = new InternalInMemoryStore<>();
+ private final JoinFunction joinFn = joinOpSpec.getJoinFn();
+ private KeyValueStore<Object, TimestampedValue<Object>> leftStreamState;
@Override
- public Object apply(Object m, Object jm) {
- return joinFn.apply(m, jm);
+ public Object apply(Object m, Object om) {
+ return joinFn.apply(m, om);
}
@Override
@@ -237,12 +238,15 @@ public class OperatorImplGraph {
}
@Override
- public KeyValueStore<Object, PartialJoinMessage<Object>> getState() {
+ public KeyValueStore<Object, TimestampedValue<Object>> getState() {
return leftStreamState;
}
@Override
public void init(Config config, TaskContext context) {
+ String leftStoreName = joinOpSpec.getLeftOpName();
+ leftStreamState = (KeyValueStore<Object, TimestampedValue<Object>>) context.getStore(leftStoreName);
+
// user-defined joinFn should only be initialized once, so we do it only in left partial join function.
joinFn.init(config, context);
}
@@ -255,13 +259,14 @@ public class OperatorImplGraph {
};
}
- private PartialJoinFunction<Object, Object, Object, Object> createRightJoinFn(JoinFunction joinFn) {
+ private PartialJoinFunction<Object, Object, Object, Object> createRightJoinFn(JoinOperatorSpec joinOpSpec) {
return new PartialJoinFunction<Object, Object, Object, Object>() {
- private KeyValueStore<Object, PartialJoinMessage<Object>> rightStreamState = new InternalInMemoryStore<>();
+ private final JoinFunction joinFn = joinOpSpec.getJoinFn();
+ private KeyValueStore<Object, TimestampedValue<Object>> rightStreamState;
@Override
- public Object apply(Object m, Object jm) {
- return joinFn.apply(jm, m);
+ public Object apply(Object m, Object om) {
+ return joinFn.apply(om, m);
}
@Override
@@ -270,7 +275,16 @@ public class OperatorImplGraph {
}
@Override
- public KeyValueStore<Object, PartialJoinMessage<Object>> getState() {
+ public void init(Config config, TaskContext context) {
+ String rightStoreName = joinOpSpec.getRightOpName();
+ rightStreamState = (KeyValueStore<Object, TimestampedValue<Object>>) context.getStore(rightStoreName);
+
+ // user-defined joinFn should only be initialized once,
+ // so we do it only in left partial join function and not here again.
+ }
+
+ @Override
+ public KeyValueStore<Object, TimestampedValue<Object>> getState() {
return rightStreamState;
}
};
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
index ad66962..e976a43 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
@@ -19,47 +19,39 @@
package org.apache.samza.operators.impl;
import org.apache.samza.config.Config;
-import org.apache.samza.metrics.Counter;
import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.operators.functions.PartialJoinFunction.PartialJoinMessage;
+import org.apache.samza.operators.impl.store.TimestampedValue;
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueIterator;
-import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.util.Clock;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
/**
* Implementation of one side of a {@link JoinOperatorSpec} that buffers and joins its input messages of
- * type {@code M} with buffered input messages of type {@code JM} in the paired {@link PartialJoinOperatorImpl}.
+ * type {@code M} with buffered input messages of type {@code OM} in the paired {@link PartialJoinOperatorImpl}.
*
* @param <K> the type of join key
* @param <M> the type of input messages on this side of the join
- * @param <JM> the type of input message on the other side of the join
- * @param <RM> the type of join result
+ * @param <OM> the type of input message on the other side of the join
+ * @param <JM> the type of join result
*/
-class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
+class PartialJoinOperatorImpl<K, M, OM, JM> extends OperatorImpl<M, JM> {
- private final JoinOperatorSpec<K, M, JM, RM> joinOpSpec;
+ private final JoinOperatorSpec<K, M, OM, JM> joinOpSpec;
private final boolean isLeftSide; // whether this operator impl is for the left side of the join
- private final PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn;
- private final PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn;
+ private final PartialJoinFunction<K, M, OM, JM> thisPartialJoinFn;
+ private final PartialJoinFunction<K, OM, M, JM> otherPartialJoinFn;
private final long ttlMs;
private final Clock clock;
- private Counter keysRemoved;
-
- PartialJoinOperatorImpl(JoinOperatorSpec<K, M, JM, RM> joinOpSpec, boolean isLeftSide,
- PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn,
- PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn,
+ PartialJoinOperatorImpl(JoinOperatorSpec<K, M, OM, JM> joinOpSpec, boolean isLeftSide,
+ PartialJoinFunction<K, M, OM, JM> thisPartialJoinFn,
+ PartialJoinFunction<K, OM, M, JM> otherPartialJoinFn,
Config config, TaskContext context, Clock clock) {
this.joinOpSpec = joinOpSpec;
this.isLeftSide = isLeftSide;
@@ -71,54 +63,29 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
@Override
protected void handleInit(Config config, TaskContext context) {
- keysRemoved = context.getMetricsRegistry()
- .newCounter(OperatorImpl.class.getName(), getOperatorName() + "-keys-removed");
this.thisPartialJoinFn.init(config, context);
}
@Override
- public Collection<RM> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
+ public Collection<JM> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
K key = thisPartialJoinFn.getKey(message);
- thisPartialJoinFn.getState().put(key, new PartialJoinMessage<>(message, clock.currentTimeMillis()));
- PartialJoinMessage<JM> otherMessage = otherPartialJoinFn.getState().get(key);
+ thisPartialJoinFn.getState().put(key, new TimestampedValue<>(message, clock.currentTimeMillis()));
+ TimestampedValue<OM> otherMessage = otherPartialJoinFn.getState().get(key);
long now = clock.currentTimeMillis();
- if (otherMessage != null && otherMessage.getReceivedTimeMs() > now - ttlMs) {
- RM joinResult = thisPartialJoinFn.apply(message, otherMessage.getMessage());
+ if (otherMessage != null && otherMessage.getTimestamp() > now - ttlMs) {
+ JM joinResult = thisPartialJoinFn.apply(message, otherMessage.getValue());
return Collections.singletonList(joinResult);
}
return Collections.emptyList();
}
@Override
- public Collection<RM> handleTimer(MessageCollector collector, TaskCoordinator coordinator) {
- long now = clock.currentTimeMillis();
-
- KeyValueStore<K, PartialJoinMessage<M>> thisState = thisPartialJoinFn.getState();
- KeyValueIterator<K, PartialJoinMessage<M>> iterator = thisState.all();
- List<K> keysToRemove = new ArrayList<>();
-
- while (iterator.hasNext()) {
- Entry<K, PartialJoinMessage<M>> entry = iterator.next();
- if (entry.getValue().getReceivedTimeMs() < now - ttlMs) {
- keysToRemove.add(entry.getKey());
- } else {
- break; // InternalInMemoryStore uses a LinkedHashMap and will return entries in insertion order
- }
- }
-
- iterator.close();
- thisState.deleteAll(keysToRemove);
- keysRemoved.inc(keysToRemove.size());
- return Collections.emptyList();
- }
-
- @Override
protected void handleClose() {
this.thisPartialJoinFn.close();
}
- protected OperatorSpec<M, RM> getOperatorSpec() {
- return (OperatorSpec<M, RM>) joinOpSpec;
+ protected OperatorSpec<M, JM> getOperatorSpec() {
+ return (OperatorSpec<M, JM>) joinOpSpec;
}
/**
@@ -129,7 +96,6 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
*/
@Override
protected String getOperatorName() {
- String side = isLeftSide ? "L" : "R";
- return this.joinOpSpec.getOpName() + "-" + side;
+ return isLeftSide ? joinOpSpec.getLeftOpName() : joinOpSpec.getRightOpName();
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
index e544e2e..56d839e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
@@ -52,6 +52,7 @@ public interface TimeSeriesStore<K, V> {
* @param key the key to look up in the store
* @param startTimestamp the start timestamp of the range, inclusive
* @param endTimestamp the end timestamp of the range, exclusive
+ * @return an iterator over the values for the given key in the provided time-range that must be closed after use
* @throws IllegalArgumentException when startTimeStamp > endTimestamp, or when either of them is negative
*/
ClosableIterator<TimestampedValue<V>> get(K key, long startTimestamp, long endTimestamp);
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
index ad5e844..5e45148 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java
@@ -26,18 +26,18 @@ package org.apache.samza.operators.impl.store;
*/
public class TimestampedValue<V> {
private final V value;
- private final Long timestamp;
+ private final long timestamp;
- public TimestampedValue(V v, Long time) {
- value = v;
- timestamp = time;
+ public TimestampedValue(V v, long timestamp) {
+ this.value = v;
+ this.timestamp = timestamp;
}
public V getValue() {
return value;
}
- public Long getTimestamp() {
+ public long getTimestamp() {
return timestamp;
}
@@ -48,14 +48,14 @@ public class TimestampedValue<V> {
TimestampedValue<?> that = (TimestampedValue<?>) o;
- if (value != null ? !value.equals(that.value) : that.value != null) return false;
- return timestamp.equals(that.timestamp);
+ if (timestamp != that.timestamp) return false;
+ return value != null ? value.equals(that.value) : (that.value == null);
}
@Override
public int hashCode() {
int result = value != null ? value.hashCode() : 0;
- result = 31 * result + timestamp.hashCode();
+ result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
return result;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java
new file mode 100644
index 0000000..b14f8a4
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl.store;
+
+import org.apache.samza.serializers.Serde;
+
+import java.nio.ByteBuffer;
+
+
+/** Serde for {@link TimestampedValue}: the serialized value bytes followed by the timestamp as an 8-byte long. */
+public class TimestampedValueSerde<V> implements Serde<TimestampedValue<V>> {
+  private static final int TIMESTAMP_BYTES = Long.BYTES;
+  private final Serde<V> vSerde;
+
+  public TimestampedValueSerde(Serde<V> vSerde) {
+    this.vSerde = vSerde;
+  }
+
+  @Override
+  public TimestampedValue<V> fromBytes(byte[] bytes) {
+    // Fail clearly on truncated input; otherwise the allocation below throws NegativeArraySizeException.
+    if (bytes == null || bytes.length < TIMESTAMP_BYTES) {
+      throw new IllegalArgumentException("TimestampedValue requires at least " + TIMESTAMP_BYTES + " bytes");
+    }
+    ByteBuffer bb = ByteBuffer.wrap(bytes);
+    byte[] vBytes = new byte[bytes.length - TIMESTAMP_BYTES];
+    bb.get(vBytes);
+    return new TimestampedValue<>(vSerde.fromBytes(vBytes), bb.getLong());
+  }
+
+  @Override
+  public byte[] toBytes(TimestampedValue<V> tv) {
+    byte[] vBytes = vSerde.toBytes(tv.getValue());
+    if (vBytes == null) { // written as zero value bytes; fromBytes then hands the value serde an empty array
+      vBytes = new byte[0];
+    }
+    return ByteBuffer.allocate(vBytes.length + TIMESTAMP_BYTES).put(vBytes).putLong(tv.getTimestamp()).array();
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
index f4fe0fd..3f99280 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
@@ -18,8 +18,16 @@
*/
package org.apache.samza.operators.spec;
+import com.google.common.collect.ImmutableMap;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
+import org.apache.samza.operators.impl.store.TimestampedValueSerde;
+import org.apache.samza.operators.impl.store.TimestampedValue;
+import org.apache.samza.serializers.Serde;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
/**
@@ -28,14 +36,17 @@ import org.apache.samza.operators.functions.WatermarkFunction;
*
* @param <K> the type of join key
* @param <M> the type of message in this stream
- * @param <JM> the type of message in the other stream
- * @param <RM> the type of join result
+ * @param <OM> the type of message in the other stream
+ * @param <JM> the type of join result
*/
-public class JoinOperatorSpec<K, M, JM, RM> extends OperatorSpec<Object, RM> { // Object == M | JM
+public class JoinOperatorSpec<K, M, OM, JM> extends OperatorSpec<Object, JM> implements StatefulOperatorSpec { // Object == M | OM
private final OperatorSpec<?, M> leftInputOpSpec;
- private final OperatorSpec<?, JM> rightInputOpSpec;
- private final JoinFunction<K, M, JM, RM> joinFn;
+ private final OperatorSpec<?, OM> rightInputOpSpec;
+ private final JoinFunction<K, M, OM, JM> joinFn;
+ private final Serde<K> keySerde;
+ private final Serde<TimestampedValue<M>> messageSerde;
+ private final Serde<TimestampedValue<OM>> otherMessageSerde;
private final long ttlMs;
/**
@@ -47,15 +58,45 @@ public class JoinOperatorSpec<K, M, JM, RM> extends OperatorSpec<Object, RM> { /
* @param ttlMs the ttl in ms for retaining messages in each stream
* @param opId the unique ID for this operator
*/
- JoinOperatorSpec(OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, JM> rightInputOpSpec,
- JoinFunction<K, M, JM, RM> joinFn, long ttlMs, int opId) {
+ JoinOperatorSpec(OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, OM> rightInputOpSpec,
+ JoinFunction<K, M, OM, JM> joinFn, Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde,
+ long ttlMs, int opId) {
super(OpCode.JOIN, opId);
this.leftInputOpSpec = leftInputOpSpec;
this.rightInputOpSpec = rightInputOpSpec;
this.joinFn = joinFn;
+ this.keySerde = keySerde;
+ this.messageSerde = new TimestampedValueSerde<>(messageSerde);
+ this.otherMessageSerde = new TimestampedValueSerde<>(otherMessageSerde);
this.ttlMs = ttlMs;
}
+ @Override
+ public Collection<StoreDescriptor> getStoreDescriptors() {
+ String rocksDBStoreFactory = "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory";
+ String leftStoreName = getLeftOpName();
+ String rightStoreName = getRightOpName();
+ Map<String, String> leftStoreCustomProps = ImmutableMap.of(
+ String.format("stores.%s.rocksdb.ttl.ms", leftStoreName), Long.toString(ttlMs),
+ String.format("stores.%s.changelog.kafka.cleanup.policy", leftStoreName), "delete",
+ String.format("stores.%s.changelog.kafka.retention.ms", leftStoreName), Long.toString(ttlMs));
+ Map<String, String> rightStoreCustomProps = ImmutableMap.of(
+ String.format("stores.%s.rocksdb.ttl.ms", rightStoreName), Long.toString(ttlMs),
+ String.format("stores.%s.changelog.kafka.cleanup.policy", rightStoreName), "delete",
+ String.format("stores.%s.changelog.kafka.retention.ms", rightStoreName), Long.toString(ttlMs));
+
+ return Arrays.asList(
+ new StoreDescriptor(leftStoreName, rocksDBStoreFactory, this.keySerde, this.messageSerde,
+ leftStoreName, leftStoreCustomProps),
+ new StoreDescriptor(rightStoreName, rocksDBStoreFactory, this.keySerde, this.otherMessageSerde,
+ rightStoreName, rightStoreCustomProps));
+ }
+
+ @Override
+ public WatermarkFunction getWatermarkFn() {
+ return joinFn instanceof WatermarkFunction ? (WatermarkFunction) joinFn : null;
+ }
+
public OperatorSpec getLeftInputOpSpec() {
return leftInputOpSpec;
}
@@ -64,16 +105,19 @@ public class JoinOperatorSpec<K, M, JM, RM> extends OperatorSpec<Object, RM> { /
return rightInputOpSpec;
}
- public JoinFunction<K, M, JM, RM> getJoinFn() {
+ public String getLeftOpName() {
+ return this.getOpName() + "-L";
+ }
+
+ public String getRightOpName() {
+ return this.getOpName() + "-R";
+ }
+
+ public JoinFunction<K, M, OM, JM> getJoinFn() {
return this.joinFn;
}
public long getTtlMs() {
return ttlMs;
}
-
- @Override
- public WatermarkFunction getWatermarkFn() {
- return joinFn instanceof WatermarkFunction ? (WatermarkFunction) joinFn : null;
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index e67179e..8b2b177 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -27,6 +27,7 @@ import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.serializers.Serde;
import org.apache.samza.task.TaskContext;
import java.util.ArrayList;
@@ -189,18 +190,22 @@ public class OperatorSpecs {
* @param leftInputOpSpec the operator spec for the stream on the left side of the join
* @param rightInputOpSpec the operator spec for the stream on the right side of the join
* @param joinFn the user-defined join function to get join keys and results
+ * @param keySerde the serde for the join key
+ * @param messageSerde the serde for messages in the stream on the left side of the join
+ * @param otherMessageSerde the serde for messages in the stream on the right side of the join
* @param ttlMs the ttl in ms for retaining messages in each stream
* @param opId the unique ID of the operator
* @param <K> the type of join key
* @param <M> the type of input message
- * @param <JM> the type of message in the other join stream
- * @param <RM> the type of join result
+ * @param <OM> the type of message in the other stream
+ * @param <JM> the type of join result
* @return the {@link JoinOperatorSpec}
*/
- public static <K, M, JM, RM> JoinOperatorSpec<K, M, JM, RM> createJoinOperatorSpec(
- OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, JM> rightInputOpSpec,
- JoinFunction<K, M, JM, RM> joinFn, long ttlMs, int opId) {
- return new JoinOperatorSpec<>(leftInputOpSpec, rightInputOpSpec, joinFn, ttlMs, opId);
+ public static <K, M, OM, JM> JoinOperatorSpec<K, M, OM, JM> createJoinOperatorSpec(
+ OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, OM> rightInputOpSpec, JoinFunction<K, M, OM, JM> joinFn,
+ Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, long ttlMs, int opId) {
+ return new JoinOperatorSpec<>(leftInputOpSpec, rightInputOpSpec, joinFn,
+ keySerde, messageSerde, otherMessageSerde, ttlMs, opId);
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/spec/StatefulOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StatefulOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StatefulOperatorSpec.java
new file mode 100644
index 0000000..90dfe59
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StatefulOperatorSpec.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.spec;
+
+import java.util.Collection;
+
+
+/**
+ * Implemented by operator specs whose operators keep their state in stores.
+ */
+public interface StatefulOperatorSpec {
+
+  /**
+   * Returns the descriptors of all stores that this operator requires.
+   *
+   * @return one descriptor per store required by this operator
+   */
+  Collection<StoreDescriptor> getStoreDescriptors();
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/operators/spec/StoreDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StoreDescriptor.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StoreDescriptor.java
new file mode 100644
index 0000000..8aa2dd9
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StoreDescriptor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.config.JavaStorageConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.serializers.Serde;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Describes a store required by an operator: its name, storage engine factory, key and value serdes,
+ * changelog stream, and any additional store configs.
+ */
+public class StoreDescriptor {
+  private final String storeName;
+  private final String storeFactory;
+  private final Serde keySerde;
+  private final Serde msgSerde;
+  private final String changelogStream;
+  private final Map<String, String> otherProperties;
+
+  /**
+   * @param storeName the name of the store
+   * @param storeFactory the class name of the storage engine factory for the store
+   * @param keySerde the serde for the store's keys
+   * @param msgSerde the serde for the store's values
+   * @param changelogStream the changelog stream for the store
+   * @param otherProperties additional store configs; copied, so later changes to the caller's map are not seen
+   */
+  StoreDescriptor(String storeName, String storeFactory, Serde keySerde, Serde msgSerde,
+      String changelogStream, Map<String, String> otherProperties) {
+    this.storeName = storeName;
+    this.storeFactory = storeFactory;
+    this.keySerde = keySerde;
+    this.msgSerde = msgSerde;
+    this.changelogStream = changelogStream;
+    this.otherProperties = new HashMap<>(otherProperties);
+  }
+
+  public String getStoreName() {
+    return storeName;
+  }
+
+  public Serde getKeySerde() {
+    return keySerde;
+  }
+
+  public Serde getMsgSerde() {
+    return msgSerde;
+  }
+
+  /**
+   * Builds this store's configs; the additional properties are applied last, so they take precedence.
+   */
+  public JavaStorageConfig getStorageConfigs() {
+    Map<String, String> configs = new HashMap<>();
+    configs.put(String.format(StorageConfig.FACTORY(), storeName), storeFactory);
+    configs.put(String.format(StorageConfig.CHANGELOG_STREAM(), storeName), changelogStream);
+    configs.putAll(otherProperties);
+    return new JavaStorageConfig(new MapConfig(configs));
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 1caca26..ff0299d 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -154,7 +154,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
writePlanJsonFile(executionPlanJson);
// 2. create the necessary streams
- // TODO: System generated intermediate streams should have robust naming scheme. Refer JIRA-1391
+ // TODO: System generated intermediate streams should have robust naming scheme. See SAMZA-1391
String planId = String.valueOf(executionPlanJson.hashCode());
createStreams(planId, plan.getIntermediateStreams());
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 4c4a645..9d44ec1 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -216,13 +216,14 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
public void onNewJobModelAvailable(final String version) {
debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () ->
{
- LOG.info("pid=" + processorId + "new JobModel available");
+ LOG.info("pid=" + processorId + ": new JobModel available");
// get the new job model from ZK
newJobModel = zkUtils.getJobModel(version);
LOG.info("pid=" + processorId + ": new JobModel available. ver=" + version + "; jm = " + newJobModel);
if (!newJobModel.getContainers().containsKey(processorId)) {
- LOG.info("JobModel: {} does not contain the processorId: {}. Stopping the processor.", newJobModel, processorId);
+ LOG.info("New JobModel does not contain pid={}. Stopping this processor. New JobModel: {}",
+ processorId, newJobModel);
stop();
} else {
// stop current work
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index c3e1be2..6071c1f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -805,7 +805,7 @@ class SamzaContainer(
*/
def shutdown(): Unit = {
if (status == SamzaContainerStatus.STOPPED || status == SamzaContainerStatus.FAILED) {
- throw new IllegalContainerStateException("Cannot shutdown a container with status - " + status)
+ throw new IllegalContainerStateException("Cannot shutdown a container with status " + status)
}
shutdownRunLoop()
}
@@ -936,16 +936,18 @@ class SamzaContainer(
val runLoopThread = Thread.currentThread()
shutdownHookThread = new Thread("CONTAINER-SHUTDOWN-HOOK") {
override def run() = {
- info("Shutting down, will wait up to %s ms" format shutdownMs)
+ info("Shutting down, will wait up to %s ms." format shutdownMs)
shutdownRunLoop() //TODO: Pull out shutdown hook to LocalContainerRunner or SP
try {
runLoopThread.join(shutdownMs)
} catch {
case e: Throwable => // Ignore to avoid deadlock with uncaughtExceptionHandler. See SAMZA-1220
- error("Did not shut down within %s ms, exiting" format shutdownMs, e)
+ error("Did not shut down within %s ms, exiting." format shutdownMs, e)
}
if (!runLoopThread.isAlive) {
info("Shutdown complete")
+ } else {
+ error("Did not shut down within %s ms, exiting." format shutdownMs)
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
index 95939c4..df393b0 100644
--- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -49,7 +49,9 @@ public class OrderShipmentJoinExample implements StreamApplication {
KVSerde.of(new StringSerde(), new JsonSerdeV2<>(FulfilledOrderRecord.class)));
orders
- .join(shipments, new MyJoinFunction(), Duration.ofMinutes(1))
+ .join(shipments, new MyJoinFunction(),
+ new StringSerde(), new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class),
+ Duration.ofMinutes(1))
.map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder))
.sendTo(fulfilledOrders);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 50b0a13..f6441dc 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -31,6 +31,7 @@ import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.Serde;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
@@ -130,14 +131,14 @@ public class TestExecutionPlanner {
*/
StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
- MessageStream<KV<Object, Object>> m1 =
+ MessageStream<KV<Object, Object>> messageStream1 =
streamGraph.<KV<Object, Object>>getInputStream("input1")
.map(m -> m);
- MessageStream<KV<Object, Object>> m2 =
+ MessageStream<KV<Object, Object>> messageStream2 =
streamGraph.<KV<Object, Object>>getInputStream("input2")
.partitionBy(m -> m.key, m -> m.value)
.filter(m -> true);
- MessageStream<KV<Object, Object>> m3 =
+ MessageStream<KV<Object, Object>> messageStream3 =
streamGraph.<KV<Object, Object>>getInputStream("input3")
.filter(m -> true)
.partitionBy(m -> m.key, m -> m.value)
@@ -145,8 +146,14 @@ public class TestExecutionPlanner {
OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2");
- m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(output1);
- m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(output2);
+ messageStream1
+ .join(messageStream2, mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2))
+ .sendTo(output1);
+ messageStream3
+ .join(messageStream2, mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1))
+ .sendTo(output2);
return streamGraph;
}
@@ -154,14 +161,14 @@ public class TestExecutionPlanner {
private StreamGraphImpl createStreamGraphWithJoinAndWindow() {
StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
- MessageStream<KV<Object, Object>> m1 =
+ MessageStream<KV<Object, Object>> messageStream1 =
streamGraph.<KV<Object, Object>>getInputStream("input1")
.map(m -> m);
- MessageStream<KV<Object, Object>> m2 =
+ MessageStream<KV<Object, Object>> messageStream2 =
streamGraph.<KV<Object, Object>>getInputStream("input2")
.partitionBy(m -> m.key, m -> m.value)
.filter(m -> true);
- MessageStream<KV<Object, Object>> m3 =
+ MessageStream<KV<Object, Object>> messageStream3 =
streamGraph.<KV<Object, Object>>getInputStream("input3")
.filter(m -> true)
.partitionBy(m -> m.key, m -> m.value)
@@ -169,17 +176,26 @@ public class TestExecutionPlanner {
OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2");
- m1.map(m -> m)
+ messageStream1.map(m -> m)
.filter(m->true)
.window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8)));
- m2.map(m -> m)
+ messageStream2.map(m -> m)
.filter(m->true)
.window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16)));
- m1.join(m2, mock(JoinFunction.class), Duration.ofMillis(1600)).sendTo(output1);
- m3.join(m2, mock(JoinFunction.class), Duration.ofMillis(100)).sendTo(output2);
- m3.join(m2, mock(JoinFunction.class), Duration.ofMillis(252)).sendTo(output2);
+ messageStream1
+ .join(messageStream2, mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600))
+ .sendTo(output1);
+ messageStream3
+ .join(messageStream2, mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100))
+ .sendTo(output2);
+ messageStream3
+ .join(messageStream2, mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252))
+ .sendTo(output2);
return streamGraph;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index 095e407..10c4aa3 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -30,6 +30,7 @@ import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.codehaus.jackson.map.ObjectMapper;
@@ -107,14 +108,14 @@ public class TestJobGraphJsonGenerator {
StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
streamGraph.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
- MessageStream<KV<Object, Object>> m1 =
+ MessageStream<KV<Object, Object>> messageStream1 =
streamGraph.<KV<Object, Object>>getInputStream("input1")
.map(m -> m);
- MessageStream<KV<Object, Object>> m2 =
+ MessageStream<KV<Object, Object>> messageStream2 =
streamGraph.<KV<Object, Object>>getInputStream("input2")
.partitionBy(m -> m.key, m -> m.value)
.filter(m -> true);
- MessageStream<KV<Object, Object>> m3 =
+ MessageStream<KV<Object, Object>> messageStream3 =
streamGraph.<KV<Object, Object>>getInputStream("input3")
.filter(m -> true)
.partitionBy(m -> m.key, m -> m.value)
@@ -122,9 +123,15 @@ public class TestJobGraphJsonGenerator {
OutputStream<KV<Object, Object>> outputStream1 = streamGraph.getOutputStream("output1");
OutputStream<KV<Object, Object>> outputStream2 = streamGraph.getOutputStream("output2");
- m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(outputStream1);
- m2.sink((message, collector, coordinator) -> { });
- m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(outputStream2);
+ messageStream1
+ .join(messageStream2, mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2))
+ .sendTo(outputStream1);
+ messageStream2.sink((message, collector, coordinator) -> { });
+ messageStream3
+ .join(messageStream2, mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1))
+ .sendTo(outputStream2);
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
ExecutionPlan plan = planner.plan(streamGraph);
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
index 918da26..f6ebaf9 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
@@ -26,6 +26,8 @@ import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.impl.store.TimestampedValueSerde;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
@@ -35,6 +37,7 @@ import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.StreamSpec;
import org.junit.Test;
+import java.time.Duration;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
@@ -50,25 +53,35 @@ public class TestJobNode {
@Test
public void testAddSerdeConfigs() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- StreamSpec inputSpec = new StreamSpec("input", "input", "input-system");
+ StreamSpec input1Spec = new StreamSpec("input1", "input1", "input-system");
+ StreamSpec input2Spec = new StreamSpec("input2", "input2", "input-system");
StreamSpec outputSpec = new StreamSpec("output", "output", "output-system");
- StreamSpec partitionBySpec = new StreamSpec("null-null-partition_by-1", "partition_by-1", "intermediate-system");
- doReturn(inputSpec).when(mockRunner).getStreamSpec("input");
+ StreamSpec partitionBySpec = new StreamSpec("null-null-partition_by-2", "partition_by-2", "intermediate-system");
+ doReturn(input1Spec).when(mockRunner).getStreamSpec("input1");
+ doReturn(input2Spec).when(mockRunner).getStreamSpec("input2");
doReturn(outputSpec).when(mockRunner).getStreamSpec("output");
- doReturn(partitionBySpec).when(mockRunner).getStreamSpec("null-null-partition_by-1");
+ doReturn(partitionBySpec).when(mockRunner).getStreamSpec("null-null-partition_by-2");
StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
streamGraph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
- MessageStream<KV<String, Object>> input = streamGraph.getInputStream("input");
+ MessageStream<KV<String, Object>> input1 = streamGraph.getInputStream("input1");
+ MessageStream<KV<String, Object>> input2 = streamGraph.getInputStream("input2");
OutputStream<KV<String, Object>> output = streamGraph.getOutputStream("output");
- input.partitionBy(KV::getKey, KV::getValue).sendTo(output);
+ JoinFunction<String, Object, Object, KV<String, Object>> mockJoinFn = mock(JoinFunction.class);
+ input1
+ .partitionBy(KV::getKey, KV::getValue).map(kv -> kv.value)
+ .join(input2.map(kv -> kv.value), mockJoinFn,
+ new StringSerde(), new JsonSerdeV2<>(Object.class), new JsonSerdeV2<>(Object.class), Duration.ofHours(1))
+ .sendTo(output);
JobNode jobNode = new JobNode("jobName", "jobId", streamGraph, mock(Config.class));
Config config = new MapConfig();
- StreamEdge inputEdge = new StreamEdge(inputSpec, config);
+ StreamEdge input1Edge = new StreamEdge(input1Spec, config);
+ StreamEdge input2Edge = new StreamEdge(input2Spec, config);
StreamEdge outputEdge = new StreamEdge(outputSpec, config);
StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true, config);
- jobNode.addInEdge(inputEdge);
+ jobNode.addInEdge(input1Edge);
+ jobNode.addInEdge(input2Edge);
jobNode.addOutEdge(outputEdge);
jobNode.addInEdge(repartitionEdge);
jobNode.addOutEdge(repartitionEdge);
@@ -85,28 +98,74 @@ public class TestJobNode {
e -> e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), ""),
e -> serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes()))
));
- assertEquals(2, serializers.size());
+ assertEquals(5, serializers.size()); // 2 default + 3 specific for join
- String inputKeySerde = mapConfig.get("streams.input.samza.key.serde");
- String inputMsgSerde = mapConfig.get("streams.input.samza.msg.serde");
- assertTrue(deserializedSerdes.containsKey(inputKeySerde));
- assertTrue(inputKeySerde.startsWith(StringSerde.class.getSimpleName()));
- assertTrue(deserializedSerdes.containsKey(inputMsgSerde));
- assertTrue(inputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+ String input1KeySerde = mapConfig.get("streams.input1.samza.key.serde");
+ String input1MsgSerde = mapConfig.get("streams.input1.samza.msg.serde");
+ assertTrue("Serialized serdes should contain input1 key serde",
+ deserializedSerdes.containsKey(input1KeySerde));
+ assertTrue("Serialized input1 key serde should be a StringSerde",
+ input1KeySerde.startsWith(StringSerde.class.getSimpleName()));
+ assertTrue("Serialized serdes should contain input1 msg serde",
+ deserializedSerdes.containsKey(input1MsgSerde));
+ assertTrue("Serialized input1 msg serde should be a JsonSerdeV2",
+ input1MsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+
+ String input2KeySerde = mapConfig.get("streams.input2.samza.key.serde");
+ String input2MsgSerde = mapConfig.get("streams.input2.samza.msg.serde");
+ assertTrue("Serialized serdes should contain input2 key serde",
+ deserializedSerdes.containsKey(input2KeySerde));
+ assertTrue("Serialized input2 key serde should be a StringSerde",
+ input2KeySerde.startsWith(StringSerde.class.getSimpleName()));
+ assertTrue("Serialized serdes should contain input2 msg serde",
+ deserializedSerdes.containsKey(input2MsgSerde));
+ assertTrue("Serialized input2 msg serde should be a JsonSerdeV2",
+ input2MsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
String outputKeySerde = mapConfig.get("streams.output.samza.key.serde");
String outputMsgSerde = mapConfig.get("streams.output.samza.msg.serde");
- assertTrue(deserializedSerdes.containsKey(outputKeySerde));
- assertTrue(outputKeySerde.startsWith(StringSerde.class.getSimpleName()));
- assertTrue(deserializedSerdes.containsKey(outputMsgSerde));
- assertTrue(outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
-
- String partitionByKeySerde = mapConfig.get("streams.null-null-partition_by-1.samza.key.serde");
- String partitionByMsgSerde = mapConfig.get("streams.null-null-partition_by-1.samza.msg.serde");
- assertTrue(deserializedSerdes.containsKey(partitionByKeySerde));
- assertTrue(partitionByKeySerde.startsWith(StringSerde.class.getSimpleName()));
- assertTrue(deserializedSerdes.containsKey(partitionByMsgSerde));
- assertTrue(partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+ assertTrue("Serialized serdes should contain output key serde",
+ deserializedSerdes.containsKey(outputKeySerde));
+ assertTrue("Serialized output key serde should be a StringSerde",
+ outputKeySerde.startsWith(StringSerde.class.getSimpleName()));
+ assertTrue("Serialized serdes should contain output msg serde",
+ deserializedSerdes.containsKey(outputMsgSerde));
+ assertTrue("Serialized output msg serde should be a JsonSerdeV2",
+ outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+
+ String partitionByKeySerde = mapConfig.get("streams.null-null-partition_by-2.samza.key.serde");
+ String partitionByMsgSerde = mapConfig.get("streams.null-null-partition_by-2.samza.msg.serde");
+ assertTrue("Serialized serdes should contain intermediate stream key serde",
+ deserializedSerdes.containsKey(partitionByKeySerde));
+ assertTrue("Serialized intermediate stream key serde should be a StringSerde",
+ partitionByKeySerde.startsWith(StringSerde.class.getSimpleName()));
+ assertTrue("Serialized serdes should contain intermediate stream msg serde",
+ deserializedSerdes.containsKey(partitionByMsgSerde));
+ assertTrue(
+ "Serialized intermediate stream msg serde should be a JsonSerdeV2",
+ partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+
+ String leftJoinStoreKeySerde = mapConfig.get("stores.join-6-L.key.serde");
+ String leftJoinStoreMsgSerde = mapConfig.get("stores.join-6-L.msg.serde");
+ assertTrue("Serialized serdes should contain left join store key serde",
+ deserializedSerdes.containsKey(leftJoinStoreKeySerde));
+ assertTrue("Serialized left join store key serde should be a StringSerde",
+ leftJoinStoreKeySerde.startsWith(StringSerde.class.getSimpleName()));
+ assertTrue("Serialized serdes should contain left join store msg serde",
+ deserializedSerdes.containsKey(leftJoinStoreMsgSerde));
+ assertTrue("Serialized left join store msg serde should be a TimestampedValueSerde",
+ leftJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName()));
+
+ String rightJoinStoreKeySerde = mapConfig.get("stores.join-6-R.key.serde");
+ String rightJoinStoreMsgSerde = mapConfig.get("stores.join-6-R.msg.serde");
+ assertTrue("Serialized serdes should contain right join store key serde",
+ deserializedSerdes.containsKey(rightJoinStoreKeySerde));
+ assertTrue("Serialized right join store key serde should be a StringSerde",
+ rightJoinStoreKeySerde.startsWith(StringSerde.class.getSimpleName()));
+ assertTrue("Serialized serdes should contain right join store msg serde",
+ deserializedSerdes.containsKey(rightJoinStoreMsgSerde));
+ assertTrue("Serialized right join store msg serde should be a TimestampedValueSerde",
+ rightJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName()));
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 0df6721..1120c25 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -22,10 +22,13 @@ import com.google.common.collect.ImmutableSet;
import org.apache.samza.Partition;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskContextImpl;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.util.InternalInMemoryStore;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -34,7 +37,6 @@ import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamOperatorTask;
-import org.apache.samza.container.TaskContextImpl;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.testUtils.TestClock;
@@ -49,6 +51,7 @@ import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -261,6 +264,9 @@ public class TestJoinOperator {
.of(new SystemStreamPartition("insystem", "instream", new Partition(0)),
new SystemStreamPartition("insystem2", "instream2", new Partition(0))));
when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ // need to return different stores for left and right side
+ when(taskContext.getStore(eq("join-4-L"))).thenReturn(new InternalInMemoryStore<>());
+ when(taskContext.getStore(eq("join-4-R"))).thenReturn(new InternalInMemoryStore<>());
Config config = mock(Config.class);
@@ -287,7 +293,9 @@ public class TestJoinOperator {
SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
inStream
- .join(inStream2, joinFn, JOIN_TTL)
+ .join(inStream2, joinFn,
+ new IntegerSerde(), new JsonSerdeV2<>(FirstStreamIME.class), new JsonSerdeV2<>(SecondStreamIME.class),
+ JOIN_TTL)
.sink((message, messageCollector, taskCoordinator) -> {
messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
});
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index c6554bc..f23bb14 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -41,6 +41,7 @@ import org.apache.samza.operators.windows.Window;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -288,7 +289,7 @@ public class TestMessageStreamImpl {
mock(JoinFunction.class);
Duration joinTtl = Duration.ofMinutes(1);
- source1.join(source2, mockJoinFn, joinTtl);
+ source1.join(source2, mockJoinFn, mock(Serde.class), mock(Serde.class), mock(Serde.class), joinTtl);
ArgumentCaptor<OperatorSpec> leftRegisteredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
verify(leftInputOpSpec).registerNextOperatorSpec(leftRegisteredOpCaptor.capture());
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index d73c545..a759e52 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -44,6 +44,7 @@ import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.util.InternalInMemoryStore;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
@@ -218,10 +219,13 @@ public class TestOperatorImplGraph {
JoinFunction mockJoinFunction = mock(JoinFunction.class);
MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", new NoOpSerde<>());
MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", new NoOpSerde<>());
- inputStream1.join(inputStream2, mockJoinFunction, Duration.ofHours(1));
+ inputStream1.join(inputStream2, mockJoinFunction,
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1));
TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ when(mockTaskContext.getStore(eq("join-2-L"))).thenReturn(new InternalInMemoryStore<>());
+ when(mockTaskContext.getStore(eq("join-2-R"))).thenReturn(new InternalInMemoryStore<>());
OperatorImplGraph opImplGraph =
new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
@@ -386,15 +390,22 @@ public class TestOperatorImplGraph {
.thenReturn(int2);
StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
- Serde inputSerde = new NoOpSerde<>();
- MessageStream m1 = streamGraph.getInputStream("input1", inputSerde).map(m -> m);
- MessageStream m2 = streamGraph.getInputStream("input2", inputSerde).filter(m -> true);
- MessageStream m3 = streamGraph.getInputStream("input3", inputSerde).filter(m -> true).partitionBy(m -> "hehe", m -> m).map(m -> m);
- OutputStream<Object> om1 = streamGraph.getOutputStream("output1");
- OutputStream<Object> om2 = streamGraph.getOutputStream("output2");
-
- m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).partitionBy(m -> "haha", m -> m).sendTo(om1);
- m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(om2);
+ MessageStream messageStream1 = streamGraph.getInputStream("input1").map(m -> m);
+ MessageStream messageStream2 = streamGraph.getInputStream("input2").filter(m -> true);
+ MessageStream messageStream3 =
+ streamGraph.getInputStream("input3").filter(m -> true).partitionBy(m -> "hehe", m -> m).map(m -> m);
+ OutputStream<Object> outputStream1 = streamGraph.getOutputStream("output1");
+ OutputStream<Object> outputStream2 = streamGraph.getOutputStream("output2");
+
+ messageStream1
+ .join(messageStream2, mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2))
+ .partitionBy(m -> "haha", m -> m)
+ .sendTo(outputStream1);
+ messageStream3
+ .join(messageStream2, mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1))
+ .sendTo(outputStream2);
Multimap<SystemStream, SystemStream> outputToInput = OperatorImplGraph.getIntermediateToInputStreamsMap(streamGraph);
Collection<SystemStream> inputs = outputToInput.get(int1.toSystemStream());