Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/09/21 01:05:24 UTC

[GitHub] [samza] cameronlee314 commented on a change in pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

cameronlee314 commented on a change in pull request #1532:
URL: https://github.com/apache/samza/pull/1532#discussion_r712619178



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -490,6 +455,17 @@ protected void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName)
     }
   }
 
+  /**
+   * If the partition count was explicitly specified, then use that. Otherwise, use the container count as the partition
+   * count.
+   */
+  private int calculateStreamPartitionCount(Config config) {

Review comment:
       `getPartitionCount` assumes that the instance variable `config` is available, but `config` isn't actually guaranteed to be available until `LoggingContextHolder` has a config filled in, so this could result in the caller accidentally using `getPartitionCount` too early. `calculateStreamPartitionCount` accepts `config` as an argument, which forces the callers of that method to ensure `config` is available.
   I didn't want to change the `getPartitionCount` signature just in case log4j/log4j2 still needed the public getter. Also, based on the javadoc for `getPartitionCount`, it is tied to some log4j/log4j2 configuration, so it seemed like it would be reasonable to have it only consider the configured value.
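
   A minimal sketch of the distinction described in this comment, not the actual Samza code. `SketchConfig` and `PartitionCountSketch` are hypothetical stand-ins, and the `job.container.count` key with a default of 1 is an assumption made for illustration. The point is only that the public getter reports the configured value, while the helper cannot be called without a config in hand:

```java
import java.util.Map;

// Hypothetical stand-in for Samza's Config; only the lookup used below is modeled.
final class SketchConfig {
  private final Map<String, String> values;

  SketchConfig(Map<String, String> values) {
    this.values = values;
  }

  int getInt(String key, int defaultValue) {
    String value = values.get(key);
    return value == null ? defaultValue : Integer.parseInt(value);
  }
}

// Hypothetical, stripped-down appender showing only the partition-count logic.
final class PartitionCountSketch {
  // Set through the appender configuration; 0 means "not explicitly specified".
  private int partitionCount = 0;

  // Public getter keeps its signature and reports only the configured value.
  public int getPartitionCount() {
    return partitionCount;
  }

  public void setPartitionCount(int partitionCount) {
    this.partitionCount = partitionCount;
  }

  // Taking config as a parameter forces the caller to have a config available.
  int calculateStreamPartitionCount(SketchConfig config) {
    if (partitionCount > 0) {
      return partitionCount;
    }
    // Assumed key and default for the container count (illustrative only).
    return config.getInt("job.container.count", 1);
  }
}
```

   With this shape, a caller that has no config yet simply cannot invoke `calculateStreamPartitionCount`, whereas a field-based getter would silently read whatever the field held at that moment.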

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -89,38 +78,40 @@
   private byte[] keyBytes; // Serialize the key once, since we will use it for every event.
   private String containerName = null;
   private int partitionCount = 0;
-  private boolean isApplicationMaster;
   private Serde<LogEvent> serde = null;
 
-  private Thread transferThread;
+  private volatile Thread transferThread;
   private Config config = null;
   private String streamName = null;
   private final boolean usingAsyncLogger;
+  private final LoggingContextHolder loggingContextHolder;
 
   /**
    * used to detect if this thread is called recursively
    */
   private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
 
   protected static final int DEFAULT_QUEUE_SIZE = 100;
-  protected static volatile boolean systemInitialized = false;
+  protected volatile boolean systemInitialized = false;
   protected StreamAppenderMetrics metrics;
   protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
 
+  /**
+   * Constructor is protected so that this class can be extended.
+   */
   protected StreamAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions,
-      boolean usingAsyncLogger, String streamName) {
+      boolean usingAsyncLogger, String streamName, LoggingContextHolder loggingContextHolder) {

Review comment:
       It is being passed in so that testing is easier (allows for `LoggingContextHolder` to be more easily mocked in testing).
   In log4j, there was no explicit constructor, and I didn't add one just in case log4j made assumptions about needing an empty constructor. The log4j `TestStreamAppender` already had some other way of stubbing the config access, so I left that one as-is. log4j is an older code flow, so I wanted to make fewer changes to that. For log4j2, adding this argument allowed for cleaner and more complete testing.
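
   A rough sketch of why passing the holder through the constructor helps testing. `ContextHolderSketch` and `InjectedAppenderSketch` are hypothetical, simplified stand-ins (the real `LoggingContextHolder` and `StreamAppender` have different members); a plain map stands in for the config:

```java
import java.util.Map;

// Hypothetical, simplified stand-in for LoggingContextHolder.
class ContextHolderSketch {
  private volatile Map<String, String> config;

  void setConfig(Map<String, String> config) {
    this.config = config;
  }

  // Returns null until a config has been filled in.
  Map<String, String> getConfig() {
    return config;
  }
}

// Hypothetical appender that receives its holder instead of reaching for a global.
class InjectedAppenderSketch {
  private final ContextHolderSketch holder;

  // Protected so that subclasses (and test subclasses) can construct it.
  protected InjectedAppenderSketch(ContextHolderSketch holder) {
    this.holder = holder;
  }

  // The appender can only finish setting up once the holder has a config.
  boolean isConfigAvailable() {
    return holder.getConfig() != null;
  }
}
```

   A test can then hand in a stub or mock holder that returns a fixed config (or none at all) without touching any JVM-wide state.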

##########
File path: samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
##########
@@ -74,15 +67,14 @@
   protected static final int DEFAULT_QUEUE_SIZE = 100;
   private static final long DEFAULT_QUEUE_TIMEOUT_S = 2; // Arbitrary choice
 
-  protected static volatile boolean systemInitialized = false;

Review comment:
       I removed `static` to make sure that each instance of `StreamAppender` does its own initialization. Before this PR, the static flag actually caused a bug in which only the first `StreamAppender` instance in the JVM would get initialized; no other instance would ever be initialized.
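
   To illustrate the failure mode, here is a small sketch with hypothetical classes (not the real `StreamAppender`): with a static flag, the first instance flips it and every later instance skips its own setup, while an instance flag lets each one initialize.

```java
// Hypothetical appender using a JVM-wide (static) initialization flag.
class StaticFlagAppenderSketch {
  static volatile boolean systemInitialized = false;
  boolean ready = false; // per-instance setup result

  void append() {
    if (!systemInitialized) {
      ready = true;             // stands in for per-instance setup work
      systemInitialized = true; // shared flag: blocks setup in all other instances
    }
  }
}

// Hypothetical appender using a per-instance initialization flag.
class InstanceFlagAppenderSketch {
  private volatile boolean systemInitialized = false;
  boolean ready = false;

  void append() {
    if (!systemInitialized) {
      ready = true;
      systemInitialized = true; // affects only this instance
    }
  }
}
```

   Calling `append()` on two `StaticFlagAppenderSketch` objects leaves the second with `ready == false`; the same sequence on two `InstanceFlagAppenderSketch` objects leaves both ready.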



