You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:30 UTC

[14/50] [abbrv] samza git commit: SAMZA-723 : hello-samza hangs when we use StreamAppender

SAMZA-723 : hello-samza hangs when we use StreamAppender


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8677a27f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8677a27f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8677a27f

Branch: refs/heads/samza-sql
Commit: 8677a27fd81895324bd99113929255ec56722390
Parents: 07c6984
Author: Yan Fang <ya...@gmail.com>
Authored: Tue Oct 27 15:21:41 2015 -0700
Committer: Navina <na...@gmail.com>
Committed: Tue Oct 27 15:21:41 2015 -0700

----------------------------------------------------------------------
 .../samza/coordinator/JobCoordinator.scala      |   8 +-
 .../apache/samza/config/Log4jSystemConfig.java  |   3 +-
 .../samza/logging/log4j/StreamAppender.java     | 108 +++++++++++--------
 .../samza/logging/log4j/TestStreamAppender.java |  39 ++++++-
 4 files changed, 112 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/8677a27f/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index a926ce6..03299cb 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -52,6 +52,11 @@ import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 object JobCoordinator extends Logging {
 
   /**
+   * a volatile value to store the current instantiated <code>JobCoordinator</code>
+   */
+  @volatile var currentJobCoordinator: JobCoordinator = null
+
+  /**
    * @param coordinatorSystemConfig A config object that contains job.name,
    * job.id, and all system.&lt;job-coordinator-system-name&gt;.*
    * configuration. The method will use this config to read all configuration
@@ -105,7 +110,8 @@ object JobCoordinator extends Logging {
     val jobModelGenerator = initializeJobModel(config, checkpointManager, changelogManager, localityManager, streamMetadataCache)
     val server = new HttpServer
     server.addServlet("/*", new JobServlet(jobModelGenerator))
-    new JobCoordinator(jobModelGenerator(), server, checkpointManager)
+    currentJobCoordinator = new JobCoordinator(jobModelGenerator(), server, checkpointManager)
+    currentJobCoordinator
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/8677a27f/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
index d98b8c6..59015a9 100644
--- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
+++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -69,8 +69,7 @@ public class Log4jSystemConfig extends JavaSystemConfig {
   /**
    * Get the class name according to the serde name.
    * 
-   * @param name
-   *          serde name
+   * @param name serde name
    * @return serde factory name, or null if there is no factory defined for the
    *         supplied serde name.
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/8677a27f/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index 776a36b..0c6329e 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -22,13 +22,13 @@ package org.apache.samza.logging.log4j;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URL;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.Logger;
 import org.apache.log4j.spi.LoggingEvent;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.Log4jSystemConfig;
 import org.apache.samza.config.SerializerConfig;
 import org.apache.samza.config.ShellCommandConfig;
@@ -63,16 +63,12 @@ public class StreamAppender extends AppenderSkeleton {
   private boolean isApplicationMaster = false;
   private Serde<LoggingEvent> serde = null;
   private Logger log = Logger.getLogger(StreamAppender.class);
+  protected static volatile boolean systemInitialized = false;
 
   /**
    * used to detect if this thread is called recursively
    */
-  private final ThreadLocal<Boolean> recursiveCall = new ThreadLocal<Boolean>() {
-    @Override
-    protected Boolean initialValue() {
-      return false;
-    }
-  };
+  private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
 
   public String getStreamName() {
     return this.streamName;
@@ -88,44 +84,37 @@ public class StreamAppender extends AppenderSkeleton {
     if (containerName != null) {
       isApplicationMaster = containerName.contains(APPLICATION_MASTER_TAG);
     } else {
-      throw new SamzaException("Got null container name from system property: " + JAVA_OPTS_CONTAINER_NAME + ". This is used as the key for the log appender, so can't proceed.");
+      throw new SamzaException("Got null container name from system property: " + JAVA_OPTS_CONTAINER_NAME +
+          ". This is used as the key for the log appender, so can't proceed.");
     }
     key = containerName; // use the container name as the key for the logs
-    config = getConfig();
-    SystemFactory systemFactory = null;
-    Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
-
-    if (streamName == null) {
-      streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId());
-    }
-
-    String systemName = log4jSystemConfig.getSystemName();
-    String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName);
-    if (systemFactoryName != null) {
-      systemFactory = Util.<SystemFactory>getObj(systemFactoryName);
+    // StreamAppender has to wait until the JobCoordinator is up when the log is in the AM
+    if (isApplicationMaster) {
+      systemInitialized = false;
     } else {
-      throw new SamzaException("Could not figure out the \"" + systemName + "\" system factory for log4j StreamAppender to use");
+      setupSystem();
+      systemInitialized = true;
     }
-
-    setSerde(log4jSystemConfig, systemName, streamName);
-
-    systemProducer = systemFactory.getProducer(systemName, config, new MetricsRegistryMap());
-    systemStream = new SystemStream(systemName, streamName);
-    systemProducer.register(SOURCE);
-    systemProducer.start();
-
-    log.info(SOURCE + " has been registered in " + systemName + ". So all the logs will be sent to " + streamName
-        + " in " + systemName + ". Logs are partitioned by " + key);
   }
 
   @Override
-  protected void append(LoggingEvent event) {
+  public void append(LoggingEvent event) {
     if (!recursiveCall.get()) {
       try {
         recursiveCall.set(true);
-        OutgoingMessageEnvelope outgoingMessageEnvelope =
-            new OutgoingMessageEnvelope(systemStream, key.getBytes("UTF-8"), serde.toBytes(subLog(event)));
-        systemProducer.send(SOURCE, outgoingMessageEnvelope);
+        if (!systemInitialized) {
+          if (JobCoordinator.currentJobCoordinator() != null) {
+            // JobCoordinator has been instantiated
+            setupSystem();
+            systemInitialized = true;
+          } else {
+            log.trace("Waiting for the JobCoordinator to be instantiated...");
+          }
+        } else {
+          OutgoingMessageEnvelope outgoingMessageEnvelope =
+              new OutgoingMessageEnvelope(systemStream, key.getBytes("UTF-8"), serde.toBytes(subLog(event)));
+          systemProducer.send(SOURCE, outgoingMessageEnvelope);
+        }
       } catch (UnsupportedEncodingException e) {
         throw new SamzaException("can not send the log messages", e);
       } finally {
@@ -150,10 +139,13 @@ public class StreamAppender extends AppenderSkeleton {
 
   @Override
   public void close() {
+    log.info("Shutting down the StreamAppender...");
     if (!this.closed) {
       this.closed = true;
       flushSystemProducer();
-      systemProducer.stop();
+      if (systemProducer !=  null) {
+        systemProducer.stop();
+      }
     }
   }
 
@@ -166,7 +158,9 @@ public class StreamAppender extends AppenderSkeleton {
    * force the system producer to flush the messages
    */
   public void flushSystemProducer() {
-    systemProducer.flush(SOURCE);
+    if (systemProducer != null) {
+      systemProducer.flush(SOURCE);
+    }
   }
 
   /**
@@ -179,11 +173,12 @@ public class StreamAppender extends AppenderSkeleton {
 
     try {
       if (isApplicationMaster) {
-        Config coordinatorSystemConfig = new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG()), Config.class));
-        config = JobCoordinator.apply(coordinatorSystemConfig).jobModel().getConfig();
+        config = JobCoordinator.currentJobCoordinator().jobModel().getConfig();
       } else {
         String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
-        config = SamzaObjectMapper.getObjectMapper().readValue(Util.read(new URL(url), 30000), JobModel.class).getConfig();
+        config = SamzaObjectMapper.getObjectMapper()
+            .readValue(Util.read(new URL(url), 30000), JobModel.class)
+            .getConfig();
       }
     } catch (IOException e) {
       throw new SamzaException("can not read the config", e);
@@ -192,7 +187,35 @@ public class StreamAppender extends AppenderSkeleton {
     return config;
   }
 
-  public static String getStreamName(String jobName, String jobId) {
+  protected void setupSystem() {
+    config = getConfig();
+    SystemFactory systemFactory = null;
+    Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
+
+    if (streamName == null) {
+      streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId());
+    }
+
+    String systemName = log4jSystemConfig.getSystemName();
+    String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName);
+    if (systemFactoryName != null) {
+      systemFactory = Util.<SystemFactory>getObj(systemFactoryName);
+    } else {
+      throw new SamzaException("Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use");
+    }
+
+    setSerde(log4jSystemConfig, systemName, streamName);
+
+    systemProducer = systemFactory.getProducer(systemName, config, new MetricsRegistryMap());
+    systemStream = new SystemStream(systemName, streamName);
+    systemProducer.register(SOURCE);
+    systemProducer.start();
+
+    log.info(SOURCE + " has been registered in " + systemName + ". So all the logs will be sent to " + streamName
+        + " in " + systemName + ". Logs are partitioned by " + key);
+  }
+
+  protected static String getStreamName(String jobName, String jobId) {
     if (jobName == null) {
       throw new SamzaException("job name is null. Please specify job.name");
     }
@@ -224,7 +247,8 @@ public class StreamAppender extends AppenderSkeleton {
       serde = serdeFactory.getSerde(systemName, config);
     } else {
       String serdeKey = String.format(SerializerConfig.SERDE(), serdeName);
-      throw new SamzaException("Can not find serializers class for key '" + serdeName + "'. Please specify " + serdeKey + " property");
+      throw new SamzaException("Can not find serializers class for key '" + serdeName + "'. Please specify " +
+          serdeKey + " property");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/8677a27f/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
index 1c6f9a4..e2e17a0 100644
--- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
@@ -70,7 +70,7 @@ public class TestStreamAppender {
   }
 
   @Test
-  public void testSystemProducerAppender() {
+  public void testSystemProducerAppenderInContainer() {
     System.setProperty("samza.container.name", "samza-container-1");
 
     MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender();
@@ -87,6 +87,43 @@ public class TestStreamAppender {
     assertEquals(2, MockSystemProducer.messagesReceived.size());
     assertTrue(new String((byte[]) MockSystemProducer.messagesReceived.get(0)).contains("\"message\":\"testing\""));
     assertTrue(new String((byte[]) MockSystemProducer.messagesReceived.get(1)).contains("\"message\":\"testing2\""));
+
+    // reset
+    log.removeAllAppenders();
+    MockSystemProducer.messagesReceived.clear();
+  }
+
+  @Test
+  public void testSystemProducerAppenderInAM() {
+    System.setProperty("samza.container.name", "samza-application-master");
+
+    MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender();
+    PatternLayout layout = new PatternLayout();
+    layout.setConversionPattern("%m");
+    systemProducerAppender.setLayout(layout);
+    systemProducerAppender.activateOptions();
+    log.addAppender(systemProducerAppender);
+
+    log.info("no-received");
+    systemProducerAppender.flushSystemProducer();
+    // it should not receive anything because the system is not setup
+    assertEquals(0, MockSystemProducer.messagesReceived.size());
+
+    systemProducerAppender.setupSystem();
+    MockSystemProducerAppender.systemInitialized = true;
+
+    log.info("testing3");
+    log.info("testing4");
+    systemProducerAppender.flushSystemProducer();
+
+    // be able to received msgs now
+    assertEquals(2, MockSystemProducer.messagesReceived.size());
+    assertTrue(new String((byte[]) MockSystemProducer.messagesReceived.get(0)).contains("\"message\":\"testing3\""));
+    assertTrue(new String((byte[]) MockSystemProducer.messagesReceived.get(1)).contains("\"message\":\"testing4\""));
+
+    // reset
+    log.removeAllAppenders();
+    MockSystemProducer.messagesReceived.clear();
   }
 
   /**