Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/09/16 20:24:18 UTC

[GitHub] [samza] cameronlee314 opened a new pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

cameronlee314 opened a new pull request #1532:
URL: https://github.com/apache/samza/pull/1532


   Issues:
   `StreamAppender` is coupled with Samza being run in YARN:
   1. `StreamAppender` specifically checks for `isApplicationMaster`, which is a YARN-only term, and it has different initialization logic if it is or isn't in the application master.
   2. `StreamAppender` is coupled with `JobModelManager`, which is only used in YARN.
   3. `StreamAppender` can only get configs from a specific HTTP coordinator endpoint, so it can't be used in other setups.
   To run in Kubernetes (or in other environments in general), this class needs to be generalized.
   
   Changes:
   1. Add a `LoggingContextHolder` which provides general access to config for log appenders regardless of container type. This is still a static class, similar to how appenders were already using the static `JobModelManager.currentJobModelManager`. `StreamAppender` gets config from this `LoggingContextHolder`; it no longer reads the config from `JobModelManager` or an HTTP coordinator endpoint. `JobModelManager` and `ContainerLaunchUtil` were updated to set the config in the `LoggingContextHolder`.
   2. Update `StreamAppender` (log4j, log4j2) to have a consistent flow for checking when it can initialize itself, instead of checking application master vs. worker.
   3. The refactoring includes a fix for a bug in which the first log event after system initialization was not sent to the log stream.
   4. `StreamAppender.getPartitionCount` only returns the overridden partition count value instead of falling back to "job.container.count". This was done to prevent access to the config before it was available, and this is more consistent with the log4j getter pattern for configured values.
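A minimal sketch of changes 1 to 3, assuming a simplified holder and appender. `SketchConfig`, `SketchLoggingContextHolder`, and `SketchStreamAppender` are hypothetical stand-ins written for illustration, not the classes in this PR; in particular, the "first config wins" behavior of `setConfig` is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for org.apache.samza.config.Config so the sketch compiles on its own.
final class SketchConfig {
  private final Map<String, String> values;

  SketchConfig(Map<String, String> values) {
    this.values = Collections.unmodifiableMap(new HashMap<>(values));
  }

  String get(String key) {
    return values.get(key);
  }
}

// Holds the config for log appenders, independent of the container type.
final class SketchLoggingContextHolder {
  static final SketchLoggingContextHolder INSTANCE = new SketchLoggingContextHolder();

  private final AtomicReference<SketchConfig> config = new AtomicReference<>();

  // Package-private so that tests can create isolated instances.
  SketchLoggingContextHolder() {
  }

  // Assumption: the first config wins. Returns true only if this call stored the value.
  boolean setConfig(SketchConfig newConfig) {
    return config.compareAndSet(null, newConfig);
  }

  // Returns null until setConfig has been called.
  SketchConfig getConfig() {
    return config.get();
  }
}

// Appender with a single initialization path: it becomes ready once the holder has a config.
final class SketchStreamAppender {
  private final SketchLoggingContextHolder holder;
  private final List<String> sent = new ArrayList<>();
  private boolean systemInitialized = false;

  SketchStreamAppender(SketchLoggingContextHolder holder) {
    this.holder = holder;
  }

  synchronized void append(String message) {
    if (!systemInitialized) {
      if (holder.getConfig() == null) {
        return; // config not available yet, so this event is not handled
      }
      systemInitialized = true; // a real appender would set up its producer here
    }
    // The event that triggers initialization is sent as well, not dropped.
    sent.add(message);
  }

  synchronized List<String> sentMessages() {
    return new ArrayList<>(sent);
  }
}
```

The same check runs in every process, so the appender does not need to know whether it is in an application master or a worker.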
   
   Tests:
   1. Updated unit tests
   2. Ran integration tests
   
   API/usage changes:
   1. Constructor for `org.apache.samza.logging.log4j2.StreamAppender` has an extra argument for `LoggingContextHolder`. Any classes which extend this need to be updated to pass the extra argument. Creation of `org.apache.samza.logging.log4j2.StreamAppender` in `log4j2.xml` does not change.
   2. `StreamAppender.getPartitionCount` (for both log4j and log4j2) does not fall back to reading "job.container.count". Any classes which use that method and rely on "job.container.count" as a fallback need to be updated to read that config on their own.
   3. In worker containers, logs that happen before the call to `LoggingContextHolder.INSTANCE.setConfig` in `ContainerLaunchUtil` will no longer be handled by `StreamAppender`. Few logs are impacted by this.
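For item 2, a caller that still wants the old fallback could resolve it on its own. The following is a sketch under stated assumptions: `PartitionCountSketch` and `resolvePartitionCount` are hypothetical names, a plain `Map` stands in for the Samza config, a value of 0 is taken to mean "no override", and the default of 1 when "job.container.count" is absent is an assumption.

```java
import java.util.Map;

final class PartitionCountSketch {
  static final String CONTAINER_COUNT_KEY = "job.container.count";

  // overriddenPartitionCount is the value returned by the appender's getter.
  // Assumption: 0 (or less) means that no override was configured.
  static int resolvePartitionCount(int overriddenPartitionCount, Map<String, String> config) {
    if (overriddenPartitionCount > 0) {
      return overriddenPartitionCount;
    }
    String containerCount = config.get(CONTAINER_COUNT_KEY);
    // Assumption: fall back to a single partition when the container count is not set.
    return containerCount == null ? 1 : Integer.parseInt(containerCount);
  }
}
```

The caller reads the config only after it knows the config is available, which is the ordering constraint the getter can no longer hide.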


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] cameronlee314 commented on a change in pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1532:
URL: https://github.com/apache/samza/pull/1532#discussion_r715181067



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -152,18 +144,19 @@ public String getStreamName() {
   }
 
   /**
-   * Getter for the Config parameter.
+   * This should only be called after verifying that the {@link LoggingContextHolder} has the config.

Review comment:
       This usage pattern constraint actually already exists in the current implementation. In the AM, it is already not guaranteed that the config is available. I was just adding some documentation to try to help clarify that now.
   Unfortunately, to improve this, we would need to change the semantics of some other parts of this class. I actually did make some other changes in the initial iteration, but @lakshmi-manasa-g made a good point about maintaining compatibility within the scope of this PR. There is more discussion of this at https://github.com/apache/samza/pull/1532#discussion_r714217467.
   Ideally, there would have been one fetch to the config and then that would have been passed around as a local argument instead of through an instance variable. That would have allowed better enforcement of the availability of the config. For now, this continues to need to be manually checked for ordering of method calls.
   
   Edit: I didn't realize that @lakshmi-manasa-g had responded while I was writing up this comment, so there might be some overlap in our responses.







[GitHub] [samza] alnzng commented on a change in pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

Posted by GitBox <gi...@apache.org>.
alnzng commented on a change in pull request #1532:
URL: https://github.com/apache/samza/pull/1532#discussion_r715189993



##########
File path: samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
##########
@@ -50,18 +51,22 @@ public static void main(String[] args) throws Exception {
         }));
 
     String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID);
-    log.info(String.format("Got container ID: %s", containerId));
     System.out.println(String.format("Container ID: %s", containerId));
 
     String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
-    log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
     System.out.println(String.format("Coordinator URL: %s", coordinatorUrl));
 
     Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
 
     int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
     JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
     Config config = jobModel.getConfig();
+
+    // this call is also in ContainerLaunchUtil, but adding this here allows more logs to get handled by Samza loggers
+    LoggingContextHolder.INSTANCE.setConfig(config);
+    log.info(String.format("Got container ID: %s", containerId));
+    log.info(String.format("Got coordinator URL: %s", coordinatorUrl));

Review comment:
       I think leveraging the logger's parameterized message format is better than using `String.format`:
   ```java
   log.info("Got container ID: {}", containerId);
   log.info("Got coordinator URL: {}", coordinatorUrl);
   ```
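One reason parameterized messages are usually preferred is that the message is only built when the level is enabled. A self-contained sketch of that idea follows; `MiniLogger` is a hypothetical stand-in written for illustration, not the logging library's API.

```java
// Hypothetical minimal logger that mimics "{}" placeholders, for illustration only.
final class MiniLogger {
  private final boolean infoEnabled;
  private int formatCalls = 0;
  private String lastMessage = null;

  MiniLogger(boolean infoEnabled) {
    this.infoEnabled = infoEnabled;
  }

  void info(String pattern, Object... args) {
    if (!infoEnabled) {
      return; // no string is built when the level is disabled
    }
    formatCalls++;
    lastMessage = format(pattern, args);
  }

  // Replaces each "{}" with the next argument, left to right.
  static String format(String pattern, Object... args) {
    StringBuilder out = new StringBuilder();
    int argIndex = 0;
    int from = 0;
    int at;
    while (argIndex < args.length && (at = pattern.indexOf("{}", from)) >= 0) {
      out.append(pattern, from, at).append(args[argIndex++]);
      from = at + 2;
    }
    out.append(pattern.substring(from));
    return out.toString();
  }

  int formatCalls() {
    return formatCalls;
  }

  String lastMessage() {
    return lastMessage;
  }
}
```

With `String.format`, the formatted string is created before the logger is called, whether or not the event is logged.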







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1532:
URL: https://github.com/apache/samza/pull/1532#discussion_r712524670



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -490,6 +455,17 @@ protected void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName)
     }
   }
 
+  /**
+   * If the partition count was explicitly specified, then use that. Otherwise, use the container count as the partition
+   * count.
+   */
+  private int calculateStreamPartitionCount(Config config) {

Review comment:
       `calculateStreamPartitionCount` does what `getPartitionCount` was doing earlier,
   and the stream is now created using `calculateStreamPartitionCount` (falling back to container count).
   why not use `getPartitionCount` itself?

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -89,38 +78,40 @@
   private byte[] keyBytes; // Serialize the key once, since we will use it for every event.
   private String containerName = null;
   private int partitionCount = 0;
-  private boolean isApplicationMaster;
   private Serde<LogEvent> serde = null;
 
-  private Thread transferThread;
+  private volatile Thread transferThread;
   private Config config = null;
   private String streamName = null;
   private final boolean usingAsyncLogger;
+  private final LoggingContextHolder loggingContextHolder;
 
   /**
    * used to detect if this thread is called recursively
    */
   private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
 
   protected static final int DEFAULT_QUEUE_SIZE = 100;
-  protected static volatile boolean systemInitialized = false;
+  protected volatile boolean systemInitialized = false;
   protected StreamAppenderMetrics metrics;
   protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
 
+  /**
+   * Constructor is protected so that this class can be extended.
+   */
   protected StreamAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions,
-      boolean usingAsyncLogger, String streamName) {
+      boolean usingAsyncLogger, String streamName, LoggingContextHolder loggingContextHolder) {

Review comment:
       why do we need `LoggingContextHolder` to be passed in as a constructor param?
   why in log4j2 but not in log4j?
   since `LoggingContextHolder` is a singleton, why not use it the same way as in log4j?

##########
File path: samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
##########
@@ -74,15 +67,14 @@
   protected static final int DEFAULT_QUEUE_SIZE = 100;
   private static final long DEFAULT_QUEUE_TIMEOUT_S = 2; // Abitrary choice
 
-  protected static volatile boolean systemInitialized = false;

Review comment:
       are we removing static because there will always be only one StreamAppender object because of the way log4j2 wires it in?







[GitHub] [samza] lakshmi-manasa-g commented on pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on pull request #1532:
URL: https://github.com/apache/samza/pull/1532#issuecomment-923432123


   > > why were log4j2 tests deleted?
   > 
   > Could you please clarify which deleted tests you are referring to? I don't think there should be anything deleted (just refactored).
   
   sorry, my bad. found it. missed it because of the large diff following a deleted file.





[GitHub] [samza] cameronlee314 commented on pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on pull request #1532:
URL: https://github.com/apache/samza/pull/1532#issuecomment-925353151


   > also for - " logs that happen before the call to LoggingContextHolder.INSTANCE.setConfig in ContainerLaunchUtil will no longer be handled by StreamAppender. There are few logs that are impacted by this."
   > are there logs generated within samza that will be not sent to StreamAppender?
   
   Yes, `LocalContainerRunner` has a couple of logs that would be missed by `StreamAppender`. Also, if there is an external caller of `ContainerLaunchUtil.run` (e.g. Beam), then any logs from that external caller which happen before `ContainerLaunchUtil.run` is called will be missed. For `LocalContainerRunner`, I can move those logs to be after `ContainerLaunchUtil.run`. External callers can call `LoggingContextHolder.INSTANCE.setConfig` explicitly if they need logging to be available sooner. I will update this in the PR description.





[GitHub] [samza] mynameborat commented on a change in pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1532:
URL: https://github.com/apache/samza/pull/1532#discussion_r715125388



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -152,18 +144,19 @@ public String getStreamName() {
   }
 
   /**
-   * Getter for the Config parameter.
+   * This should only be called after verifying that the {@link LoggingContextHolder} has the config.

Review comment:
       How do we ensure/enforce that?

##########
File path: samza-core/src/main/java/org/apache/samza/logging/LoggingContextHolder.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.logging;
+
+import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Holds information to be used by loggers. For example, some custom Samza log4j/log4j2 logging appenders need system
+ * configs for initialization, so this allows the configs to be passed to those appenders.
+ */
+public class LoggingContextHolder {
+  private static final Logger LOG = LoggerFactory.getLogger(LoggingContextHolder.class);
+  public static final LoggingContextHolder INSTANCE = new LoggingContextHolder();
+
+  private final AtomicReference<Config> config = new AtomicReference<>();
+
+  @VisibleForTesting
+  LoggingContextHolder() {
+  }

Review comment:
       If the intent is to only use the singleton, this pattern would still let classes in the same package break the assumption. 
   
   Thoughts on having reset/clear method if you want to reuse the instance in the test? Not sure which option w/ tradeoffs fits better here.







[GitHub] [samza] lakshmi-manasa-g commented on pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on pull request #1532:
URL: https://github.com/apache/samza/pull/1532#issuecomment-923416292


   why were log4j2 tests deleted?





[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1532:
URL: https://github.com/apache/samza/pull/1532#discussion_r714210883



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -89,38 +78,40 @@
   private byte[] keyBytes; // Serialize the key once, since we will use it for every event.
   private String containerName = null;
   private int partitionCount = 0;
-  private boolean isApplicationMaster;
   private Serde<LogEvent> serde = null;
 
-  private Thread transferThread;
+  private volatile Thread transferThread;
   private Config config = null;
   private String streamName = null;
   private final boolean usingAsyncLogger;
+  private final LoggingContextHolder loggingContextHolder;
 
   /**
    * used to detect if this thread is called recursively
    */
   private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
 
   protected static final int DEFAULT_QUEUE_SIZE = 100;
-  protected static volatile boolean systemInitialized = false;
+  protected volatile boolean systemInitialized = false;
   protected StreamAppenderMetrics metrics;
   protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
 
+  /**
+   * Constructor is protected so that this class can be extended.
+   */
   protected StreamAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions,
-      boolean usingAsyncLogger, String streamName) {
+      boolean usingAsyncLogger, String streamName, LoggingContextHolder loggingContextHolder) {

Review comment:
       ah i see. thanks for explaining why not log4j and the need for this extra argument.
   however, we are making a breaking change (needing all extended classes to update) for making testing easier. 
   i feel we could add another constructor for testing and avoid breaking changes.

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -490,6 +455,17 @@ protected void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName)
     }
   }
 
+  /**
+   * If the partition count was explicitly specified, then use that. Otherwise, use the container count as the partition
+   * count.
+   */
+  private int calculateStreamPartitionCount(Config config) {

Review comment:
       since this is also a breaking change, i want to dig a bit more into this.
   so earlier `getPartitionCount` used getConfig(), which fetched the config when config became available (different conditions for AM vs container). So even with this earlier flow, if getPartitionCount was ever called before config became available then there would be a problem. caller of getPartitionCount has already ensured that this call is made only after config becomes available. 
   
   i am seeing LoggingContextHolder as a way to avoid the separate checks for AM and container and even avoid calling them isAM and make it cleaner. it is not going to make config available sooner than before. given this, it's more a refactor for cleaner and more readable code than a logic change. Hence, i am uncomfortable making breaking changes and would like to avoid them if possible. 
   
   







[GitHub] [samza] cameronlee314 commented on a change in pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1532:
URL: https://github.com/apache/samza/pull/1532#discussion_r714328789



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -89,38 +78,40 @@
   private byte[] keyBytes; // Serialize the key once, since we will use it for every event.
   private String containerName = null;
   private int partitionCount = 0;
-  private boolean isApplicationMaster;
   private Serde<LogEvent> serde = null;
 
-  private Thread transferThread;
+  private volatile Thread transferThread;
   private Config config = null;
   private String streamName = null;
   private final boolean usingAsyncLogger;
+  private final LoggingContextHolder loggingContextHolder;
 
   /**
    * used to detect if this thread is called recursively
    */
   private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
 
   protected static final int DEFAULT_QUEUE_SIZE = 100;
-  protected static volatile boolean systemInitialized = false;
+  protected volatile boolean systemInitialized = false;
   protected StreamAppenderMetrics metrics;
   protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
 
+  /**
+   * Constructor is protected so that this class can be extended.
+   */
   protected StreamAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions,
-      boolean usingAsyncLogger, String streamName) {
+      boolean usingAsyncLogger, String streamName, LoggingContextHolder loggingContextHolder) {

Review comment:
       Good point. I was thinking that unit tests would need to be updated for extenders of this class, so might as well require extenders to update the constructor too, but it makes more sense to leave a backwards compat constructor. I will update this.
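A sketch of what a backwards-compatible constructor could look like, assuming the old overload simply delegates with the singleton. `HolderStub`, `AppenderSketch`, and `LegacyAppender` are hypothetical names used for illustration, and the parameter lists are shortened compared with the real `StreamAppender`.

```java
// Hypothetical stand-in for LoggingContextHolder.
final class HolderStub {
  static final HolderStub INSTANCE = new HolderStub();

  HolderStub() {
  }
}

class AppenderSketch {
  private final String name;
  private final String streamName;
  private final HolderStub holder;

  // Existing signature is kept, so classes that extend this one still compile.
  protected AppenderSketch(String name, String streamName) {
    this(name, streamName, HolderStub.INSTANCE);
  }

  // New overload: lets tests pass in their own holder instead of the singleton.
  protected AppenderSketch(String name, String streamName, HolderStub holder) {
    this.name = name;
    this.streamName = streamName;
    this.holder = holder;
  }

  HolderStub holder() {
    return holder;
  }
}

// A subclass written against the old constructor needs no change.
class LegacyAppender extends AppenderSketch {
  LegacyAppender(String streamName) {
    super("legacy", streamName);
  }
}
```

Existing subclasses keep using the singleton implicitly, while tests can opt in to an isolated holder through the new overload.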

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -490,6 +455,17 @@ protected void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName)
     }
   }
 
+  /**
+   * If the partition count was explicitly specified, then use that. Otherwise, use the container count as the partition
+   * count.
+   */
+  private int calculateStreamPartitionCount(Config config) {

Review comment:
       Good point that callers of `getPartitionCount` already needed to make sure config was available. I will change this back.







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1532:
URL: https://github.com/apache/samza/pull/1532#discussion_r715079313



##########
File path: samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
##########
@@ -408,4 +393,15 @@ private void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName, St
   public Serde<LoggingEvent> getSerde() {
     return serde;
   }
+
+  /**
+   * If the partition count was explicitly specified, then use that. Otherwise, use the container count as the partition
+   * count.
+   */
+  private int calculateStreamPartitionCount(Config config) {

Review comment:
       nit: since `getPartitionCount` is restored, we don't need this, right?

##########
File path: samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
##########
@@ -50,18 +51,22 @@ public static void main(String[] args) throws Exception {
         }));
 
     String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID);
-    log.info(String.format("Got container ID: %s", containerId));
     System.out.println(String.format("Container ID: %s", containerId));
 
     String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
-    log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
     System.out.println(String.format("Coordinator URL: %s", coordinatorUrl));
 
     Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
 
     int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
     JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
     Config config = jobModel.getConfig();
+
+    // this call is also in LocalContainerRunner, but adding this here allows more logs to get handled by Samza loggers

Review comment:
       nit: was this meant to be "this call is also in ContainerLaunchUtil"?







[GitHub] [samza] cameronlee314 commented on a change in pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1532:
URL: https://github.com/apache/samza/pull/1532#discussion_r715181067



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -152,18 +144,19 @@ public String getStreamName() {
   }
 
   /**
-   * Getter for the Config parameter.
+   * This should only be called after verifying that the {@link LoggingContextHolder} has the config.

Review comment:
       This usage pattern constraint actually already exists in the current implementation. In the AM, it is already not guaranteed that the config is available. I was just adding some documentation to try to help clarify that now.
   Unfortunately, to improve this, we would need to change the semantics of some other parts of this class. I actually did make some other changes in the initial iteration, but @lakshmi-manasa-g made a good point about maintaining compatibility within the scope of this PR. There is more discussion of this at https://github.com/apache/samza/pull/1532#discussion_r714217467.
   Ideally, there would have been one fetch to the config and then that would have been passed around as a local argument instead of through an instance variable. That would have allowed better enforcement of the availability of the config. For now, this continues to need to be manually checked for ordering of method calls.
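A sketch of the "one fetch, then pass it around" idea described above. It is not what this PR implements; `ConfigPassingSketch`, `trySetup`, and `describeLogStream` are hypothetical names, a plain `Map` stands in for the Samza config, and the returned string format is made up.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

final class ConfigPassingSketch {
  // Fetches the config once; everything downstream receives it as an argument.
  static Optional<String> trySetup(Supplier<Map<String, String>> configSource) {
    Map<String, String> config = configSource.get();
    if (config == null) {
      return Optional.empty(); // not available yet; the caller can retry later
    }
    return Optional.of(describeLogStream(config));
  }

  // Takes the config as a parameter, so it cannot be called before the config exists.
  private static String describeLogStream(Map<String, String> config) {
    String jobName = config.getOrDefault("job.name", "unknown-job");
    String jobId = config.getOrDefault("job.id", "1");
    return "logs for " + jobName + "-" + jobId; // made-up format, for illustration only
  }
}
```

Because helpers require the config as a parameter, the ordering of calls is enforced by the method signatures instead of by documentation.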

##########
File path: samza-core/src/main/java/org/apache/samza/logging/LoggingContextHolder.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.logging;
+
+import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Holds information to be used by loggers. For example, some custom Samza log4j/log4j2 logging appenders need system
+ * configs for initialization, so this allows the configs to be passed to those appenders.
+ */
+public class LoggingContextHolder {
+  private static final Logger LOG = LoggerFactory.getLogger(LoggingContextHolder.class);
+  public static final LoggingContextHolder INSTANCE = new LoggingContextHolder();
+
+  private final AtomicReference<Config> config = new AtomicReference<>();
+
+  @VisibleForTesting
+  LoggingContextHolder() {
+  }

Review comment:
       Whenever possible, I prefer to avoid using statics in tests. If the statics are not properly reset (and it's not always obvious that a static needs to be reset unless you look at the class), then other tests could be impacted, and that can lead to hard-to-debug failures. For example, a test using a static might pass when run on its own, but then if it runs as part of a full build, it might fail since the static wasn't reset properly from some other test.
   I don't think it is too much of a problem that we can't exactly strictly enforce the expected usage. Package-private gets us pretty far, and even if someone abuses the package privacy and uses their own `LoggingContextHolder`, there isn't really anything useful to get out of that. Also, `@VisibleForTesting` documents this as "test only".
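A sketch of the test-isolation argument, assuming a simplified holder. `IsolatedHolder` is a hypothetical stand-in that stores a `String` instead of a Samza config, and its set-once behavior is an assumption.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a holder with a package-private constructor.
final class IsolatedHolder {
  static final IsolatedHolder INSTANCE = new IsolatedHolder();

  private final AtomicReference<String> config = new AtomicReference<>();

  // Package-private: production code uses INSTANCE, tests create their own copy.
  IsolatedHolder() {
  }

  // Assumption: only the first value is stored.
  void setConfig(String value) {
    config.compareAndSet(null, value);
  }

  String getConfig() {
    return config.get();
  }
}

final class IsolationDemo {
  // Each "test" builds a fresh holder, so nothing needs to be reset afterwards.
  static String runTestWith(String value) {
    IsolatedHolder holder = new IsolatedHolder();
    holder.setConfig(value);
    return holder.getConfig();
  }
}
```

Since no test touches `INSTANCE`, the order in which tests run cannot change their results.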







[GitHub] [samza] cameronlee314 merged pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

Posted by GitBox <gi...@apache.org>.
cameronlee314 merged pull request #1532:
URL: https://github.com/apache/samza/pull/1532


   





[GitHub] [samza] cameronlee314 commented on pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on pull request #1532:
URL: https://github.com/apache/samza/pull/1532#issuecomment-923418651


   > why were log4j2 tests deleted?
   
   Could you please clarify which deleted tests you are referring to? I don't think there should be anything deleted (just refactored).





[GitHub] [samza] cameronlee314 commented on a change in pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1532:
URL: https://github.com/apache/samza/pull/1532#discussion_r715087824



##########
File path: samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
##########
@@ -50,18 +51,22 @@ public static void main(String[] args) throws Exception {
         }));
 
     String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID);
-    log.info(String.format("Got container ID: %s", containerId));
     System.out.println(String.format("Container ID: %s", containerId));
 
     String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
-    log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
     System.out.println(String.format("Coordinator URL: %s", coordinatorUrl));
 
     Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
 
     int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
     JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
     Config config = jobModel.getConfig();
+
+    // this call is also in LocalContainerRunner, but adding this here allows more logs to get handled by Samza loggers

Review comment:
       Thanks!

##########
File path: samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
##########
@@ -408,4 +393,15 @@ private void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName, St
   public Serde<LoggingEvent> getSerde() {
     return serde;
   }
+
+  /**
+   * If the partition count was explicitly specified, then use that. Otherwise, use the container count as the partition
+   * count.
+   */
+  private int calculateStreamPartitionCount(Config config) {

Review comment:
       Nice catch!







[GitHub] [samza] cameronlee314 commented on a change in pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1532:
URL: https://github.com/apache/samza/pull/1532#discussion_r712619178



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -490,6 +455,17 @@ protected void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName)
     }
   }
 
+  /**
+   * If the partition count was explicitly specified, then use that. Otherwise, use the container count as the partition
+   * count.
+   */
+  private int calculateStreamPartitionCount(Config config) {

Review comment:
       `getPartitionCount` assumes that the instance variable `config` is available, but `config` isn't actually guaranteed to be available until `LoggingContextHolder` has a config filled in, so this could result in the caller accidentally using `getPartitionCount` too early. `calculateStreamPartitionCount` accepts `config` as an argument, which forces the callers of that method to ensure `config` is available.
   I didn't want to change the `getPartitionCount` signature just in case log4j/log4j2 still needed the public getter. Also, based on the javadoc for `getPartitionCount`, it is tied to some log4j/log4j2 configuration, so it seemed like it would be reasonable to have it only consider the configured value.
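
The difference between the two methods can be sketched as follows. This is a minimal illustration, not the code in the PR: `PartitionCountSketch` is a hypothetical stand-in for the appender, a `Map<String, String>` replaces Samza's `Config`, and the fallback default of "1" is an assumption. The key "job.container.count" is the one named in the PR description.

```java
import java.util.Map;

// Hypothetical stand-in for the appender; a plain Map replaces Samza's Config.
final class PartitionCountSketch {
  // Value set through the log4j/log4j2 configuration; 0 means "not overridden".
  private int partitionCount = 0;

  void setPartitionCount(int partitionCount) {
    this.partitionCount = partitionCount;
  }

  // Getter: reports only the configured override and never touches config.
  int getPartitionCount() {
    return partitionCount;
  }

  // Taking config as a parameter forces the caller to have obtained it first.
  int calculateStreamPartitionCount(Map<String, String> config) {
    if (partitionCount > 0) {
      return partitionCount;
    }
    // Assumed default of "1" when the container count is absent.
    return Integer.parseInt(config.getOrDefault("job.container.count", "1"));
  }

  public static void main(String[] args) {
    PartitionCountSketch appender = new PartitionCountSketch();
    Map<String, String> config = Map.of("job.container.count", "4");
    System.out.println(appender.getPartitionCount());
    System.out.println(appender.calculateStreamPartitionCount(config));
  }
}
```

Because the fallback lives in a method that requires `config`, a caller cannot compile a call to it without already holding a config object, whereas the getter stays safe to call at any time.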

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -89,38 +78,40 @@
   private byte[] keyBytes; // Serialize the key once, since we will use it for every event.
   private String containerName = null;
   private int partitionCount = 0;
-  private boolean isApplicationMaster;
   private Serde<LogEvent> serde = null;
 
-  private Thread transferThread;
+  private volatile Thread transferThread;
   private Config config = null;
   private String streamName = null;
   private final boolean usingAsyncLogger;
+  private final LoggingContextHolder loggingContextHolder;
 
   /**
    * used to detect if this thread is called recursively
    */
   private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
 
   protected static final int DEFAULT_QUEUE_SIZE = 100;
-  protected static volatile boolean systemInitialized = false;
+  protected volatile boolean systemInitialized = false;
   protected StreamAppenderMetrics metrics;
   protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
 
+  /**
+   * Constructor is protected so that this class can be extended.
+   */
   protected StreamAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions,
-      boolean usingAsyncLogger, String streamName) {
+      boolean usingAsyncLogger, String streamName, LoggingContextHolder loggingContextHolder) {

Review comment:
       It is being passed in so that testing is easier (allows for `LoggingContextHolder` to be more easily mocked in testing).
   In log4j, there was no explicit constructor, and I didn't add one just in case log4j made assumptions about needing an empty constructor. The log4j `TestStreamAppender` already had some other way of stubbing the config access, so I left that one as-is. log4j is an older code flow, so I wanted to make fewer changes to that. For log4j2, adding this argument allowed for cleaner and more complete testing.
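
A small sketch of why constructor injection helps testing is shown below. All names here (`InjectionSketch`, `ContextHolder`, `Appender`, `configAvailable`) are hypothetical, and a `Map<String, String>` stands in for Samza's `Config`; the real log4j2 `StreamAppender` constructor takes more arguments.

```java
import java.util.Map;

final class InjectionSketch {
  // Hypothetical minimal view of the holder: returns null until config is set.
  interface ContextHolder {
    Map<String, String> getConfig();
  }

  static class Appender {
    private final ContextHolder holder;

    // The holder is passed in, so a test can supply a stub instead of the
    // process-wide static instance.
    protected Appender(ContextHolder holder) {
      this.holder = holder;
    }

    boolean configAvailable() {
      return holder.getConfig() != null;
    }
  }

  public static void main(String[] args) {
    // Stub holders written as lambdas; no static state needs to be reset.
    Appender notReady = new Appender(() -> null);
    Appender ready = new Appender(() -> Map.of("job.name", "test-job"));
    System.out.println(notReady.configAvailable());
    System.out.println(ready.configAvailable());
  }
}
```

With a no-argument constructor, as in the log4j appender, the same test would have to stub config access some other way, which matches the comment above about leaving the log4j test as-is.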

##########
File path: samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
##########
@@ -74,15 +67,14 @@
   protected static final int DEFAULT_QUEUE_SIZE = 100;
   private static final long DEFAULT_QUEUE_TIMEOUT_S = 2; // Abitrary choice
 
-  protected static volatile boolean systemInitialized = false;

Review comment:
       I removed static to make sure that each instance of `StreamAppender` does its own initialization. Before this PR, the static flag actually caused a bug in which only the first instance of the `StreamAppender` in the JVM would get initialized. No other instances would get initialized.
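
The effect of the static flag can be reproduced in a few lines. This is a simplified sketch with hypothetical class names, not the appender code itself; `didSetup` exists only to make the outcome observable.

```java
final class InitFlagSketch {
  // Before: one flag shared by every instance in the JVM.
  static class StaticFlagAppender {
    static volatile boolean systemInitialized = false;
    boolean didSetup = false;

    void append() {
      if (!systemInitialized) {
        didSetup = true;          // stands in for the real per-instance setup
        systemInitialized = true; // blocks setup for all other instances too
      }
    }
  }

  // After: each instance tracks its own initialization.
  static class InstanceFlagAppender {
    volatile boolean systemInitialized = false;
    boolean didSetup = false;

    void append() {
      if (!systemInitialized) {
        didSetup = true;
        systemInitialized = true;
      }
    }
  }

  public static void main(String[] args) {
    StaticFlagAppender s1 = new StaticFlagAppender();
    StaticFlagAppender s2 = new StaticFlagAppender();
    s1.append();
    s2.append();
    System.out.println("static flag:   " + s1.didSetup + ", " + s2.didSetup);

    InstanceFlagAppender i1 = new InstanceFlagAppender();
    InstanceFlagAppender i2 = new InstanceFlagAppender();
    i1.append();
    i2.append();
    System.out.println("instance flag: " + i1.didSetup + ", " + i2.didSetup);
  }
}
```

With the static flag, the second appender never runs its setup because the first one already flipped the shared flag; with the instance flag, both do.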







[GitHub] [samza] cameronlee314 commented on a change in pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1532:
URL: https://github.com/apache/samza/pull/1532#discussion_r715202333



##########
File path: samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
##########
@@ -50,18 +51,22 @@ public static void main(String[] args) throws Exception {
         }));
 
     String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID);
-    log.info(String.format("Got container ID: %s", containerId));
     System.out.println(String.format("Container ID: %s", containerId));
 
     String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
-    log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
     System.out.println(String.format("Coordinator URL: %s", coordinatorUrl));
 
     Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
 
     int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
     JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
     Config config = jobModel.getConfig();
+
+    // this call is also in ContainerLaunchUtil, but adding this here allows more logs to get handled by Samza loggers
+    LoggingContextHolder.INSTANCE.setConfig(config);
+    log.info(String.format("Got container ID: %s", containerId));
+    log.info(String.format("Got coordinator URL: %s", coordinatorUrl));

Review comment:
       Agreed, but I was just moving a few lines around here, so would prefer to not change in this PR.










[GitHub] [samza] mynameborat commented on a change in pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1532:
URL: https://github.com/apache/samza/pull/1532#discussion_r715182694



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -152,18 +144,19 @@ public String getStreamName() {
   }
 
   /**
-   * Getter for the Config parameter.
+   * This should only be called after verifying that the {@link LoggingContextHolder} has the config.

Review comment:
       Thanks for the details.







[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1532: SAMZA-2693: Make Samza log4j appenders agnostic of where they are running

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1532:
URL: https://github.com/apache/samza/pull/1532#discussion_r715158025



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -152,18 +144,19 @@ public String getStreamName() {
   }
 
   /**
-   * Getter for the Config parameter.
+   * This should only be called after verifying that the {@link LoggingContextHolder} has the config.

Review comment:
       There is actually no tangible way to enforce it. Even prior to this PR, there were only checks to see whether the AM's config was available via "JobModelManager.currentJobModelManager() != null" before using the config or calling this getConfig(), and no real enforcement. But at that time, there was no explicit comment/doc stating this requirement/caution. Now the same thing happens, but via "this.loggingContextHolder.getConfig() != null".
   
   The thing about StreamAppender is that, because it is a log appender that depends on the flow and availability of non-logging components (for config, for example), its flow becomes intertwined with those components and is hence hard to follow. For example, the config gets fetched only after some component is up, and the config is requested within setupSystem, which sits as a no-op until the config is available. In other words, the internal methods within this class are called by other methods based on the flow of external components. This is what leads to this confusing pattern.
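
The pattern described in this comment can be sketched roughly as below. It is a simplified illustration under assumptions, not the actual `StreamAppender`: the class names are hypothetical, a `Map<String, String>` stands in for Samza's `Config`, events are plain strings, and dropping events that arrive before the config is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class DeferredSetupSketch {
  // Hypothetical holder that some external component fills in later.
  static final class Holder {
    private volatile Map<String, String> config;

    Map<String, String> getConfig() {
      return config;
    }

    void setConfig(Map<String, String> config) {
      this.config = config;
    }
  }

  static final class Appender {
    private final Holder holder;
    private volatile boolean systemInitialized = false;
    final List<String> sent = new ArrayList<>();

    Appender(Holder holder) {
      this.holder = holder;
    }

    void append(String event) {
      if (!systemInitialized) {
        Map<String, String> config = holder.getConfig();
        if (config == null) {
          return; // config not available yet: nothing to set up, event is skipped
        }
        setupSystem(config);
      }
      // The event that triggered setup is sent as well.
      sent.add(event);
    }

    // Only ever called with a non-null config, but nothing enforces that
    // beyond the null check in append.
    private void setupSystem(Map<String, String> config) {
      systemInitialized = true;
    }
  }

  public static void main(String[] args) {
    Holder holder = new Holder();
    Appender appender = new Appender(holder);
    appender.append("before config");
    holder.setConfig(Map.of("job.name", "example-job"));
    appender.append("after config");
    System.out.println(appender.sent);
  }
}
```

The appender's behaviour depends entirely on when an unrelated component calls `setConfig`, which is the coupling the comment describes: the guard is a convention checked at the call site rather than something the types enforce.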



