You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/08/04 02:57:07 UTC

[GitHub] [samza] byjiang1996 commented on a change in pull request #1403: SAMZA-2569: Add features into StreamAppender

byjiang1996 commented on a change in pull request #1403:
URL: https://github.com/apache/samza/pull/1403#discussion_r464769314



##########
File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -305,38 +318,58 @@ protected Config getConfig() {
     return config;
   }
 
+  protected Log4jSystemConfig getLog4jSystemConfig(Config config) {
+    return new Log4jSystemConfig(config);
+  }
+
+  protected StreamAppenderMetrics getMetrics(MetricsRegistryMap metricsRegistry) {
+    return new StreamAppenderMetrics(appenderName, metricsRegistry);
+  }
+
+  protected void setupStream(SystemFactory systemFactory, String systemName) {
+    if (config.getBoolean(CREATE_STREAM_ENABLED, false)) {
+      // Explicitly create stream appender stream with the partition count the same as the number of containers.
+      System.out.println(String.format("[%s] creating stream ", appenderName) + streamName + " with partition count " + getPartitionCount());
+      StreamSpec streamSpec =
+          StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, getPartitionCount());
+
+      // SystemAdmin only needed for stream creation here.
+      SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
+      systemAdmin.start();
+      systemAdmin.createStream(streamSpec);
+      systemAdmin.stop();
+    }
+  }
+
   protected void setupSystem() {
     config = getConfig();
-    Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
+    Log4jSystemConfig log4jSystemConfig = getLog4jSystemConfig(config);
 
     if (streamName == null) {
       streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId());
     }
 
-    // TODO we need the ACTUAL metrics registry, or the metrics won't get reported by the metric reporters!
-    MetricsRegistry metricsRegistry = new MetricsRegistryMap();
-    metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry);
+    // Instantiate metrics
+    MetricsRegistryMap metricsRegistry = new MetricsRegistryMap();

Review comment:
       Sorry. Now you can see the new updates.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org