Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/07/25 04:09:15 UTC

[GitHub] [samza] byjiang1996 opened a new pull request #1403: Make StreamAppender extend-friendly and add logBytes, logCount metrics, and metrics reporter into StreamAppender

byjiang1996 opened a new pull request #1403:
URL: https://github.com/apache/samza/pull/1403


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] cameronlee314 merged pull request #1403: SAMZA-2569: Add features into StreamAppender

Posted by GitBox <gi...@apache.org>.
cameronlee314 merged pull request #1403:
URL: https://github.com/apache/samza/pull/1403


   



[GitHub] [samza] byjiang1996 commented on a change in pull request #1403: SAMZA-2569: Add features into StreamAppender

Posted by GitBox <gi...@apache.org>.
byjiang1996 commented on a change in pull request #1403:
URL: https://github.com/apache/samza/pull/1403#discussion_r463712928



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -305,38 +318,58 @@ protected Config getConfig() {
     return config;
   }
 
+  protected Log4jSystemConfig getLog4jSystemConfig(Config config) {
+    return new Log4jSystemConfig(config);
+  }
+
+  protected StreamAppenderMetrics getMetrics(MetricsRegistryMap metricsRegistry) {
+    return new StreamAppenderMetrics(appenderName, metricsRegistry);
+  }
+
+  protected void setupStream(SystemFactory systemFactory, String systemName) {
+    if (config.getBoolean(CREATE_STREAM_ENABLED, false)) {
+      // Explicitly create stream appender stream with the partition count the same as the number of containers.
+      System.out.println(String.format("[%s] creating stream ", appenderName) + streamName + " with partition count " + getPartitionCount());
+      StreamSpec streamSpec =
+          StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, getPartitionCount());
+
+      // SystemAdmin only needed for stream creation here.
+      SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
+      systemAdmin.start();
+      systemAdmin.createStream(streamSpec);
+      systemAdmin.stop();
+    }
+  }
+
   protected void setupSystem() {
     config = getConfig();
-    Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
+    Log4jSystemConfig log4jSystemConfig = getLog4jSystemConfig(config);
 
     if (streamName == null) {
       streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId());
     }
 
-    // TODO we need the ACTUAL metrics registry, or the metrics won't get reported by the metric reporters!
-    MetricsRegistry metricsRegistry = new MetricsRegistryMap();
-    metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry);
+    // Instantiate metrics
+    MetricsRegistryMap metricsRegistry = new MetricsRegistryMap();
+    // Take this.getClass().getName() as the name to make it extend-friendly
+    metrics = getMetrics(metricsRegistry);
+    // Register metrics into metrics reporters so that they are able to be reported to other systems: e.g. inGraphs

Review comment:
       Sure.





[GitHub] [samza] bkonold commented on a change in pull request #1403: SAMZA-2569: Add features into StreamAppender

Posted by GitBox <gi...@apache.org>.
bkonold commented on a change in pull request #1403:
URL: https://github.com/apache/samza/pull/1403#discussion_r464700908



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -84,15 +87,17 @@
 
   protected static volatile boolean systemInitialized = false;
 
-  private Config config = null;
+  protected Config config = null;
   private SystemStream systemStream = null;
   private SystemProducer systemProducer = null;
   private String key = null;
-  private String streamName = null;
+  protected String streamName = null;
+  protected String appenderName = null;
+  private String containerName = null;
   private int partitionCount = 0;
   private boolean isApplicationMaster;
   private Serde<LogEvent> serde = null;
-  private Logger log = LogManager.getLogger(StreamAppender.class);
+  protected Logger log = LogManager.getLogger(StreamAppender.class);

Review comment:
       Sure, we can keep the discussion there and then circle back.





[GitHub] [samza] bkonold commented on a change in pull request #1403: SAMZA-2569: Add features into StreamAppender

Posted by GitBox <gi...@apache.org>.
bkonold commented on a change in pull request #1403:
URL: https://github.com/apache/samza/pull/1403#discussion_r462642571



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -67,48 +69,52 @@
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.ExponentialSleepStrategy;
 import org.apache.samza.util.HttpUtil;
+import org.apache.samza.util.MetricsReporterLoader;
 import org.apache.samza.util.ReflectionUtil;
 
 @Plugin(name = "Stream", category = "Core", elementType = "appender", printObject = true)
 public class StreamAppender extends AbstractAppender {
 
-  private static final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name";
-  private static final String JOB_COORDINATOR_TAG = "samza-job-coordinator";
-  private static final String SOURCE = "log4j-log";
+  protected static final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name";
+  protected static final String JOB_COORDINATOR_TAG = "samza-job-coordinator";
+  protected static final String SOURCE = "log4j-log";

Review comment:
       Agreed. I've seen partial protection in several places in Samza, so I think it would be fine here. Marking only what you need as protected is also better for the reader, because it restricts the set of functionality a subclass can override. So I'd be in favor of marking only what you need as protected.
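   A minimal sketch of the "protect only what a subclass needs" approach, modeled loosely on the `getMetrics(...)` and `getLog4jSystemConfig(...)` hooks in this diff. All class names here (`BaseAppender`, `CustomAppender`, `AppenderMetricsStub`) are hypothetical, and making `setupSystem()` final is an assumption of the sketch, not something the PR does.

```java
// Hypothetical sketch: names are illustrative, not the StreamAppender API.
class AppenderMetricsStub {
  final String name;

  AppenderMetricsStub(String name) {
    this.name = name;
  }
}

class BaseAppender {
  // Protected: a subclass needs the name to build its own metrics.
  protected final String appenderName;
  // Private: only ever assigned through the getMetrics() hook below.
  private AppenderMetricsStub metrics;

  BaseAppender(String appenderName) {
    this.appenderName = appenderName;
  }

  // Overridable factory method, analogous to getMetrics(MetricsRegistryMap) in the diff.
  protected AppenderMetricsStub getMetrics() {
    return new AppenderMetricsStub(appenderName);
  }

  // Final in this sketch so the setup order stays under the base class's control.
  public final void setupSystem() {
    metrics = getMetrics();
  }

  public String metricsName() {
    return metrics == null ? null : metrics.name;
  }
}

class CustomAppender extends BaseAppender {
  CustomAppender() {
    super("CustomAppender");
  }

  @Override
  protected AppenderMetricsStub getMetrics() {
    return new AppenderMetricsStub(appenderName + "-custom");
  }
}
```

   The subclass changes one decision (how metrics are built) without gaining access to the private state or the order of setup steps.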





[GitHub] [samza] byjiang1996 commented on a change in pull request #1403: SAMZA-2569: Add features into StreamAppender

Posted by GitBox <gi...@apache.org>.
byjiang1996 commented on a change in pull request #1403:
URL: https://github.com/apache/samza/pull/1403#discussion_r463713999



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -305,38 +318,58 @@ protected Config getConfig() {
     return config;
   }
 
+  protected Log4jSystemConfig getLog4jSystemConfig(Config config) {
+    return new Log4jSystemConfig(config);
+  }
+
+  protected StreamAppenderMetrics getMetrics(MetricsRegistryMap metricsRegistry) {
+    return new StreamAppenderMetrics(appenderName, metricsRegistry);
+  }
+
+  protected void setupStream(SystemFactory systemFactory, String systemName) {
+    if (config.getBoolean(CREATE_STREAM_ENABLED, false)) {
+      // Explicitly create stream appender stream with the partition count the same as the number of containers.
+      System.out.println(String.format("[%s] creating stream ", appenderName) + streamName + " with partition count " + getPartitionCount());
+      StreamSpec streamSpec =
+          StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, getPartitionCount());
+
+      // SystemAdmin only needed for stream creation here.
+      SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
+      systemAdmin.start();
+      systemAdmin.createStream(streamSpec);
+      systemAdmin.stop();
+    }
+  }
+
   protected void setupSystem() {
     config = getConfig();
-    Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
+    Log4jSystemConfig log4jSystemConfig = getLog4jSystemConfig(config);
 
     if (streamName == null) {
       streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId());
     }
 
-    // TODO we need the ACTUAL metrics registry, or the metrics won't get reported by the metric reporters!
-    MetricsRegistry metricsRegistry = new MetricsRegistryMap();
-    metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry);
+    // Instantiate metrics
+    MetricsRegistryMap metricsRegistry = new MetricsRegistryMap();

Review comment:
       No, because `MetricsReporter.register(String source, ReadableMetricsRegistry registry);` requires a `ReadableMetricsRegistry`, which is a subtype of `MetricsRegistry`.
   
   FYI, from child to parent: `MetricsRegistryMap` -> `ReadableMetricsRegistry` -> `MetricsRegistry`
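   To illustrate why the local variable has to be declared as `MetricsRegistryMap` rather than `MetricsRegistry`, here is a compile-level sketch. The interfaces below are simplified stand-ins that only mirror the hierarchy quoted above; they are not the real Samza types, and their method sets are reduced (for example, the real `newCounter` returns a `Counter`).

```java
import java.util.Set;
import java.util.TreeSet;

// Simplified stand-ins for the Samza types named above. NOT the real interfaces:
// method sets are reduced to what this illustration needs.
interface MetricsRegistry {
  void newCounter(String group, String name);
}

interface ReadableMetricsRegistry extends MetricsRegistry {
  Set<String> getGroups();
}

class MetricsRegistryMap implements ReadableMetricsRegistry {
  private final Set<String> groups = new TreeSet<>();

  @Override
  public void newCounter(String group, String name) {
    groups.add(group);
  }

  @Override
  public Set<String> getGroups() {
    return groups;
  }
}

interface MetricsReporter {
  // Same parameter types as the register(...) call quoted in the comment.
  void register(String source, ReadableMetricsRegistry registry);
}

class RegistryTypeDemo {
  static String wire(MetricsReporter reporter) {
    // Declared with the concrete type so it can be handed to register(...).
    MetricsRegistryMap registry = new MetricsRegistryMap();
    registry.newCounter("StreamAppender", "log-messages-dropped");
    reporter.register("StreamAppender", registry);

    // MetricsRegistry narrowed = registry;
    // reporter.register("StreamAppender", narrowed); // compile error: not a ReadableMetricsRegistry
    return String.join(",", registry.getGroups());
  }
}
```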





[GitHub] [samza] byjiang1996 commented on a change in pull request #1403: SAMZA-2569: Add features into StreamAppender

Posted by GitBox <gi...@apache.org>.
byjiang1996 commented on a change in pull request #1403:
URL: https://github.com/apache/samza/pull/1403#discussion_r464769674



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -84,15 +87,17 @@
 
   protected static volatile boolean systemInitialized = false;
 
-  private Config config = null;
+  protected Config config = null;
   private SystemStream systemStream = null;
   private SystemProducer systemProducer = null;
   private String key = null;
-  private String streamName = null;
+  protected String streamName = null;
+  protected String appenderName = null;
+  private String containerName = null;
   private int partitionCount = 0;
   private boolean isApplicationMaster;
   private Serde<LogEvent> serde = null;
-  private Logger log = LogManager.getLogger(StreamAppender.class);
+  protected Logger log = LogManager.getLogger(StreamAppender.class);

Review comment:
       Circling back from the discussion in the other RB: I will change this so that each subclass has its own logger.
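   A sketch of the "each subclass has its own logger" pattern. It uses `java.util.logging` in place of Log4j's `LogManager.getLogger(...)` so that it runs without extra dependencies; the class names are hypothetical.

```java
import java.util.logging.Logger;

// Sketch only: java.util.logging stands in for Log4j, class names are hypothetical.
class BaseStreamAppender {
  // private static final: one logger per declaring class, not visible to subclasses.
  private static final Logger LOG = Logger.getLogger(BaseStreamAppender.class.getName());

  static String baseLoggerName() {
    return LOG.getName();
  }
}

class JsonStreamAppender extends BaseStreamAppender {
  // The subclass declares its own logger instead of inheriting a protected field,
  // so its log lines are attributed to its own class name.
  private static final Logger LOG = Logger.getLogger(JsonStreamAppender.class.getName());

  static String ownLoggerName() {
    return LOG.getName();
  }
}
```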





[GitHub] [samza] byjiang1996 commented on a change in pull request #1403: SAMZA-2569: Add features into StreamAppender

Posted by GitBox <gi...@apache.org>.
byjiang1996 commented on a change in pull request #1403:
URL: https://github.com/apache/samza/pull/1403#discussion_r464769314



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -305,38 +318,58 @@ protected Config getConfig() {
     return config;
   }
 
+  protected Log4jSystemConfig getLog4jSystemConfig(Config config) {
+    return new Log4jSystemConfig(config);
+  }
+
+  protected StreamAppenderMetrics getMetrics(MetricsRegistryMap metricsRegistry) {
+    return new StreamAppenderMetrics(appenderName, metricsRegistry);
+  }
+
+  protected void setupStream(SystemFactory systemFactory, String systemName) {
+    if (config.getBoolean(CREATE_STREAM_ENABLED, false)) {
+      // Explicitly create stream appender stream with the partition count the same as the number of containers.
+      System.out.println(String.format("[%s] creating stream ", appenderName) + streamName + " with partition count " + getPartitionCount());
+      StreamSpec streamSpec =
+          StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, getPartitionCount());
+
+      // SystemAdmin only needed for stream creation here.
+      SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
+      systemAdmin.start();
+      systemAdmin.createStream(streamSpec);
+      systemAdmin.stop();
+    }
+  }
+
   protected void setupSystem() {
     config = getConfig();
-    Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
+    Log4jSystemConfig log4jSystemConfig = getLog4jSystemConfig(config);
 
     if (streamName == null) {
       streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId());
     }
 
-    // TODO we need the ACTUAL metrics registry, or the metrics won't get reported by the metric reporters!
-    MetricsRegistry metricsRegistry = new MetricsRegistryMap();
-    metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry);
+    // Instantiate metrics
+    MetricsRegistryMap metricsRegistry = new MetricsRegistryMap();

Review comment:
       Sorry, the new updates should be visible now.





[GitHub] [samza] byjiang1996 commented on a change in pull request #1403: SAMZA-2569: Add features into StreamAppender

Posted by GitBox <gi...@apache.org>.
byjiang1996 commented on a change in pull request #1403:
URL: https://github.com/apache/samza/pull/1403#discussion_r463758656



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -84,15 +87,17 @@
 
   protected static volatile boolean systemInitialized = false;
 
-  private Config config = null;
+  protected Config config = null;
   private SystemStream systemStream = null;
   private SystemProducer systemProducer = null;
   private String key = null;
-  private String streamName = null;
+  protected String streamName = null;
+  protected String appenderName = null;
+  private String containerName = null;
   private int partitionCount = 0;
   private boolean isApplicationMaster;
   private Serde<LogEvent> serde = null;
-  private Logger log = LogManager.getLogger(StreamAppender.class);
+  protected Logger log = LogManager.getLogger(StreamAppender.class);

Review comment:
       See the comment thread in the other RB.





[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1403: SAMZA-2569: Add features into StreamAppender

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1403:
URL: https://github.com/apache/samza/pull/1403#discussion_r461737763



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -67,48 +69,52 @@
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.ExponentialSleepStrategy;
 import org.apache.samza.util.HttpUtil;
+import org.apache.samza.util.MetricsReporterLoader;
 import org.apache.samza.util.ReflectionUtil;
 
 @Plugin(name = "Stream", category = "Core", elementType = "appender", printObject = true)
 public class StreamAppender extends AbstractAppender {
 
-  private static final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name";
-  private static final String JOB_COORDINATOR_TAG = "samza-job-coordinator";
-  private static final String SOURCE = "log4j-log";
+  protected static final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name";
+  protected static final String JOB_COORDINATOR_TAG = "samza-job-coordinator";
+  protected static final String SOURCE = "log4j-log";

Review comment:
       Within Samza, I see classes that expose only some of their variables as protected and keep the rest private, so I think partial protection is an acceptable pattern. We can expose more as the need arises.
   
   The current need to extend comes from a use case that requires another transformation, right? This PR can "make the parts extendable for different transformations/serdes", and I believe that would be acceptable too.
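   A sketch of what "make the parts extendable for different transformations/serdes" could look like: the base class fixes the send flow and exposes a single protected hook, and each subclass supplies its own encoding. `EncodingAppender`, `encode(...)`, and `LogEventSketch` are hypothetical names, not the actual `StreamAppender` API, and the JSON encoder deliberately skips escaping.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical minimal log event; the real appender works with Log4j's LogEvent.
class LogEventSketch {
  final String level;
  final String message;

  LogEventSketch(String level, String message) {
    this.level = level;
    this.message = message;
  }
}

abstract class EncodingAppender {
  private int sentCount = 0; // internal bookkeeping stays private

  // The single extension point: how a log event becomes bytes.
  protected abstract byte[] encode(LogEventSketch event);

  // The flow itself is final, so subclasses cannot reorder it.
  public final byte[] append(LogEventSketch event) {
    byte[] bytes = encode(event);
    sentCount++;
    return bytes;
  }

  public int getSentCount() {
    return sentCount;
  }
}

class PlainTextAppender extends EncodingAppender {
  @Override
  protected byte[] encode(LogEventSketch event) {
    return (event.level + " " + event.message).getBytes(StandardCharsets.UTF_8);
  }
}

class JsonAppender extends EncodingAppender {
  @Override
  protected byte[] encode(LogEventSketch event) {
    // Naive JSON: assumes level and message contain nothing that needs escaping.
    String json = "{\"level\":\"" + event.level + "\",\"message\":\"" + event.message + "\"}";
    return json.getBytes(StandardCharsets.UTF_8);
  }
}
```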

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -214,7 +220,7 @@ public void append(LogEvent event) {
           metrics.bufferFillPct.set(Math.round(100f * logQueue.size() / DEFAULT_QUEUE_SIZE));
         }
       } catch (Exception e) {
-        System.err.println("[StreamAppender] Error sending log message:");
+        System.err.println(String.format("[%s] Error sending log message:", appenderName));

Review comment:
       I understand that log4j2.xml should reference the Java class name of the appender.
   My question was whether we can drop the `appenderName` variable and get the name from `getClass().getName()` instead.
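   A sketch of the suggestion: compute the name from the runtime class instead of storing it in a field. `NamedAppender` and `AuditAppender` are hypothetical.

```java
// Sketch of the reviewer's suggestion; class names are hypothetical.
class NamedAppender {
  // getClass() is resolved at runtime, so a subclass reports its own name.
  protected String appenderName() {
    return getClass().getName();
  }

  String errorPrefix() {
    return String.format("[%s] Error sending log message:", appenderName());
  }
}

class AuditAppender extends NamedAppender {
  // No override needed: inherits appenderName() and still reports "AuditAppender".
}
```

   One trade-off: two instances of the same class configured under different appender names would print the same prefix. `getName()` also includes the package, so `getSimpleName()` gives a shorter prefix if that matters.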





[GitHub] [samza] byjiang1996 commented on a change in pull request #1403: SAMZA-2569: Add features into StreamAppender

Posted by GitBox <gi...@apache.org>.
byjiang1996 commented on a change in pull request #1403:
URL: https://github.com/apache/samza/pull/1403#discussion_r463714722



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
##########
@@ -34,10 +34,22 @@
   /** The number of log messages dropped e.g. because of buffer overflow. Does not include recursive calls. */
   public final Counter logMessagesDropped;
 
+  /** The size of log messages sent out to SystemProducer. */
+  public final Counter logMessagesBytes;
+
+  /** The number of log messages sent out to SystemProducer. */
+  public final Counter logMessagesCount;
+
+  /** The number of log messages cannot be sent out due to errors e.g. serialization errors, system producer send errors. */
+  public final Counter logMessagesErrors;
+
   public StreamAppenderMetrics(String prefix, MetricsRegistry registry) {
-    super(prefix, registry);
+    super(prefix + "-", registry);
     bufferFillPct = newGauge("buffer-fill-percent", 0);
     recursiveCalls = newCounter("recursive-calls");
     logMessagesDropped = newCounter("log-messages-dropped");
+    logMessagesBytes = newCounter("log-messages-bytes");

Review comment:
       Sure.

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
##########
@@ -34,10 +34,22 @@
   /** The number of log messages dropped e.g. because of buffer overflow. Does not include recursive calls. */
   public final Counter logMessagesDropped;
 
+  /** The size of log messages sent out to SystemProducer. */
+  public final Counter logMessagesBytes;
+
+  /** The number of log messages sent out to SystemProducer. */
+  public final Counter logMessagesCount;
+
+  /** The number of log messages cannot be sent out due to errors e.g. serialization errors, system producer send errors. */
+  public final Counter logMessagesErrors;
+
   public StreamAppenderMetrics(String prefix, MetricsRegistry registry) {
-    super(prefix, registry);
+    super(prefix + "-", registry);
     bufferFillPct = newGauge("buffer-fill-percent", 0);
     recursiveCalls = newCounter("recursive-calls");
     logMessagesDropped = newCounter("log-messages-dropped");
+    logMessagesBytes = newCounter("log-messages-bytes");
+    logMessagesCount = newCounter("log-messages-count");

Review comment:
       Sure.





[GitHub] [samza] bkonold commented on a change in pull request #1403: SAMZA-2569: Add features into StreamAppender

Posted by GitBox <gi...@apache.org>.
bkonold commented on a change in pull request #1403:
URL: https://github.com/apache/samza/pull/1403#discussion_r463404039



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -305,38 +318,58 @@ protected Config getConfig() {
     return config;
   }
 
+  protected Log4jSystemConfig getLog4jSystemConfig(Config config) {
+    return new Log4jSystemConfig(config);
+  }
+
+  protected StreamAppenderMetrics getMetrics(MetricsRegistryMap metricsRegistry) {
+    return new StreamAppenderMetrics(appenderName, metricsRegistry);
+  }
+
+  protected void setupStream(SystemFactory systemFactory, String systemName) {
+    if (config.getBoolean(CREATE_STREAM_ENABLED, false)) {
+      // Explicitly create stream appender stream with the partition count the same as the number of containers.
+      System.out.println(String.format("[%s] creating stream ", appenderName) + streamName + " with partition count " + getPartitionCount());
+      StreamSpec streamSpec =
+          StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, getPartitionCount());
+
+      // SystemAdmin only needed for stream creation here.
+      SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
+      systemAdmin.start();
+      systemAdmin.createStream(streamSpec);
+      systemAdmin.stop();
+    }
+  }
+
   protected void setupSystem() {
     config = getConfig();
-    Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
+    Log4jSystemConfig log4jSystemConfig = getLog4jSystemConfig(config);
 
     if (streamName == null) {
       streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId());
     }
 
-    // TODO we need the ACTUAL metrics registry, or the metrics won't get reported by the metric reporters!
-    MetricsRegistry metricsRegistry = new MetricsRegistryMap();
-    metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry);
+    // Instantiate metrics
+    MetricsRegistryMap metricsRegistry = new MetricsRegistryMap();

Review comment:
       We're using the same registry; does the comment from before still apply?

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
##########
@@ -34,10 +34,22 @@
   /** The number of log messages dropped e.g. because of buffer overflow. Does not include recursive calls. */
   public final Counter logMessagesDropped;
 
+  /** The size of log messages sent out to SystemProducer. */
+  public final Counter logMessagesBytes;
+
+  /** The number of log messages sent out to SystemProducer. */
+  public final Counter logMessagesCount;
+
+  /** The number of log messages cannot be sent out due to errors e.g. serialization errors, system producer send errors. */
+  public final Counter logMessagesErrors;
+
   public StreamAppenderMetrics(String prefix, MetricsRegistry registry) {
-    super(prefix, registry);
+    super(prefix + "-", registry);
     bufferFillPct = newGauge("buffer-fill-percent", 0);
     recursiveCalls = newCounter("recursive-calls");
     logMessagesDropped = newCounter("log-messages-dropped");
+    logMessagesBytes = newCounter("log-messages-bytes");
+    logMessagesCount = newCounter("log-messages-count");

Review comment:
       log-messages-sent?
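   For context on the naming discussion, a sketch of how the `super(prefix + "-", registry)` change shapes the full metric names, assuming the superclass prepends the prefix to each name verbatim (which is what adding the `"-"` implies). `PrefixedMetrics` and `AppenderMetricsSketch` are simplified stand-ins, and `log-messages-sent` is the suggested name here, not necessarily the merged one.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a metrics base class that prepends a prefix to every name.
class PrefixedMetrics {
  private final String prefix;
  final List<String> registered = new ArrayList<>();

  PrefixedMetrics(String prefix) {
    this.prefix = prefix;
  }

  String newCounter(String name) {
    String fullName = prefix + name; // prefix is used verbatim, no separator added here
    registered.add(fullName);
    return fullName;
  }
}

class AppenderMetricsSketch extends PrefixedMetrics {
  AppenderMetricsSketch(String appenderName) {
    super(appenderName + "-"); // mirrors super(prefix + "-", registry)
    newCounter("log-messages-dropped");
    newCounter("log-messages-sent"); // suggested rename of log-messages-count
  }
}
```

   With an appender named `StreamAppender`, the counters come out as `StreamAppender-log-messages-dropped` and `StreamAppender-log-messages-sent`.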

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -84,15 +87,17 @@
 
   protected static volatile boolean systemInitialized = false;
 
-  private Config config = null;
+  protected Config config = null;
   private SystemStream systemStream = null;
   private SystemProducer systemProducer = null;
   private String key = null;
-  private String streamName = null;
+  protected String streamName = null;
+  protected String appenderName = null;
+  private String containerName = null;
   private int partitionCount = 0;
   private boolean isApplicationMaster;
   private Serde<LogEvent> serde = null;
-  private Logger log = LogManager.getLogger(StreamAppender.class);
+  protected Logger log = LogManager.getLogger(StreamAppender.class);
   protected StreamAppenderMetrics metrics;

Review comment:
       Can we organize the fields by modifier for readability, so that everything a subclass might modify is in one place?

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -305,38 +318,58 @@ protected Config getConfig() {
     return config;
   }
 
+  protected Log4jSystemConfig getLog4jSystemConfig(Config config) {
+    return new Log4jSystemConfig(config);
+  }
+
+  protected StreamAppenderMetrics getMetrics(MetricsRegistryMap metricsRegistry) {
+    return new StreamAppenderMetrics(appenderName, metricsRegistry);
+  }
+
+  protected void setupStream(SystemFactory systemFactory, String systemName) {
+    if (config.getBoolean(CREATE_STREAM_ENABLED, false)) {
+      // Explicitly create stream appender stream with the partition count the same as the number of containers.
+      System.out.println(String.format("[%s] creating stream ", appenderName) + streamName + " with partition count " + getPartitionCount());
+      StreamSpec streamSpec =
+          StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, getPartitionCount());
+
+      // SystemAdmin only needed for stream creation here.
+      SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
+      systemAdmin.start();
+      systemAdmin.createStream(streamSpec);
+      systemAdmin.stop();
+    }
+  }
+
   protected void setupSystem() {
     config = getConfig();
-    Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
+    Log4jSystemConfig log4jSystemConfig = getLog4jSystemConfig(config);
 
     if (streamName == null) {
       streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId());
     }
 
-    // TODO we need the ACTUAL metrics registry, or the metrics won't get reported by the metric reporters!
-    MetricsRegistry metricsRegistry = new MetricsRegistryMap();
-    metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry);
+    // Instantiate metrics
+    MetricsRegistryMap metricsRegistry = new MetricsRegistryMap();
+    // Take this.getClass().getName() as the name to make it extend-friendly
+    metrics = getMetrics(metricsRegistry);
+    // Register metrics into metrics reporters so that they are able to be reported to other systems: e.g. inGraphs

Review comment:
       Let's keep mentions of things like "inGraphs" out of OSS.

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
##########
@@ -34,10 +34,22 @@
   /** The number of log messages dropped e.g. because of buffer overflow. Does not include recursive calls. */
   public final Counter logMessagesDropped;
 
+  /** The size of log messages sent out to SystemProducer. */
+  public final Counter logMessagesBytes;
+
+  /** The number of log messages sent out to SystemProducer. */
+  public final Counter logMessagesCount;
+
+  /** The number of log messages cannot be sent out due to errors e.g. serialization errors, system producer send errors. */
+  public final Counter logMessagesErrors;
+
   public StreamAppenderMetrics(String prefix, MetricsRegistry registry) {
-    super(prefix, registry);
+    super(prefix + "-", registry);
     bufferFillPct = newGauge("buffer-fill-percent", 0);
     recursiveCalls = newCounter("recursive-calls");
     logMessagesDropped = newCounter("log-messages-dropped");
+    logMessagesBytes = newCounter("log-messages-bytes");

Review comment:
       log-messages-bytes-sent?
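   A hypothetical sketch of how the three new counters could be updated around a single send, using the names suggested in review (`log-messages-sent`, `log-messages-bytes-sent`) alongside `logMessagesErrors`. The `Producer` interface and `AtomicLong` counters stand in for `SystemProducer` and Samza's `Counter`; this is not the PR's actual send path.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a SystemProducer-like sink.
interface Producer {
  void send(byte[] payload) throws Exception;
}

class SendPathSketch {
  final AtomicLong logMessagesSent = new AtomicLong();
  final AtomicLong logMessagesBytesSent = new AtomicLong();
  final AtomicLong logMessagesErrors = new AtomicLong();

  void trySend(Producer producer, byte[] payload) {
    try {
      producer.send(payload);
      // Count only after the producer accepted the message.
      logMessagesSent.incrementAndGet();
      logMessagesBytesSent.addAndGet(payload.length);
    } catch (Exception e) {
      // A failed send is counted as an error, not as sent.
      logMessagesErrors.incrementAndGet();
    }
  }
}
```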

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -84,15 +87,17 @@
 
   protected static volatile boolean systemInitialized = false;
 
-  private Config config = null;
+  protected Config config = null;
   private SystemStream systemStream = null;
   private SystemProducer systemProducer = null;
   private String key = null;
-  private String streamName = null;
+  protected String streamName = null;
+  protected String appenderName = null;
+  private String containerName = null;
   private int partitionCount = 0;
   private boolean isApplicationMaster;
   private Serde<LogEvent> serde = null;
-  private Logger log = LogManager.getLogger(StreamAppender.class);
+  protected Logger log = LogManager.getLogger(StreamAppender.class);

Review comment:
       Why does this need to be protected? Can't subclasses have their own logger?





[GitHub] [samza] bkonold commented on a change in pull request #1403: SAMZA-2569: Add features into StreamAppender

Posted by GitBox <gi...@apache.org>.
bkonold commented on a change in pull request #1403:
URL: https://github.com/apache/samza/pull/1403#discussion_r464703599



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -305,38 +318,58 @@ protected Config getConfig() {
     return config;
   }
 
+  protected Log4jSystemConfig getLog4jSystemConfig(Config config) {
+    return new Log4jSystemConfig(config);
+  }
+
+  protected StreamAppenderMetrics getMetrics(MetricsRegistryMap metricsRegistry) {
+    return new StreamAppenderMetrics(appenderName, metricsRegistry);
+  }
+
+  protected void setupStream(SystemFactory systemFactory, String systemName) {
+    if (config.getBoolean(CREATE_STREAM_ENABLED, false)) {
+      // Explicitly create stream appender stream with the partition count the same as the number of containers.
+      System.out.println(String.format("[%s] creating stream ", appenderName) + streamName + " with partition count " + getPartitionCount());
+      StreamSpec streamSpec =
+          StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, getPartitionCount());
+
+      // SystemAdmin only needed for stream creation here.
+      SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
+      systemAdmin.start();
+      systemAdmin.createStream(streamSpec);
+      systemAdmin.stop();
+    }
+  }
+
   protected void setupSystem() {
     config = getConfig();
-    Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
+    Log4jSystemConfig log4jSystemConfig = getLog4jSystemConfig(config);
 
     if (streamName == null) {
       streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId());
     }
 
-    // TODO we need the ACTUAL metrics registry, or the metrics won't get reported by the metric reporters!
-    MetricsRegistry metricsRegistry = new MetricsRegistryMap();
-    metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry);
+    // Instantiate metrics
+    MetricsRegistryMap metricsRegistry = new MetricsRegistryMap();

Review comment:
      So the comment was already out of date? I don't see any difference in how the registry or the metrics are created.
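   The hunk above replaces direct construction with protected factory hooks (`getLog4jSystemConfig`, `getMetrics`) that `setupSystem` calls. A minimal sketch of that template-method pattern, assuming hypothetical stand-in classes `Registry` and `AppenderMetrics` in place of Samza's `MetricsRegistryMap` and `StreamAppenderMetrics`:

```java
// Hypothetical stand-ins for Samza's MetricsRegistryMap and StreamAppenderMetrics.
class Registry {}

class AppenderMetrics {
  final String group;

  AppenderMetrics(String group, Registry registry) {
    // A real metrics class would register its gauges/counters in `registry`.
    this.group = group;
  }
}

class BaseStreamAppender {
  protected final String appenderName;
  protected AppenderMetrics metrics;

  BaseStreamAppender(String appenderName) {
    this.appenderName = appenderName;
  }

  // Overridable factory hook, similar in shape to getMetrics(MetricsRegistryMap) in the diff.
  protected AppenderMetrics getMetrics(Registry registry) {
    return new AppenderMetrics(appenderName, registry);
  }

  protected void setupSystem() {
    Registry registry = new Registry();
    // Dynamic dispatch selects a subclass override if one exists.
    metrics = getMetrics(registry);
  }
}

// Hypothetical subclass that customizes only the metrics it creates.
class PrefixedStreamAppender extends BaseStreamAppender {
  PrefixedStreamAppender(String appenderName) {
    super(appenderName);
  }

  @Override
  protected AppenderMetrics getMetrics(Registry registry) {
    return new AppenderMetrics("custom-" + appenderName, registry);
  }
}

public class FactoryHookDemo {
  public static void main(String[] args) {
    BaseStreamAppender base = new BaseStreamAppender("StreamAppender");
    base.setupSystem();
    BaseStreamAppender custom = new PrefixedStreamAppender("StreamAppender");
    custom.setupSystem();
    System.out.println(base.metrics.group);   // StreamAppender
    System.out.println(custom.metrics.group); // custom-StreamAppender
  }
}
```

   The subclass changes one construction step without copying the rest of `setupSystem`.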







[GitHub] [samza] byjiang1996 commented on a change in pull request #1403: SAMZA-2569: Add features into StreamAppender

Posted by GitBox <gi...@apache.org>.
byjiang1996 commented on a change in pull request #1403:
URL: https://github.com/apache/samza/pull/1403#discussion_r462504237



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -368,24 +389,24 @@ private void startTransferThread() {
             // Preserve the interrupted status for the loop condition.
             Thread.currentThread().interrupt();
           } catch (Throwable t) {
-            log.error("Error sending StreamAppender event to SystemProducer", t);
+            log.error("Error sending " + appenderName +" event to SystemProducer", t);

Review comment:
      Added.

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -368,24 +389,24 @@ private void startTransferThread() {
             // Preserve the interrupted status for the loop condition.
             Thread.currentThread().interrupt();
           } catch (Throwable t) {
-            log.error("Error sending StreamAppender event to SystemProducer", t);
+            log.error("Error sending " + appenderName +" event to SystemProducer", t);

Review comment:
      Makes sense to me. Added now.







[GitHub] [samza] byjiang1996 commented on a change in pull request #1403: SAMZA-2569: Add features into StreamAppender

Posted by GitBox <gi...@apache.org>.
byjiang1996 commented on a change in pull request #1403:
URL: https://github.com/apache/samza/pull/1403#discussion_r463727129



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -84,15 +87,17 @@
 
   protected static volatile boolean systemInitialized = false;
 
-  private Config config = null;
+  protected Config config = null;
   private SystemStream systemStream = null;
   private SystemProducer systemProducer = null;
   private String key = null;
-  private String streamName = null;
+  protected String streamName = null;
+  protected String appenderName = null;
+  private String containerName = null;
   private int partitionCount = 0;
   private boolean isApplicationMaster;
   private Serde<LogEvent> serde = null;
-  private Logger log = LogManager.getLogger(StreamAppender.class);
+  protected Logger log = LogManager.getLogger(StreamAppender.class);
   protected StreamAppenderMetrics metrics;

Review comment:
       Sure.







[GitHub] [samza] byjiang1996 commented on a change in pull request #1403: Make StreamAppender extend-friendly and add logBytes, logCount metrics, and metrics reporter into StreamAppender

Posted by GitBox <gi...@apache.org>.
byjiang1996 commented on a change in pull request #1403:
URL: https://github.com/apache/samza/pull/1403#discussion_r461238486



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -214,7 +220,7 @@ public void append(LogEvent event) {
           metrics.bufferFillPct.set(Math.round(100f * logQueue.size() / DEFAULT_QUEUE_SIZE));
         }
       } catch (Exception e) {
-        System.err.println("[StreamAppender] Error sending log message:");
+        System.err.println(String.format("[%s] Error sending log message:", appenderName));

Review comment:
      Yes. It will be the same as the name we write in log4j2.xml. For example:
   ```
   <Appender type="Stream" name="<whatismyname>">
         <Layout type="PatternLayout" pattern="xxx"/>
   </Appender>
   ```
   Then appenderName is <whatismyname>. This name should be unique within log4j2.xml.

##########
File path: samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
##########
@@ -68,13 +68,12 @@ public void testDefaultSerde() {
   @Test
   public void testNonDefaultSerde() {
     System.setProperty("samza.container.name", "samza-container-1");
-    String streamName = StreamAppender.getStreamName("log4jTest", "1");
     Map<String, String> map = new HashMap<String, String>();
     map.put("job.name", "log4jTest");
     map.put("job.id", "1");
     map.put("serializers.registry.log4j-string.class", LoggingEventStringSerdeFactory.class.getCanonicalName());
     map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
-    map.put("systems.mock.streams." + streamName + ".samza.msg.serde", "log4j-string");
+    map.put("systems.mock.streams.__samza_log4jTest_1_logs.samza.msg.serde", "log4j-string");

Review comment:
      Because `getStreamName` was changed from `protected static` to `protected`: a static method cannot be overridden.
   BTW, this static usage only exists here in Samza's code, so it seems fine to remove the `static` keyword.
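   The point about static methods can be shown with a small sketch: a static method in a subclass only *hides* the parent's method, so calls made from the parent still bind to the parent's version, while an instance method is dispatched to the override. All class names here are hypothetical, and the `__samza_<job>_<id>_logs` format is inferred from the literal in the test diff above rather than copied from Samza's source.

```java
// Hypothetical classes contrasting static hiding with instance overriding.
// The "__samza_<job>_<id>_logs" format is inferred from the test literal.
class ParentAppender {
  static String staticStreamName(String jobName, String jobId) {
    return "__samza_" + jobName + "_" + jobId + "_logs";
  }

  protected String getStreamName(String jobName, String jobId) {
    return "__samza_" + jobName + "_" + jobId + "_logs";
  }

  // Static call: bound at compile time to ParentAppender.staticStreamName.
  String resolveStatic() {
    return staticStreamName("log4jTest", "1");
  }

  // Instance call: dispatched at run time to the most specific override.
  String resolveInstance() {
    return getStreamName("log4jTest", "1");
  }
}

class ChildAppender extends ParentAppender {
  // Only hides the parent's static method; ParentAppender's code never calls this one.
  static String staticStreamName(String jobName, String jobId) {
    return "custom_" + jobName + "_" + jobId;
  }

  @Override
  protected String getStreamName(String jobName, String jobId) {
    return "custom_" + jobName + "_" + jobId;
  }
}

public class StaticVsInstanceDemo {
  public static void main(String[] args) {
    ParentAppender appender = new ChildAppender();
    System.out.println(appender.resolveStatic());   // __samza_log4jTest_1_logs
    System.out.println(appender.resolveInstance()); // custom_log4jTest_1
  }
}
```

   Presumably this is also why the test now spells out the stream name literally: once the method is no longer static, it cannot be called without an appender instance.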

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -67,48 +69,52 @@
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.ExponentialSleepStrategy;
 import org.apache.samza.util.HttpUtil;
+import org.apache.samza.util.MetricsReporterLoader;
 import org.apache.samza.util.ReflectionUtil;
 
 @Plugin(name = "Stream", category = "Core", elementType = "appender", printObject = true)
 public class StreamAppender extends AbstractAppender {
 
-  private static final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name";
-  private static final String JOB_COORDINATOR_TAG = "samza-job-coordinator";
-  private static final String SOURCE = "log4j-log";
+  protected static final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name";
+  protected static final String JOB_COORDINATOR_TAG = "samza-job-coordinator";
+  protected static final String SOURCE = "log4j-log";

Review comment:
      In my use case (extending the existing StreamAppender), I only use a few of the changed variables. In case we want to expose more objects in the future, or simply to make the class thoroughly extension-friendly, I prefer to change every variable that seems useful to child classes to protected, rather than only the ones I intend to use.
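   As a sketch of what the wider visibility enables, a subclass can read the constants directly once they are `protected`. The constant values are copied from the diff; `ExtensibleAppender`, `ContainerAwareAppender`, and the coordinator check are hypothetical and only illustrate access, not Samza's actual logic.

```java
// Constant values copied from the diff; everything else is hypothetical.
class ExtensibleAppender {
  protected static final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name";
  protected static final String JOB_COORDINATOR_TAG = "samza-job-coordinator";
  protected static final String SOURCE = "log4j-log";
}

class ContainerAwareAppender extends ExtensibleAppender {
  // These accesses would not compile if the constants were private to the parent.
  String containerName() {
    return System.getProperty(JAVA_OPTS_CONTAINER_NAME);
  }

  // Illustrative check only; not necessarily how Samza detects the coordinator.
  boolean runsInCoordinator() {
    String name = containerName();
    return name != null && name.contains(JOB_COORDINATOR_TAG);
  }

  String source() {
    return SOURCE;
  }
}

public class ProtectedFieldsDemo {
  public static void main(String[] args) {
    // Mirrors the system property set in TestStreamAppender.
    System.setProperty("samza.container.name", "samza-container-1");
    ContainerAwareAppender appender = new ContainerAwareAppender();
    System.out.println(appender.containerName());     // samza-container-1
    System.out.println(appender.runsInCoordinator()); // false
  }
}
```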

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -214,7 +220,7 @@ public void append(LogEvent event) {
           metrics.bufferFillPct.set(Math.round(100f * logQueue.size() / DEFAULT_QUEUE_SIZE));
         }
       } catch (Exception e) {
-        System.err.println("[StreamAppender] Error sending log message:");
+        System.err.println(String.format("[%s] Error sending log message:", appenderName));

Review comment:
      It seems better to use appenderName here because, in theory, multiple instances of StreamAppender (or its child appenders) can be in use at the same time, and appenderName is a unique name for each appender instance, set in log4j2.xml.

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -214,7 +220,7 @@ public void append(LogEvent event) {
           metrics.bufferFillPct.set(Math.round(100f * logQueue.size() / DEFAULT_QUEUE_SIZE));
         }
       } catch (Exception e) {
-        System.err.println("[StreamAppender] Error sending log message:");
+        System.err.println(String.format("[%s] Error sending log message:", appenderName));

Review comment:
      It seems better to use appenderName here because, in theory, multiple instances of StreamAppender (or its child appenders) can be in use at the same time, and appenderName is a unique name for each appender instance, set in log4j2.xml (it does not need to be "StreamAppender"; users can choose any name).
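   A minimal sketch of why the per-instance prefix helps: with a hard-coded "[StreamAppender]" prefix, errors from two configured appenders would be indistinguishable on stderr. `NamedAppender` is a hypothetical class; in the real appender the name comes from the `name="..."` attribute in log4j2.xml.

```java
// Hypothetical appender that prefixes its errors with its configured name.
class NamedAppender {
  // In log4j2 this value would come from the name="..." attribute in log4j2.xml.
  private final String appenderName;

  NamedAppender(String appenderName) {
    this.appenderName = appenderName;
  }

  String errorHeader() {
    return String.format("[%s] Error sending log message:", appenderName);
  }
}

public class AppenderNameDemo {
  public static void main(String[] args) {
    NamedAppender primary = new NamedAppender("StreamAppender");
    NamedAppender audit = new NamedAppender("AuditStreamAppender");
    System.err.println(primary.errorHeader()); // [StreamAppender] Error sending log message:
    System.err.println(audit.errorHeader());   // [AuditStreamAppender] Error sending log message:
  }
}
```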

##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -67,48 +69,52 @@
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.ExponentialSleepStrategy;
 import org.apache.samza.util.HttpUtil;
+import org.apache.samza.util.MetricsReporterLoader;
 import org.apache.samza.util.ReflectionUtil;
 
 @Plugin(name = "Stream", category = "Core", elementType = "appender", printObject = true)
 public class StreamAppender extends AbstractAppender {
 
-  private static final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name";
-  private static final String JOB_COORDINATOR_TAG = "samza-job-coordinator";
-  private static final String SOURCE = "log4j-log";
+  protected static final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name";
+  protected static final String JOB_COORDINATOR_TAG = "samza-job-coordinator";
+  protected static final String SOURCE = "log4j-log";

Review comment:
       @bkonold Do you have ideas about this issue?



