You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/07/12 19:47:50 UTC

[GitHub] [samza] cameronlee314 opened a new pull request #1510: [testing] updating tests to use TestRunner framework

cameronlee314 opened a new pull request #1510:
URL: https://github.com/apache/samza/pull/1510


   Issues: Tests are slow to execute because they directly rely on Kafka/Zookeeper.
   Changes: Migrate some tests to use the TestRunner framework, so they only depend on in-memory systems. There are still some tests that rely on Kafka/Zookeeper that were harder to migrate to the TestRunner, so I didn't update those yet.
   Tests: `./gradlew build`
   API changes: N/A


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] Sanil15 commented on a change in pull request #1510: [testing] updating tests to use TestRunner framework

Posted by GitBox <gi...@apache.org>.
Sanil15 commented on a change in pull request #1510:
URL: https://github.com/apache/samza/pull/1510#discussion_r681144723



##########
File path: samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
##########
@@ -68,66 +68,97 @@
       new PageView("3", "profile-page", "0"),
       new PageView("4", LOGIN_PAGE, "0"));
 
-
   @Test
   public void testProcessingFutureCompletesSuccessfully() {
     List<PageView> expectedPageViews = PAGE_VIEWS.stream()
-        .filter(pageView -> !pageView.getPageId().equals(LOGIN_PAGE) && Long.valueOf(pageView.getUserId()) > 0)
+        .filter(pageView -> !pageView.getPageId().equals(LOGIN_PAGE) && Long.parseLong(pageView.getUserId()) > 0)
         .collect(Collectors.toList());
 
-    List<PageView> actualPageViews = runTest(PAGE_VIEWS, new HashMap<>());
+    List<PageView> actualPageViews = runTest(new HashMap<>());
     assertEquals("Mismatch between expected vs actual page views", expectedPageViews, actualPageViews);
   }
 
-  @Test(expected = SamzaException.class)
+  @Test
   public void testProcessingFutureCompletesAfterTaskTimeout() {
     Map<String, String> configs = new HashMap<>();
     configs.put(TaskConfig.CALLBACK_TIMEOUT_MS, "100");
     configs.put(PROCESS_JITTER, "200");
 
-    runTest(PAGE_VIEWS, configs);
+    try {
+      runTest(configs);
+      fail("App execution should have failed due to a task callback timeout");
+    } catch (SamzaException e) {
+      /*
+       * TestRunner throws SamzaException on failures in general, so check the actual cause. The timeout message is
+       * nested within a bunch of other exceptions.
+       */
+      Throwable rootCause = findRootCause(e);
+      assertTrue(rootCause instanceof SamzaException);
+      // the "{}" is intentional, since the exception message actually includes it (probably a logging bug)
+      assertEquals("Callback for task {} Partition 0 timed out after 100 ms.", rootCause.getMessage());
+    }
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test
   public void testProcessingExceptionIsBubbledUp() {
     Map<String, String> configs = new HashMap<>();
     configs.put(FAIL_PROCESS, "true");
 
-    runTest(PAGE_VIEWS, configs);
+    try {
+      runTest(configs);
+      fail("App execution should have failed due to a ProcessFailureException");
+    } catch (SamzaException e) {
+      /*
+       * TestRunner throws SamzaException on failures in general, so check the actual cause. The actual exception is
+       * nested within a bunch of other exceptions.
+       */
+      assertTrue(findRootCause(e) instanceof ProcessFailureException);
+    }
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test
   public void testDownstreamOperatorExceptionIsBubbledUp() {
     Map<String, String> configs = new HashMap<>();
     configs.put(FAIL_DOWNSTREAM_OPERATOR, "true");
 
-    runTest(PAGE_VIEWS, configs);
+    try {
+      runTest(configs);
+      fail("App execution should have failed due to a FilterFailureException");
+    } catch (SamzaException e) {
+      /*
+       * TestRunner throws SamzaException on failures in general, so check the actual cause. The actual exception is
+       * nested within a bunch of other exceptions.
+       */
+      assertTrue(findRootCause(e) instanceof FilterFailureException);
+    }
   }
 
-  private List<PageView> runTest(List<PageView> pageViews, Map<String, String> configs) {
-    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, PAGE_VIEW_STREAM), TEST_SYSTEM);
-
+  private List<PageView> runTest(Map<String, String> configs) {
     InMemorySystemDescriptor isd = new InMemorySystemDescriptor(TEST_SYSTEM);
     InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd
         .getInputDescriptor(PAGE_VIEW_STREAM, new NoOpSerde<>());
-
-
     InMemoryOutputDescriptor<PageView> outputStreamDesc = isd
         .getOutputDescriptor(NON_GUEST_PAGE_VIEW_STREAM, new NoOpSerde<>());
 
     TestRunner
         .of(new AsyncFlatMapExample())
-        .addInputStream(pageViewStreamDesc, pageViews)
+        .addInputStream(pageViewStreamDesc, PAGE_VIEWS)
         .addOutputStream(outputStreamDesc, 1)
         .addConfig(new MapConfig(configs))
-        .run(Duration.ofMillis(50000));
+        .run(Duration.ofSeconds(10));
 
     Map<Integer, List<PageView>> result = TestRunner.consumeStream(outputStreamDesc, Duration.ofMillis(1000));
-    List<PageView> results = result.values().stream()
+    return result.values().stream()
         .flatMap(List::stream)
         .collect(Collectors.toList());
+  }
 
-    return results;
+  private static Throwable findRootCause(Throwable e) {

Review comment:
       is using Junit annotation for expected exception not better than this way of iterating the throwable and matching the root cause ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] cameronlee314 merged pull request #1510: [SAMZA-2666] Updating tests to use TestRunner framework

Posted by GitBox <gi...@apache.org>.
cameronlee314 merged pull request #1510:
URL: https://github.com/apache/samza/pull/1510


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] cameronlee314 commented on a change in pull request #1510: [testing] updating tests to use TestRunner framework

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1510:
URL: https://github.com/apache/samza/pull/1510#discussion_r683680001



##########
File path: samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
##########
@@ -68,66 +68,97 @@
       new PageView("3", "profile-page", "0"),
       new PageView("4", LOGIN_PAGE, "0"));
 
-
   @Test
   public void testProcessingFutureCompletesSuccessfully() {
     List<PageView> expectedPageViews = PAGE_VIEWS.stream()
-        .filter(pageView -> !pageView.getPageId().equals(LOGIN_PAGE) && Long.valueOf(pageView.getUserId()) > 0)
+        .filter(pageView -> !pageView.getPageId().equals(LOGIN_PAGE) && Long.parseLong(pageView.getUserId()) > 0)
         .collect(Collectors.toList());
 
-    List<PageView> actualPageViews = runTest(PAGE_VIEWS, new HashMap<>());
+    List<PageView> actualPageViews = runTest(new HashMap<>());
     assertEquals("Mismatch between expected vs actual page views", expectedPageViews, actualPageViews);
   }
 
-  @Test(expected = SamzaException.class)
+  @Test
   public void testProcessingFutureCompletesAfterTaskTimeout() {
     Map<String, String> configs = new HashMap<>();
     configs.put(TaskConfig.CALLBACK_TIMEOUT_MS, "100");
     configs.put(PROCESS_JITTER, "200");
 
-    runTest(PAGE_VIEWS, configs);
+    try {
+      runTest(configs);
+      fail("App execution should have failed due to a task callback timeout");
+    } catch (SamzaException e) {
+      /*
+       * TestRunner throws SamzaException on failures in general, so check the actual cause. The timeout message is
+       * nested within a bunch of other exceptions.
+       */
+      Throwable rootCause = findRootCause(e);
+      assertTrue(rootCause instanceof SamzaException);
+      // the "{}" is intentional, since the exception message actually includes it (probably a logging bug)
+      assertEquals("Callback for task {} Partition 0 timed out after 100 ms.", rootCause.getMessage());
+    }
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test
   public void testProcessingExceptionIsBubbledUp() {
     Map<String, String> configs = new HashMap<>();
     configs.put(FAIL_PROCESS, "true");
 
-    runTest(PAGE_VIEWS, configs);
+    try {
+      runTest(configs);
+      fail("App execution should have failed due to a ProcessFailureException");
+    } catch (SamzaException e) {
+      /*
+       * TestRunner throws SamzaException on failures in general, so check the actual cause. The actual exception is
+       * nested within a bunch of other exceptions.
+       */
+      assertTrue(findRootCause(e) instanceof ProcessFailureException);
+    }
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test
   public void testDownstreamOperatorExceptionIsBubbledUp() {
     Map<String, String> configs = new HashMap<>();
     configs.put(FAIL_DOWNSTREAM_OPERATOR, "true");
 
-    runTest(PAGE_VIEWS, configs);
+    try {
+      runTest(configs);
+      fail("App execution should have failed due to a FilterFailureException");
+    } catch (SamzaException e) {
+      /*
+       * TestRunner throws SamzaException on failures in general, so check the actual cause. The actual exception is
+       * nested within a bunch of other exceptions.
+       */
+      assertTrue(findRootCause(e) instanceof FilterFailureException);
+    }
   }
 
-  private List<PageView> runTest(List<PageView> pageViews, Map<String, String> configs) {
-    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, PAGE_VIEW_STREAM), TEST_SYSTEM);
-
+  private List<PageView> runTest(Map<String, String> configs) {
     InMemorySystemDescriptor isd = new InMemorySystemDescriptor(TEST_SYSTEM);
     InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd
         .getInputDescriptor(PAGE_VIEW_STREAM, new NoOpSerde<>());
-
-
     InMemoryOutputDescriptor<PageView> outputStreamDesc = isd
         .getOutputDescriptor(NON_GUEST_PAGE_VIEW_STREAM, new NoOpSerde<>());
 
     TestRunner
         .of(new AsyncFlatMapExample())
-        .addInputStream(pageViewStreamDesc, pageViews)
+        .addInputStream(pageViewStreamDesc, PAGE_VIEWS)
         .addOutputStream(outputStreamDesc, 1)
         .addConfig(new MapConfig(configs))
-        .run(Duration.ofMillis(50000));
+        .run(Duration.ofSeconds(10));
 
     Map<Integer, List<PageView>> result = TestRunner.consumeStream(outputStreamDesc, Duration.ofMillis(1000));
-    List<PageView> results = result.values().stream()
+    return result.values().stream()
         .flatMap(List::stream)
         .collect(Collectors.toList());
+  }
 
-    return results;
+  private static Throwable findRootCause(Throwable e) {

Review comment:
       Generally, I would prefer using the junit annotation, but in this case, the junit annotation doesn't provide enough granularity, since it only checks the outer exception type. Both the test framework and Samza wrap the actual exception with `SamzaException`, so just checking `SamzaException` with a junit annotation doesn't really check that the expected failure case is being triggered.
   For example, the app execution for this test could fail due to some init step instead of during processing, but the outer exception type would still be `SamzaException` for both cases. The junit annotation would allow the result to succeed in both cases, but this current way allows us to make sure the error actually happened during processing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] Sanil15 commented on pull request #1510: [testing] updating tests to use TestRunner framework

Posted by GitBox <gi...@apache.org>.
Sanil15 commented on pull request #1510:
URL: https://github.com/apache/samza/pull/1510#issuecomment-893784401


   Please Update the description of PR to match Samza SEP https://cwiki.apache.org/confluence/display/SAMZA/SEP-25%3A+PR+Title+And+Description+Guidelines and merge


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] Sanil15 commented on a change in pull request #1510: [testing] updating tests to use TestRunner framework

Posted by GitBox <gi...@apache.org>.
Sanil15 commented on a change in pull request #1510:
URL: https://github.com/apache/samza/pull/1510#discussion_r683775059



##########
File path: samza-test/src/test/java/org/apache/samza/test/operator/TestAsyncFlatMap.java
##########
@@ -68,66 +68,97 @@
       new PageView("3", "profile-page", "0"),
       new PageView("4", LOGIN_PAGE, "0"));
 
-
   @Test
   public void testProcessingFutureCompletesSuccessfully() {
     List<PageView> expectedPageViews = PAGE_VIEWS.stream()
-        .filter(pageView -> !pageView.getPageId().equals(LOGIN_PAGE) && Long.valueOf(pageView.getUserId()) > 0)
+        .filter(pageView -> !pageView.getPageId().equals(LOGIN_PAGE) && Long.parseLong(pageView.getUserId()) > 0)
         .collect(Collectors.toList());
 
-    List<PageView> actualPageViews = runTest(PAGE_VIEWS, new HashMap<>());
+    List<PageView> actualPageViews = runTest(new HashMap<>());
     assertEquals("Mismatch between expected vs actual page views", expectedPageViews, actualPageViews);
   }
 
-  @Test(expected = SamzaException.class)
+  @Test
   public void testProcessingFutureCompletesAfterTaskTimeout() {
     Map<String, String> configs = new HashMap<>();
     configs.put(TaskConfig.CALLBACK_TIMEOUT_MS, "100");
     configs.put(PROCESS_JITTER, "200");
 
-    runTest(PAGE_VIEWS, configs);
+    try {
+      runTest(configs);
+      fail("App execution should have failed due to a task callback timeout");
+    } catch (SamzaException e) {
+      /*
+       * TestRunner throws SamzaException on failures in general, so check the actual cause. The timeout message is
+       * nested within a bunch of other exceptions.
+       */
+      Throwable rootCause = findRootCause(e);
+      assertTrue(rootCause instanceof SamzaException);
+      // the "{}" is intentional, since the exception message actually includes it (probably a logging bug)
+      assertEquals("Callback for task {} Partition 0 timed out after 100 ms.", rootCause.getMessage());
+    }
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test
   public void testProcessingExceptionIsBubbledUp() {
     Map<String, String> configs = new HashMap<>();
     configs.put(FAIL_PROCESS, "true");
 
-    runTest(PAGE_VIEWS, configs);
+    try {
+      runTest(configs);
+      fail("App execution should have failed due to a ProcessFailureException");
+    } catch (SamzaException e) {
+      /*
+       * TestRunner throws SamzaException on failures in general, so check the actual cause. The actual exception is
+       * nested within a bunch of other exceptions.
+       */
+      assertTrue(findRootCause(e) instanceof ProcessFailureException);
+    }
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test
   public void testDownstreamOperatorExceptionIsBubbledUp() {
     Map<String, String> configs = new HashMap<>();
     configs.put(FAIL_DOWNSTREAM_OPERATOR, "true");
 
-    runTest(PAGE_VIEWS, configs);
+    try {
+      runTest(configs);
+      fail("App execution should have failed due to a FilterFailureException");
+    } catch (SamzaException e) {
+      /*
+       * TestRunner throws SamzaException on failures in general, so check the actual cause. The actual exception is
+       * nested within a bunch of other exceptions.
+       */
+      assertTrue(findRootCause(e) instanceof FilterFailureException);
+    }
   }
 
-  private List<PageView> runTest(List<PageView> pageViews, Map<String, String> configs) {
-    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, PAGE_VIEW_STREAM), TEST_SYSTEM);
-
+  private List<PageView> runTest(Map<String, String> configs) {
     InMemorySystemDescriptor isd = new InMemorySystemDescriptor(TEST_SYSTEM);
     InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd
         .getInputDescriptor(PAGE_VIEW_STREAM, new NoOpSerde<>());
-
-
     InMemoryOutputDescriptor<PageView> outputStreamDesc = isd
         .getOutputDescriptor(NON_GUEST_PAGE_VIEW_STREAM, new NoOpSerde<>());
 
     TestRunner
         .of(new AsyncFlatMapExample())
-        .addInputStream(pageViewStreamDesc, pageViews)
+        .addInputStream(pageViewStreamDesc, PAGE_VIEWS)
         .addOutputStream(outputStreamDesc, 1)
         .addConfig(new MapConfig(configs))
-        .run(Duration.ofMillis(50000));
+        .run(Duration.ofSeconds(10));
 
     Map<Integer, List<PageView>> result = TestRunner.consumeStream(outputStreamDesc, Duration.ofMillis(1000));
-    List<PageView> results = result.values().stream()
+    return result.values().stream()
         .flatMap(List::stream)
         .collect(Collectors.toList());
+  }
 
-    return results;
+  private static Throwable findRootCause(Throwable e) {

Review comment:
       make sense




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org