You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by "jia-gao (via GitHub)" <gi...@apache.org> on 2023/01/30 23:58:23 UTC

[GitHub] [samza] jia-gao opened a new pull request, #1652: Fix deadlock in StreamAppender

jia-gao opened a new pull request, #1652:
URL: https://github.com/apache/samza/pull/1652

   Issue:
   In StreamAppender, there is a synchronized block around setupSystem(), which should be called the first time (after loggingContext config is set up) any logger logs something.
   It could lead to a deadlock situation. Because if during the setupSystem(), any other threads try to do LOG.xxx(), they will be blocked, and if the system setup depends on those threads, it leads to deadlock
   
   Change:
   Replace the synchronized block with a [tryLock](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Lock.html#tryLock(long,%20java.util.concurrent.TimeUnit))(long time, [TimeUnit](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/TimeUnit.html) unit)
   so that other threads won't be blocked forever if setupSystem() stuck or timeout
   
   Test Done:
   Add new units for the change
   ./gradlew build


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] jia-gao commented on a diff in pull request #1652: Fix deadlock in StreamAppender

Posted by "jia-gao (via GitHub)" <gi...@apache.org>.
jia-gao commented on code in PR #1652:
URL: https://github.com/apache/samza/pull/1652#discussion_r1092574230


##########
samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java:
##########
@@ -444,6 +486,36 @@ private void logRecursivelyAndVerifyMessages(List<String> messages) throws Inter
     assertEquals(messages.size(), MockSystemProducer.messagesReceived.size());
   }
 
+  private void SetupSystemTimeoutAndVerifyMessages(long setUpSystemTime) throws InterruptedException {

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] jia-gao commented on a diff in pull request #1652: Fix deadlock in StreamAppender

Posted by "jia-gao (via GitHub)" <gi...@apache.org>.
jia-gao commented on code in PR #1652:
URL: https://github.com/apache/samza/pull/1652#discussion_r1092593864


##########
samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java:
##########
@@ -186,13 +190,20 @@ public void append(LogEvent event) {
       if (!systemInitialized) {
         // configs are needed to set up producer system, so check that before actually initializing
         if (this.loggingContextHolder.getConfig() != null) {
-          synchronized (this) {
-            if (!systemInitialized) {
-              setupSystem();
-              systemInitialized = true;
+          if (setUpSystemLock.tryLock(SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS, TimeUnit.MILLISECONDS)) {
+            try {
+              if (!systemInitialized) {
+                setupSystem();

Review Comment:
   Based on the discussion offline:
   Yes, `setupSystem` does not provide idempotence nor retry logic to setup system upon failures.
   
   The current implementation handles both by putting 
   setupSystem() and  systemInitialized = true 
   into a critical section to ensure that
   1. no concurrent calls to setupSystem()
   2. If setupSystem() completes, it won't be called again
   3. If setupSystem() fails, it will be invoked by another thread that enters the critical section
   
   However, 2 and 3 might not be the ideal way to handle idempotence and recovery from failures. We should revisit their reliabilities and consider other solutions 
   
   one preferred option is that:
   setupSystem() should handle idempotence and recovery from failures by itself and StreamAppender doesn't need to handle them explicitly in append().
   
   
   
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] mynameborat commented on a diff in pull request #1652: Fix deadlock in StreamAppender

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat commented on code in PR #1652:
URL: https://github.com/apache/samza/pull/1652#discussion_r1091377933


##########
samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java:
##########
@@ -186,13 +190,20 @@ public void append(LogEvent event) {
       if (!systemInitialized) {
         // configs are needed to set up producer system, so check that before actually initializing
         if (this.loggingContextHolder.getConfig() != null) {
-          synchronized (this) {
-            if (!systemInitialized) {
-              setupSystem();
-              systemInitialized = true;
+          if (setUpSystemLock.tryLock(SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS, TimeUnit.MILLISECONDS)) {
+            try {
+              if (!systemInitialized) {
+                setupSystem();

Review Comment:
   Is this supposed to be run only once? I am thinking if we should also set it to `true` within finally block? 
   Alternatively, use `AtomicBoolean` and check and set in ln:190.



##########
samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java:
##########
@@ -444,6 +486,36 @@ private void logRecursivelyAndVerifyMessages(List<String> messages) throws Inter
     assertEquals(messages.size(), MockSystemProducer.messagesReceived.size());
   }
 
+  private void SetupSystemTimeoutAndVerifyMessages(long setUpSystemTime) throws InterruptedException {

Review Comment:
   nit: camelCasing 
   s/SetupSystemTimeoutAndVerifyMessages/setupSystemTImoutAndVerifyMessages 



##########
samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java:
##########
@@ -444,6 +486,36 @@ private void logRecursivelyAndVerifyMessages(List<String> messages) throws Inter
     assertEquals(messages.size(), MockSystemProducer.messagesReceived.size());
   }
 
+  private void SetupSystemTimeoutAndVerifyMessages(long setUpSystemTime) throws InterruptedException {
+    MockSystemProducer.listeners.clear();
+    MockSystemAdmin.listeners.clear();
+    List<String> messages = Lists.newArrayList("testing1", "testing2");
+    // Set up latch
+    final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
+    MockSystemProducer.listeners.add((source, envelope) -> allMessagesSent.countDown());
+    MockSystemAdmin.listeners.add(streamSpec -> {
+      try {
+        // mock setUpSystem time during createStream() call
+        sleep(setUpSystemTime);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    });
+    ExecutorService service0 = Executors.newFixedThreadPool(1);
+    ExecutorService service1 = Executors.newFixedThreadPool(1);
+    // Log the messages with two threads
+    service0.submit(()->LOG.info(messages.get(0)));
+    service1.submit(()->LOG.info(messages.get(1)));
+    // Wait for messages
+    allMessagesSent.await(setUpSystemTime * 4, TimeUnit.MILLISECONDS);
+    // If the setUpSystem time out, verify only one message sent. otherwise, verify two messages sent
+    if (setUpSystemTime >= SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS) {
+      assertEquals(messages.size() - 1, MockSystemProducer.messagesReceived.size());
+    } else {
+      assertEquals(messages.size(), MockSystemProducer.messagesReceived.size());
+    }
+  }
+
   private static Config baseConfig() {

Review Comment:
   perhaps add another test case for `setupSystem` throw an exception. Not sure what the expected behavior, but it will be good have a test with the expected behavior so that I won't have to ask next time :)



##########
samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java:
##########
@@ -444,6 +486,36 @@ private void logRecursivelyAndVerifyMessages(List<String> messages) throws Inter
     assertEquals(messages.size(), MockSystemProducer.messagesReceived.size());
   }
 
+  private void SetupSystemTimeoutAndVerifyMessages(long setUpSystemTime) throws InterruptedException {
+    MockSystemProducer.listeners.clear();
+    MockSystemAdmin.listeners.clear();
+    List<String> messages = Lists.newArrayList("testing1", "testing2");
+    // Set up latch
+    final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
+    MockSystemProducer.listeners.add((source, envelope) -> allMessagesSent.countDown());
+    MockSystemAdmin.listeners.add(streamSpec -> {
+      try {
+        // mock setUpSystem time during createStream() call
+        sleep(setUpSystemTime);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    });
+    ExecutorService service0 = Executors.newFixedThreadPool(1);
+    ExecutorService service1 = Executors.newFixedThreadPool(1);
+    // Log the messages with two threads
+    service0.submit(()->LOG.info(messages.get(0)));
+    service1.submit(()->LOG.info(messages.get(1)));

Review Comment:
   Introduce a jitter so that there is no guarantees on the ordering of `setup`. Meaning `service1.submit(() -> LOG.info(messages.get(1)))` can also trigger `setup`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] jia-gao commented on a diff in pull request #1652: Fix deadlock in StreamAppender

Posted by "jia-gao (via GitHub)" <gi...@apache.org>.
jia-gao commented on code in PR #1652:
URL: https://github.com/apache/samza/pull/1652#discussion_r1092593864


##########
samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java:
##########
@@ -186,13 +190,20 @@ public void append(LogEvent event) {
       if (!systemInitialized) {
         // configs are needed to set up producer system, so check that before actually initializing
         if (this.loggingContextHolder.getConfig() != null) {
-          synchronized (this) {
-            if (!systemInitialized) {
-              setupSystem();
-              systemInitialized = true;
+          if (setUpSystemLock.tryLock(SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS, TimeUnit.MILLISECONDS)) {
+            try {
+              if (!systemInitialized) {
+                setupSystem();

Review Comment:
   Based on the discussion offline:
   Yes, `setupSystem` does not provide idempotence nor retry logic to setup system upon failures.
   
   The current implementation handles both by putting 
   setupSystem() and  systemInitialized = true 
   into a critical section to ensure that
   1. no concurrent calls to setupSystem()
   2. If setupSystem() completes, it won't be called again
   3. If setupSystem() fails, it will be invoked by another thread (or the same thread later) that enters the critical section
   
   However, 2 and 3 might not be the ideal way to handle idempotence and recovery from failures. We should revisit their reliabilities and consider other solutions 
   
   one preferred option is that:
   setupSystem() should handle idempotence and recovery from failures by itself and StreamAppender doesn't need to handle them explicitly in append().
   
   
   
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] jia-gao commented on a diff in pull request #1652: Fix deadlock in StreamAppender

Posted by "jia-gao (via GitHub)" <gi...@apache.org>.
jia-gao commented on code in PR #1652:
URL: https://github.com/apache/samza/pull/1652#discussion_r1092296612


##########
samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java:
##########
@@ -444,6 +486,36 @@ private void logRecursivelyAndVerifyMessages(List<String> messages) throws Inter
     assertEquals(messages.size(), MockSystemProducer.messagesReceived.size());
   }
 
+  private void SetupSystemTimeoutAndVerifyMessages(long setUpSystemTime) throws InterruptedException {
+    MockSystemProducer.listeners.clear();
+    MockSystemAdmin.listeners.clear();
+    List<String> messages = Lists.newArrayList("testing1", "testing2");
+    // Set up latch
+    final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
+    MockSystemProducer.listeners.add((source, envelope) -> allMessagesSent.countDown());
+    MockSystemAdmin.listeners.add(streamSpec -> {
+      try {
+        // mock setUpSystem time during createStream() call
+        sleep(setUpSystemTime);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    });
+    ExecutorService service0 = Executors.newFixedThreadPool(1);
+    ExecutorService service1 = Executors.newFixedThreadPool(1);
+    // Log the messages with two threads
+    service0.submit(()->LOG.info(messages.get(0)));
+    service1.submit(()->LOG.info(messages.get(1)));

Review Comment:
   oh ok, thanks for clarifying. will add it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] jia-gao commented on a diff in pull request #1652: Fix deadlock in StreamAppender

Posted by "jia-gao (via GitHub)" <gi...@apache.org>.
jia-gao commented on code in PR #1652:
URL: https://github.com/apache/samza/pull/1652#discussion_r1092292112


##########
samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java:
##########
@@ -444,6 +486,36 @@ private void logRecursivelyAndVerifyMessages(List<String> messages) throws Inter
     assertEquals(messages.size(), MockSystemProducer.messagesReceived.size());
   }
 
+  private void SetupSystemTimeoutAndVerifyMessages(long setUpSystemTime) throws InterruptedException {
+    MockSystemProducer.listeners.clear();
+    MockSystemAdmin.listeners.clear();
+    List<String> messages = Lists.newArrayList("testing1", "testing2");
+    // Set up latch
+    final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
+    MockSystemProducer.listeners.add((source, envelope) -> allMessagesSent.countDown());
+    MockSystemAdmin.listeners.add(streamSpec -> {
+      try {
+        // mock setUpSystem time during createStream() call
+        sleep(setUpSystemTime);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    });
+    ExecutorService service0 = Executors.newFixedThreadPool(1);
+    ExecutorService service1 = Executors.newFixedThreadPool(1);
+    // Log the messages with two threads
+    service0.submit(()->LOG.info(messages.get(0)));
+    service1.submit(()->LOG.info(messages.get(1)));
+    // Wait for messages
+    allMessagesSent.await(setUpSystemTime * 4, TimeUnit.MILLISECONDS);
+    // If the setUpSystem time out, verify only one message sent. otherwise, verify two messages sent
+    if (setUpSystemTime >= SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS) {
+      assertEquals(messages.size() - 1, MockSystemProducer.messagesReceived.size());
+    } else {
+      assertEquals(messages.size(), MockSystemProducer.messagesReceived.size());
+    }
+  }
+
   private static Config baseConfig() {

Review Comment:
   sounds good, will add a test for that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] mynameborat commented on a diff in pull request #1652: Fix deadlock in StreamAppender

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat commented on code in PR #1652:
URL: https://github.com/apache/samza/pull/1652#discussion_r1092576559


##########
samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java:
##########
@@ -186,13 +190,20 @@ public void append(LogEvent event) {
       if (!systemInitialized) {
         // configs are needed to set up producer system, so check that before actually initializing
         if (this.loggingContextHolder.getConfig() != null) {
-          synchronized (this) {
-            if (!systemInitialized) {
-              setupSystem();
-              systemInitialized = true;
+          if (setUpSystemLock.tryLock(SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS, TimeUnit.MILLISECONDS)) {
+            try {
+              if (!systemInitialized) {
+                setupSystem();

Review Comment:
   I am not sure where we landed here. Can you update the comment with our discussion?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] mynameborat commented on a diff in pull request #1652: Fix deadlock in StreamAppender

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat commented on code in PR #1652:
URL: https://github.com/apache/samza/pull/1652#discussion_r1092298435


##########
samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java:
##########
@@ -186,13 +190,20 @@ public void append(LogEvent event) {
       if (!systemInitialized) {
         // configs are needed to set up producer system, so check that before actually initializing
         if (this.loggingContextHolder.getConfig() != null) {
-          synchronized (this) {
-            if (!systemInitialized) {
-              setupSystem();
-              systemInitialized = true;
+          if (setUpSystemLock.tryLock(SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS, TimeUnit.MILLISECONDS)) {
+            try {
+              if (!systemInitialized) {
+                setupSystem();

Review Comment:
   There are no guarantees from `setupSystem` on providing idempotence nor able to setup system on the subsequent retries in case of failures.
   
   So if we are handling externally for the former, why not handle the latter as well. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] jia-gao commented on a diff in pull request #1652: Fix deadlock in StreamAppender

Posted by "jia-gao (via GitHub)" <gi...@apache.org>.
jia-gao commented on code in PR #1652:
URL: https://github.com/apache/samza/pull/1652#discussion_r1092286850


##########
samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java:
##########
@@ -186,13 +190,20 @@ public void append(LogEvent event) {
       if (!systemInitialized) {
         // configs are needed to set up producer system, so check that before actually initializing
         if (this.loggingContextHolder.getConfig() != null) {
-          synchronized (this) {
-            if (!systemInitialized) {
-              setupSystem();
-              systemInitialized = true;
+          if (setUpSystemLock.tryLock(SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS, TimeUnit.MILLISECONDS)) {
+            try {
+              if (!systemInitialized) {
+                setupSystem();

Review Comment:
   Yes, it is supposed to be run only once.
   I prefer not to set it to true within finally block because we want to make sure it is only set to true if setupSystem() is completed successfully. In case that setupSystem() fails (throwing some exceptions for example), we want to leave it as false.
   
   The same reason applies to the option of AtomicBoolean. there are two cons for that:
   1. same as above, can not make sure setupSystem() runs fine
   2. perf reason, compareAndSet AtomicBoolean in line 190 means every time LOG.xxx() StreamAppender needs to check the AtomicBoolean which is expensive than checking a boolean



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] mynameborat merged pull request #1652: Fix deadlock in StreamAppender

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat merged PR #1652:
URL: https://github.com/apache/samza/pull/1652


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] jia-gao commented on a diff in pull request #1652: Fix deadlock in StreamAppender

Posted by "jia-gao (via GitHub)" <gi...@apache.org>.
jia-gao commented on code in PR #1652:
URL: https://github.com/apache/samza/pull/1652#discussion_r1092291776


##########
samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java:
##########
@@ -444,6 +486,36 @@ private void logRecursivelyAndVerifyMessages(List<String> messages) throws Inter
     assertEquals(messages.size(), MockSystemProducer.messagesReceived.size());
   }
 
+  private void SetupSystemTimeoutAndVerifyMessages(long setUpSystemTime) throws InterruptedException {
+    MockSystemProducer.listeners.clear();
+    MockSystemAdmin.listeners.clear();
+    List<String> messages = Lists.newArrayList("testing1", "testing2");
+    // Set up latch
+    final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
+    MockSystemProducer.listeners.add((source, envelope) -> allMessagesSent.countDown());
+    MockSystemAdmin.listeners.add(streamSpec -> {
+      try {
+        // mock setUpSystem time during createStream() call
+        sleep(setUpSystemTime);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    });
+    ExecutorService service0 = Executors.newFixedThreadPool(1);
+    ExecutorService service1 = Executors.newFixedThreadPool(1);
+    // Log the messages with two threads
+    service0.submit(()->LOG.info(messages.get(0)));
+    service1.submit(()->LOG.info(messages.get(1)));

Review Comment:
   Yes, service1 can also trigger setup. But the test doesn't care about ordering. The test only cares about the number of log events sent
   If service1 holds the lock and sleeps for a while, then messages[1] will be sent and messages[0] will not. 
   Validation remains the same



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] mynameborat commented on a diff in pull request #1652: Fix deadlock in StreamAppender

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat commented on code in PR #1652:
URL: https://github.com/apache/samza/pull/1652#discussion_r1092293902


##########
samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java:
##########
@@ -444,6 +486,36 @@ private void logRecursivelyAndVerifyMessages(List<String> messages) throws Inter
     assertEquals(messages.size(), MockSystemProducer.messagesReceived.size());
   }
 
+  private void SetupSystemTimeoutAndVerifyMessages(long setUpSystemTime) throws InterruptedException {
+    MockSystemProducer.listeners.clear();
+    MockSystemAdmin.listeners.clear();
+    List<String> messages = Lists.newArrayList("testing1", "testing2");
+    // Set up latch
+    final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
+    MockSystemProducer.listeners.add((source, envelope) -> allMessagesSent.countDown());
+    MockSystemAdmin.listeners.add(streamSpec -> {
+      try {
+        // mock setUpSystem time during createStream() call
+        sleep(setUpSystemTime);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    });
+    ExecutorService service0 = Executors.newFixedThreadPool(1);
+    ExecutorService service1 = Executors.newFixedThreadPool(1);
+    // Log the messages with two threads
+    service0.submit(()->LOG.info(messages.get(0)));
+    service1.submit(()->LOG.info(messages.get(1)));

Review Comment:
   precisely why I asked to add jitter to also cover the base that ordering doesn't matter as long as the messages emitted meets the criteria.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] jia-gao commented on a diff in pull request #1652: Fix deadlock in StreamAppender

Posted by "jia-gao (via GitHub)" <gi...@apache.org>.
jia-gao commented on code in PR #1652:
URL: https://github.com/apache/samza/pull/1652#discussion_r1092574508


##########
samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java:
##########
@@ -444,6 +486,36 @@ private void logRecursivelyAndVerifyMessages(List<String> messages) throws Inter
     assertEquals(messages.size(), MockSystemProducer.messagesReceived.size());
   }
 
+  private void SetupSystemTimeoutAndVerifyMessages(long setUpSystemTime) throws InterruptedException {
+    MockSystemProducer.listeners.clear();
+    MockSystemAdmin.listeners.clear();
+    List<String> messages = Lists.newArrayList("testing1", "testing2");
+    // Set up latch
+    final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
+    MockSystemProducer.listeners.add((source, envelope) -> allMessagesSent.countDown());
+    MockSystemAdmin.listeners.add(streamSpec -> {
+      try {
+        // mock setUpSystem time during createStream() call
+        sleep(setUpSystemTime);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    });
+    ExecutorService service0 = Executors.newFixedThreadPool(1);
+    ExecutorService service1 = Executors.newFixedThreadPool(1);
+    // Log the messages with two threads
+    service0.submit(()->LOG.info(messages.get(0)));
+    service1.submit(()->LOG.info(messages.get(1)));

Review Comment:
   Added a random sleep time before LOG



##########
samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java:
##########
@@ -444,6 +486,36 @@ private void logRecursivelyAndVerifyMessages(List<String> messages) throws Inter
     assertEquals(messages.size(), MockSystemProducer.messagesReceived.size());
   }
 
+  private void SetupSystemTimeoutAndVerifyMessages(long setUpSystemTime) throws InterruptedException {
+    MockSystemProducer.listeners.clear();
+    MockSystemAdmin.listeners.clear();
+    List<String> messages = Lists.newArrayList("testing1", "testing2");
+    // Set up latch
+    final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
+    MockSystemProducer.listeners.add((source, envelope) -> allMessagesSent.countDown());
+    MockSystemAdmin.listeners.add(streamSpec -> {
+      try {
+        // mock setUpSystem time during createStream() call
+        sleep(setUpSystemTime);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    });
+    ExecutorService service0 = Executors.newFixedThreadPool(1);
+    ExecutorService service1 = Executors.newFixedThreadPool(1);
+    // Log the messages with two threads
+    service0.submit(()->LOG.info(messages.get(0)));
+    service1.submit(()->LOG.info(messages.get(1)));
+    // Wait for messages
+    allMessagesSent.await(setUpSystemTime * 4, TimeUnit.MILLISECONDS);
+    // If the setUpSystem time out, verify only one message sent. otherwise, verify two messages sent
+    if (setUpSystemTime >= SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS) {
+      assertEquals(messages.size() - 1, MockSystemProducer.messagesReceived.size());
+    } else {
+      assertEquals(messages.size(), MockSystemProducer.messagesReceived.size());
+    }
+  }
+
   private static Config baseConfig() {

Review Comment:
   Added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org