You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/03/06 00:02:54 UTC

samza git commit: SAMZA-1460: StreamAppender does not explicitly create logging topic

Repository: samza
Updated Branches:
  refs/heads/master 1dfc5cecd -> 810d8bd80


SAMZA-1460: StreamAppender does not explicitly create logging topic

Creates the StreamAppender stream explicitly instead of relying on auto stream creation.

Author: Daniel Nishimura <dn...@gmail.com>

Reviewers: Prateek M <pm...@linkedin.com>

Closes #423 from dnishimura/samza-1460-streamappender-create-logging-topic


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/810d8bd8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/810d8bd8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/810d8bd8

Branch: refs/heads/master
Commit: 810d8bd805f386f79ce73efdee0d2ef341a65e83
Parents: 1dfc5ce
Author: Daniel Nishimura <dn...@gmail.com>
Authored: Mon Mar 5 16:02:49 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Mon Mar 5 16:02:49 2018 -0800

----------------------------------------------------------------------
 .../documentation/versioned/jobs/logging.md     |  4 +-
 .../org/apache/samza/system/StreamSpec.java     |  7 ++
 .../samza/logging/log4j/StreamAppender.java     | 48 +++++++++++++
 .../samza/logging/log4j/MockSystemAdmin.java    | 74 ++++++++++++++++++++
 .../samza/logging/log4j/MockSystemFactory.java  |  2 +-
 .../samza/logging/log4j/TestStreamAppender.java | 35 +++++++++
 6 files changed, 168 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/810d8bd8/docs/learn/documentation/versioned/jobs/logging.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/logging.md b/docs/learn/documentation/versioned/jobs/logging.md
index 44eeb3c..ffb66dd 100644
--- a/docs/learn/documentation/versioned/jobs/logging.md
+++ b/docs/learn/documentation/versioned/jobs/logging.md
@@ -116,12 +116,14 @@ And then updating your log4j.xml to include the appender:
 
 #### Stream Log4j Appender
 
-Samza provides a StreamAppender to publish the logs into a specific system. You can specify the system name using "task.log4j.system" and change name of log stream with param 'StreamName'. Also, we have the [MDC](http://logback.qos.ch/manual/mdc.html) keys "containerName", "jobName" and "jobId", which help identify the source of the log. In order to use this appender, simply add:
+Samza provides a StreamAppender to publish the logs into a specific system. You can specify the system name using "task.log4j.system" and change the name of the log stream with the param 'StreamName'. You can also specify the number of partitions for the log stream with the param 'PartitionCount'; otherwise, the number of partitions will equal the number of containers configured for the job. The partition count is applied only when the logging stream is created; changing it afterwards requires manually modifying the stream in the underlying system. The [MDC](http://logback.qos.ch/manual/mdc.html) contains the keys "containerName", "jobName" and "jobId", which help identify the source of the log. In order to use this appender, add:
 
 {% highlight xml %}
 <appender name="StreamAppender" class="org.apache.samza.logging.log4j.StreamAppender">
    <!-- optional -->
    <param name="StreamName" value="EpicStreamName"/>
+   <!-- optional -->
+   <param name="PartitionCount" value="8"/>
    <layout class="org.apache.log4j.PatternLayout">
      <param name="ConversionPattern" value="%X{containerName} %X{jobName} %X{jobId} %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
    </layout>

http://git-wip-us.apache.org/repos/asf/samza/blob/810d8bd8/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
index 3a005c1..ce67d8d 100644
--- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
@@ -46,6 +46,9 @@ public class StreamSpec {
   // Internal checkpoint stream id. It is used for creating checkpoint StreamSpec.
   private static final String CHECKPOINT_STREAM_ID = "samza-internal-checkpoint-stream-id";
 
+  // Internal stream appender stream id. It is used for creating stream appender StreamSpec.
+  private static final String STREAM_APPENDER_ID = "samza-internal-stream-appender-stream-id";
+
   /**
    * Unique identifier for the stream in a Samza application.
    * This identifier is used as a key for stream properties in the
@@ -289,4 +292,8 @@ public class StreamSpec {
   public static StreamSpec createCheckpointStreamSpec(String physicalName, String systemName) {
     return new StreamSpec(CHECKPOINT_STREAM_ID, physicalName, systemName, 1);
   }
+  /** Creates the StreamSpec for the StreamAppender logging stream (internal stream id, given physical name, system and partition count). */
+  public static StreamSpec createStreamAppenderStreamSpec(String physicalName, String systemName, int partitionCount) {
+    return new StreamSpec(STREAM_APPENDER_ID, physicalName, systemName, partitionCount);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/810d8bd8/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index 5f41959..9ea169d 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -32,6 +32,7 @@ import org.apache.log4j.Logger;
 import org.apache.log4j.spi.LoggingEvent;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.Log4jSystemConfig;
 import org.apache.samza.config.SerializerConfig;
 import org.apache.samza.config.ShellCommandConfig;
@@ -44,6 +45,8 @@ import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerdeFactory;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
@@ -70,6 +73,7 @@ public class StreamAppender extends AppenderSkeleton {
   private SystemProducer systemProducer = null;
   private String key = null;
   private String streamName = null;
+  private int partitionCount = 0; // non-positive means "fall back to job.container.count" (see getPartitionCount())
   private boolean isApplicationMaster = false;
   private Serde<LoggingEvent> serde = null;
   private Logger log = Logger.getLogger(StreamAppender.class);
@@ -85,14 +89,48 @@ public class StreamAppender extends AppenderSkeleton {
    */
   private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
 
+  /**
+   * Getter for the StreamName parameter. See also {@link #activateOptions()} for when this is called.
+   * Example: {@literal <param name="StreamName" value="ExampleStreamName"/>}
+   * @return The configured stream name.
+   */
   public String getStreamName() {
     return this.streamName;
   }
 
+  /**
+   * Setter for the StreamName parameter. See also {@link #activateOptions()} for when this is called.
+   * Example: {@literal <param name="StreamName" value="ExampleStreamName"/>}
+   * @param streamName The configured stream name.
+   */
   public void setStreamName(String streamName) {
     this.streamName = streamName;
   }
 
+  /**
+   * Getter for the number of partitions to create on a new StreamAppender stream. See also {@link #activateOptions()} for when this is called.
+   * Example: {@literal <param name="PartitionCount" value="4"/>}
+   * @return The configured partition count if it is positive; otherwise {@link JobConfig#getContainerCount()}.
+   */
+  public int getPartitionCount() {
+    if (partitionCount > 0) {
+      return partitionCount;
+    }
+    return new JobConfig(getConfig()).getContainerCount();
+  }
+
+  /**
+   * Setter for the number of partitions to create on a new StreamAppender stream. See also {@link #activateOptions()} for when this is called.
+   * Example: {@literal <param name="PartitionCount" value="4"/>}
+   * @param partitionCount The partition count to use; values {@code <= 0} fall back to {@link JobConfig#getContainerCount()}.
+   */
+  public void setPartitionCount(int partitionCount) {
+    this.partitionCount = partitionCount;
+  }
+
+  /**
+   * Additional configurations needed before logging to stream. Called once in the container before the first log event is sent.
+   */
   @Override
   public void activateOptions() {
     String containerName = System.getProperty(JAVA_OPTS_CONTAINER_NAME);
@@ -260,6 +298,16 @@ public class StreamAppender extends AppenderSkeleton {
 
     setSerde(log4jSystemConfig, systemName, streamName);
 
+    // Explicitly create the stream appender stream; its partition count comes from getPartitionCount().
+    System.out.println("[StreamAppender] creating stream " + streamName + " with partition count " + getPartitionCount());
+    StreamSpec streamSpec = StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, getPartitionCount());
+    SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
+    systemAdmin.start();
+    try {
+      systemAdmin.createStream(streamSpec);
+    } finally {
+      systemAdmin.stop(); // the admin is only needed for stream creation; stop it even if creation fails
+    }
     systemProducer = systemFactory.getProducer(systemName, config, metricsRegistry);
     systemStream = new SystemStream(systemName, streamName);
     systemProducer.register(SOURCE);

http://git-wip-us.apache.org/repos/asf/samza/blob/810d8bd8/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemAdmin.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemAdmin.java
new file mode 100644
index 0000000..5c0e526
--- /dev/null
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemAdmin.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.StreamValidationException;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+
+/** Test-only {@link SystemAdmin} stub: records the physical name passed to createStream(); other methods are no-ops or return null/false. */
+public class MockSystemAdmin implements SystemAdmin {
+  public static String createdStreamName = ""; // static, so shared across instances and tests; reset before asserting on it
+
+  @Override
+  public void start() {
+    // no-op
+  }
+
+  @Override
+  public void stop() {
+    // no-op
+  }
+
+  @Override
+  public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+    return null;
+  }
+
+  @Override
+  public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+    return null;
+  }
+
+  @Override
+  public Integer offsetComparator(String offset1, String offset2) {
+    return null;
+  }
+
+  @Override
+  public boolean createStream(StreamSpec streamSpec) {
+    createdStreamName = streamSpec.getPhysicalName();
+    return true;
+  }
+
+  @Override
+  public void validateStream(StreamSpec streamSpec) throws StreamValidationException {
+    // no-op: every stream is treated as valid
+  }
+
+  @Override
+  public boolean clearStream(StreamSpec streamSpec) {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/810d8bd8/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemFactory.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemFactory.java
index cdc1245..1d7e782 100644
--- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemFactory.java
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemFactory.java
@@ -40,6 +40,6 @@ public class MockSystemFactory implements SystemFactory {
 
   @Override
   public SystemAdmin getAdmin(String systemName, Config config) {
-    return null;
+    return new MockSystemAdmin(); // StreamAppender now calls getAdmin() to create its log stream
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/810d8bd8/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
index d93c5d1..1257835 100644
--- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
@@ -37,6 +37,7 @@ import org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerde;
 import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerde;
 import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestStreamAppender {
@@ -118,6 +119,40 @@ public class TestStreamAppender {
   }
 
   @Test
+  public void testStreamCreationUponSetup() {
+    System.setProperty("samza.container.name", "samza-container-1");
+    MockSystemAdmin.createdStreamName = ""; // reset static state another test may have left behind
+    MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender();
+    PatternLayout layout = new PatternLayout();
+    layout.setConversionPattern("%m");
+    systemProducerAppender.setLayout(layout);
+    systemProducerAppender.activateOptions();
+    log.addAppender(systemProducerAppender);
+
+    systemProducerAppender.setupSystem();
+    Assert.assertEquals("__samza_log4jTest_1_logs", MockSystemAdmin.createdStreamName);
+  }
+
+  @Test
+  public void testDefaultPartitionCount() {
+    MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender();
+    Assert.assertEquals(1, systemProducerAppender.getPartitionCount()); // job.container.count defaults to 1
+    // With PartitionCount unset, the partition count follows an explicitly configured job.container.count.
+    Map<String, String> map = new HashMap<>();
+    map.put("job.name", "log4jTest");
+    map.put("job.id", "1");
+    map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
+    map.put("task.log4j.system", "mock");
+    map.put("job.container.count", "4");
+    systemProducerAppender = new MockSystemProducerAppender(new MapConfig(map));
+    Assert.assertEquals(4, systemProducerAppender.getPartitionCount());
+    // An explicitly set positive PartitionCount is returned as-is.
+    systemProducerAppender = new MockSystemProducerAppender();
+    systemProducerAppender.setPartitionCount(8);
+    Assert.assertEquals(8, systemProducerAppender.getPartitionCount());
+  }
+
+  @Test
   public void testExceptionsDoNotKillTransferThread() throws InterruptedException {
     System.setProperty("samza.container.name", "samza-container-1");