You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by at...@apache.org on 2019/02/23 16:05:16 UTC

[samza] branch master updated: SAMZA-1990: Samza framework should let using the same system stream as both input and output.

This is an automated email from the ASF dual-hosted git repository.

atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fcfc95  SAMZA-1990: Samza framework should let using the same system stream as both input and output.
5fcfc95 is described below

commit 5fcfc95ee168f1ff82496b11f70db918b1c42332
Author: Daniel Nishimura <dn...@linkedin.com>
AuthorDate: Sat Feb 23 08:05:11 2019 -0800

    SAMZA-1990: Samza framework should let using the same system stream as both input and output.
    
    **Symptom:** An `IllegalArgumentException` is thrown when the same `streamId` is referenced by multiple input/output stream descriptors.
    **Cause:** `ApplicationDescriptorImpl` caches the serde instances for each stream by `streamId`, and a check ensures that the serdes match when the same stream is used by multiple input/output descriptors. However, the check is incorrect because it compares serde instances rather than serde types, so it always fails in this scenario.
    **Fix:** Compare the stream serdes for a particular `streamId` by type.
    
    Please take a look prateekm nickpan47
    CC: atoomula
    
    Author: Daniel Nishimura <dn...@linkedin.com>
    
    Reviewers: prateekm
    
    Closes #928 from dnishimura/samza-1990-same-stream-different-inputoutputdescriptors
---
 .../samza/application/descriptors/ApplicationDescriptorImpl.java     | 5 ++++-
 .../java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java    | 5 +----
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
index d3c283c..2cd685e 100644
--- a/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
@@ -311,9 +311,12 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
             ". Values will not be (de)serialized");
       }
       streamSerdes.put(streamId, KV.of(keySerde, valueSerde));
-    } else if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) {
+    } else if (!currentSerdePair.getKey().getClass().equals(keySerde.getClass())
+        || !currentSerdePair.getValue().getClass().equals(valueSerde.getClass())) {
       throw new IllegalArgumentException(String.format("Serde for streamId: %s is already defined. Cannot change it to "
           + "different serdes.", streamId));
+    } else {
+      LOGGER.warn("Using previously defined serde for streamId: " + streamId + ".");
     }
     return streamSerdes.get(streamId);
   }
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 6ddb68b..e69ae9a 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -174,10 +174,7 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
     Assert.assertEquals(numMessages, outMessagesSet.size());
     Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(new ArrayList<>(outMessagesSet)));
   }
-
-  // The below test won't work until SAMZA-1990 is fixed. Currently, Samza framework does not allow same system stream
-  // to be used as both input and output stream.
-  @Ignore
+  
   @Test
   public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() {
     int numMessages = 20;