You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/09/20 23:02:39 UTC

[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

lakshmi-manasa-g commented on a change in pull request #1532:
URL: https://github.com/apache/samza/pull/1532#discussion_r712524670



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -490,6 +455,17 @@ protected void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName)
     }
   }
 
+  /**
+   * If the partition count was explicitly specified, then use that. Otherwise, use the container count as the partition
+   * count.
+   */
+  private int calculateStreamPartitionCount(Config config) {

Review comment:
       calculateStreamPartitionCount does what the getPartitionCount was doing earlier.
   and the stream created is now using calculateStreamPartition count (falling back to container count). 
   why not use the getPartitionCount itself?

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -89,38 +78,40 @@
   private byte[] keyBytes; // Serialize the key once, since we will use it for every event.
   private String containerName = null;
   private int partitionCount = 0;
-  private boolean isApplicationMaster;
   private Serde<LogEvent> serde = null;
 
-  private Thread transferThread;
+  private volatile Thread transferThread;
   private Config config = null;
   private String streamName = null;
   private final boolean usingAsyncLogger;
+  private final LoggingContextHolder loggingContextHolder;
 
   /**
    * used to detect if this thread is called recursively
    */
   private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
 
   protected static final int DEFAULT_QUEUE_SIZE = 100;
-  protected static volatile boolean systemInitialized = false;
+  protected volatile boolean systemInitialized = false;
   protected StreamAppenderMetrics metrics;
   protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
 
+  /**
+   * Constructor is protected so that this class can be extended.
+   */
   protected StreamAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions,
-      boolean usingAsyncLogger, String streamName) {
+      boolean usingAsyncLogger, String streamName, LoggingContextHolder loggingContextHolder) {

Review comment:
       why do we need LoggingContextHolder to be passed in as a constructor param?
   why in log4j2 but not in log4j?
   since LoggingContextHolder is a singleton why not use it the same way as in log4j

##########
File path: samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
##########
@@ -74,15 +67,14 @@
   protected static final int DEFAULT_QUEUE_SIZE = 100;
   private static final long DEFAULT_QUEUE_TIMEOUT_S = 2; // Abitrary choice
 
-  protected static volatile boolean systemInitialized = false;

Review comment:
       are we removing static because there will always be only one StreamAppender object because of the way log4j2 wires it in?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org