You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by at...@apache.org on 2019/02/23 16:05:16 UTC
[samza] branch master updated: SAMZA-1990: Samza framework should
let using the same system stream as both input and output.
This is an automated email from the ASF dual-hosted git repository.
atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 5fcfc95 SAMZA-1990: Samza framework should let using the same system stream as both input and output.
5fcfc95 is described below
commit 5fcfc95ee168f1ff82496b11f70db918b1c42332
Author: Daniel Nishimura <dn...@linkedin.com>
AuthorDate: Sat Feb 23 08:05:11 2019 -0800
SAMZA-1990: Samza framework should let using the same system stream as both input and output.
**Symptom:** An `IllegalArgumentException` is thrown when the same `streamId` is referenced by multiple input/output stream descriptors.
**Cause:** `ApplicationDescriptorImpl` caches the serde instances for streams by `streamId`, and a check ensures that the expected stream serde matches when the same stream is used from multiple input/output descriptors. However, the check is incorrect because it compares serde instances rather than serde types, so it always fails in this scenario.
**Fix:** Compare the stream serdes for a particular `streamId` by type.
Please take a look prateekm nickpan47
CC: atoomula
Author: Daniel Nishimura <dn...@linkedin.com>
Reviewers: prateekm
Closes #928 from dnishimura/samza-1990-same-stream-different-inputoutputdescriptors
---
.../samza/application/descriptors/ApplicationDescriptorImpl.java | 5 ++++-
.../java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java | 5 +----
2 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
index d3c283c..2cd685e 100644
--- a/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
@@ -311,9 +311,12 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
". Values will not be (de)serialized");
}
streamSerdes.put(streamId, KV.of(keySerde, valueSerde));
- } else if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) {
+ } else if (!currentSerdePair.getKey().getClass().equals(keySerde.getClass())
+ || !currentSerdePair.getValue().getClass().equals(valueSerde.getClass())) {
throw new IllegalArgumentException(String.format("Serde for streamId: %s is already defined. Cannot change it to "
+ "different serdes.", streamId));
+ } else {
+ LOGGER.warn("Using previously defined serde for streamId: " + streamId + ".");
}
return streamSerdes.get(streamId);
}
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 6ddb68b..e69ae9a 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -174,10 +174,7 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
Assert.assertEquals(numMessages, outMessagesSet.size());
Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(new ArrayList<>(outMessagesSet)));
}
-
- // The below test won't work until SAMZA-1990 is fixed. Currently, Samza framework does not allow same system stream
- // to be used as both input and output stream.
- @Ignore
+
@Test
public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() {
int numMessages = 20;