You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:30 UTC
[14/50] [abbrv] samza git commit: SAMZA-723 : hello-samza hangs when we use StreamAppender
SAMZA-723 : hello-samza hangs when we use StreamAppender
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8677a27f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8677a27f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8677a27f
Branch: refs/heads/samza-sql
Commit: 8677a27fd81895324bd99113929255ec56722390
Parents: 07c6984
Author: Yan Fang <ya...@gmail.com>
Authored: Tue Oct 27 15:21:41 2015 -0700
Committer: Navina <na...@gmail.com>
Committed: Tue Oct 27 15:21:41 2015 -0700
----------------------------------------------------------------------
.../samza/coordinator/JobCoordinator.scala | 8 +-
.../apache/samza/config/Log4jSystemConfig.java | 3 +-
.../samza/logging/log4j/StreamAppender.java | 108 +++++++++++--------
.../samza/logging/log4j/TestStreamAppender.java | 39 ++++++-
4 files changed, 112 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/8677a27f/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index a926ce6..03299cb 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -52,6 +52,11 @@ import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
object JobCoordinator extends Logging {
/**
+ * A volatile reference to the currently instantiated <code>JobCoordinator</code>
+ */
+ @volatile var currentJobCoordinator: JobCoordinator = null
+
+ /**
* @param coordinatorSystemConfig A config object that contains job.name,
* job.id, and all system.<job-coordinator-system-name>.*
* configuration. The method will use this config to read all configuration
@@ -105,7 +110,8 @@ object JobCoordinator extends Logging {
val jobModelGenerator = initializeJobModel(config, checkpointManager, changelogManager, localityManager, streamMetadataCache)
val server = new HttpServer
server.addServlet("/*", new JobServlet(jobModelGenerator))
- new JobCoordinator(jobModelGenerator(), server, checkpointManager)
+ currentJobCoordinator = new JobCoordinator(jobModelGenerator(), server, checkpointManager)
+ currentJobCoordinator
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/8677a27f/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
index d98b8c6..59015a9 100644
--- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
+++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -69,8 +69,7 @@ public class Log4jSystemConfig extends JavaSystemConfig {
/**
* Get the class name according to the serde name.
*
- * @param name
- * serde name
+ * @param name serde name
* @return serde factory name, or null if there is no factory defined for the
* supplied serde name.
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/8677a27f/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index 776a36b..0c6329e 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -22,13 +22,13 @@ package org.apache.samza.logging.log4j;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URL;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
import org.apache.samza.config.Log4jSystemConfig;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.config.ShellCommandConfig;
@@ -63,16 +63,12 @@ public class StreamAppender extends AppenderSkeleton {
private boolean isApplicationMaster = false;
private Serde<LoggingEvent> serde = null;
private Logger log = Logger.getLogger(StreamAppender.class);
+ protected static volatile boolean systemInitialized = false;
/**
* used to detect if this thread is called recursively
*/
- private final ThreadLocal<Boolean> recursiveCall = new ThreadLocal<Boolean>() {
- @Override
- protected Boolean initialValue() {
- return false;
- }
- };
+ private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
public String getStreamName() {
return this.streamName;
@@ -88,44 +84,37 @@ public class StreamAppender extends AppenderSkeleton {
if (containerName != null) {
isApplicationMaster = containerName.contains(APPLICATION_MASTER_TAG);
} else {
- throw new SamzaException("Got null container name from system property: " + JAVA_OPTS_CONTAINER_NAME + ". This is used as the key for the log appender, so can't proceed.");
+ throw new SamzaException("Got null container name from system property: " + JAVA_OPTS_CONTAINER_NAME +
+ ". This is used as the key for the log appender, so can't proceed.");
}
key = containerName; // use the container name as the key for the logs
- config = getConfig();
- SystemFactory systemFactory = null;
- Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
-
- if (streamName == null) {
- streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId());
- }
-
- String systemName = log4jSystemConfig.getSystemName();
- String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName);
- if (systemFactoryName != null) {
- systemFactory = Util.<SystemFactory>getObj(systemFactoryName);
+ // StreamAppender has to wait until the JobCoordinator is up when the log is in the AM
+ if (isApplicationMaster) {
+ systemInitialized = false;
} else {
- throw new SamzaException("Could not figure out the \"" + systemName + "\" system factory for log4j StreamAppender to use");
+ setupSystem();
+ systemInitialized = true;
}
-
- setSerde(log4jSystemConfig, systemName, streamName);
-
- systemProducer = systemFactory.getProducer(systemName, config, new MetricsRegistryMap());
- systemStream = new SystemStream(systemName, streamName);
- systemProducer.register(SOURCE);
- systemProducer.start();
-
- log.info(SOURCE + " has been registered in " + systemName + ". So all the logs will be sent to " + streamName
- + " in " + systemName + ". Logs are partitioned by " + key);
}
@Override
- protected void append(LoggingEvent event) {
+ public void append(LoggingEvent event) {
if (!recursiveCall.get()) {
try {
recursiveCall.set(true);
- OutgoingMessageEnvelope outgoingMessageEnvelope =
- new OutgoingMessageEnvelope(systemStream, key.getBytes("UTF-8"), serde.toBytes(subLog(event)));
- systemProducer.send(SOURCE, outgoingMessageEnvelope);
+ if (!systemInitialized) {
+ if (JobCoordinator.currentJobCoordinator() != null) {
+ // JobCoordinator has been instantiated
+ setupSystem();
+ systemInitialized = true;
+ } else {
+ log.trace("Waiting for the JobCoordinator to be instantiated...");
+ }
+ } else {
+ OutgoingMessageEnvelope outgoingMessageEnvelope =
+ new OutgoingMessageEnvelope(systemStream, key.getBytes("UTF-8"), serde.toBytes(subLog(event)));
+ systemProducer.send(SOURCE, outgoingMessageEnvelope);
+ }
} catch (UnsupportedEncodingException e) {
throw new SamzaException("can not send the log messages", e);
} finally {
@@ -150,10 +139,13 @@ public class StreamAppender extends AppenderSkeleton {
@Override
public void close() {
+ log.info("Shutting down the StreamAppender...");
if (!this.closed) {
this.closed = true;
flushSystemProducer();
- systemProducer.stop();
+ if (systemProducer != null) {
+ systemProducer.stop();
+ }
}
}
@@ -166,7 +158,9 @@ public class StreamAppender extends AppenderSkeleton {
* force the system producer to flush the messages
*/
public void flushSystemProducer() {
- systemProducer.flush(SOURCE);
+ if (systemProducer != null) {
+ systemProducer.flush(SOURCE);
+ }
}
/**
@@ -179,11 +173,12 @@ public class StreamAppender extends AppenderSkeleton {
try {
if (isApplicationMaster) {
- Config coordinatorSystemConfig = new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG()), Config.class));
- config = JobCoordinator.apply(coordinatorSystemConfig).jobModel().getConfig();
+ config = JobCoordinator.currentJobCoordinator().jobModel().getConfig();
} else {
String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
- config = SamzaObjectMapper.getObjectMapper().readValue(Util.read(new URL(url), 30000), JobModel.class).getConfig();
+ config = SamzaObjectMapper.getObjectMapper()
+ .readValue(Util.read(new URL(url), 30000), JobModel.class)
+ .getConfig();
}
} catch (IOException e) {
throw new SamzaException("can not read the config", e);
@@ -192,7 +187,35 @@ public class StreamAppender extends AppenderSkeleton {
return config;
}
- public static String getStreamName(String jobName, String jobId) {
+ protected void setupSystem() {
+ config = getConfig();
+ SystemFactory systemFactory = null;
+ Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
+
+ if (streamName == null) {
+ streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId());
+ }
+
+ String systemName = log4jSystemConfig.getSystemName();
+ String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName);
+ if (systemFactoryName != null) {
+ systemFactory = Util.<SystemFactory>getObj(systemFactoryName);
+ } else {
+ throw new SamzaException("Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use");
+ }
+
+ setSerde(log4jSystemConfig, systemName, streamName);
+
+ systemProducer = systemFactory.getProducer(systemName, config, new MetricsRegistryMap());
+ systemStream = new SystemStream(systemName, streamName);
+ systemProducer.register(SOURCE);
+ systemProducer.start();
+
+ log.info(SOURCE + " has been registered in " + systemName + ". So all the logs will be sent to " + streamName
+ + " in " + systemName + ". Logs are partitioned by " + key);
+ }
+
+ protected static String getStreamName(String jobName, String jobId) {
if (jobName == null) {
throw new SamzaException("job name is null. Please specify job.name");
}
@@ -224,7 +247,8 @@ public class StreamAppender extends AppenderSkeleton {
serde = serdeFactory.getSerde(systemName, config);
} else {
String serdeKey = String.format(SerializerConfig.SERDE(), serdeName);
- throw new SamzaException("Can not find serializers class for key '" + serdeName + "'. Please specify " + serdeKey + " property");
+ throw new SamzaException("Can not find serializers class for key '" + serdeName + "'. Please specify " +
+ serdeKey + " property");
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/8677a27f/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
index 1c6f9a4..e2e17a0 100644
--- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
@@ -70,7 +70,7 @@ public class TestStreamAppender {
}
@Test
- public void testSystemProducerAppender() {
+ public void testSystemProducerAppenderInContainer() {
System.setProperty("samza.container.name", "samza-container-1");
MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender();
@@ -87,6 +87,43 @@ public class TestStreamAppender {
assertEquals(2, MockSystemProducer.messagesReceived.size());
assertTrue(new String((byte[]) MockSystemProducer.messagesReceived.get(0)).contains("\"message\":\"testing\""));
assertTrue(new String((byte[]) MockSystemProducer.messagesReceived.get(1)).contains("\"message\":\"testing2\""));
+
+ // reset
+ log.removeAllAppenders();
+ MockSystemProducer.messagesReceived.clear();
+ }
+
+ @Test
+ public void testSystemProducerAppenderInAM() {
+ System.setProperty("samza.container.name", "samza-application-master");
+
+ MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender();
+ PatternLayout layout = new PatternLayout();
+ layout.setConversionPattern("%m");
+ systemProducerAppender.setLayout(layout);
+ systemProducerAppender.activateOptions();
+ log.addAppender(systemProducerAppender);
+
+ log.info("no-received");
+ systemProducerAppender.flushSystemProducer();
+ // it should not receive anything because the system is not set up
+ assertEquals(0, MockSystemProducer.messagesReceived.size());
+
+ systemProducerAppender.setupSystem();
+ MockSystemProducerAppender.systemInitialized = true;
+
+ log.info("testing3");
+ log.info("testing4");
+ systemProducerAppender.flushSystemProducer();
+
+ // should be able to receive messages now
+ assertEquals(2, MockSystemProducer.messagesReceived.size());
+ assertTrue(new String((byte[]) MockSystemProducer.messagesReceived.get(0)).contains("\"message\":\"testing3\""));
+ assertTrue(new String((byte[]) MockSystemProducer.messagesReceived.get(1)).contains("\"message\":\"testing4\""));
+
+ // reset
+ log.removeAllAppenders();
+ MockSystemProducer.messagesReceived.clear();
}
/**