You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/02/24 18:47:54 UTC
[5/7] samza git commit: SAMZA-479: make StreamAppender pluggable for
different log formats
SAMZA-479: make StreamAppender pluggable for different log formats
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/11082d34
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/11082d34
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/11082d34
Branch: refs/heads/samza-sql
Commit: 11082d344d3059039fd285c0ef6752de02546977
Parents: 9ea3a52
Author: Yan Fang <ya...@gmail.com>
Authored: Wed Feb 18 18:11:37 2015 -0800
Committer: Yan Fang <ya...@gmail.com>
Committed: Wed Feb 18 18:11:37 2015 -0800
----------------------------------------------------------------------
.../apache/samza/config/Log4jSystemConfig.java | 27 ++++++++
.../samza/logging/log4j/StreamAppender.java | 45 +++++++++++-
.../serializers/LoggingEventStringSerde.java | 72 ++++++++++++++++++++
.../LoggingEventStringSerdeFactory.java | 32 +++++++++
.../samza/config/TestLog4jSystemConfig.java | 23 +++++++
.../samza/logging/log4j/TestStreamAppender.java | 3 +
.../TestLoggingEventStringSerde.java | 44 ++++++++++++
7 files changed, 245 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
index 659e3b6..107ddf0 100644
--- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
+++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -22,6 +22,8 @@ package org.apache.samza.config;
import java.util.ArrayList;
import java.util.Map;
+import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory;
+
/**
* This class contains the methods for getting properties that are needed by the
* StreamAppender.
@@ -74,6 +76,31 @@ public class Log4jSystemConfig {
}
/**
+  * Gets the serde factory class name configured for the given serde name. If
+  * no class is configured and the serde name is "log4j", falls back to the
+  * default {@link LoggingEventStringSerdeFactory}.
+  *
+  * @param name serde name; a null name never matches the "log4j" default
+  * @return serde factory class name, or null if no class is configured and
+  *         the default does not apply
+  */
+ public String getSerdeClass(String name) {
+   String className = getValue(String.format(SerializerConfig.SERDE(), name));
+   // Constant-first comparison: a null serde name yields null instead of a NullPointerException.
+   if (className == null && "log4j".equals(name)) {
+     className = LoggingEventStringSerdeFactory.class.getCanonicalName();
+   }
+   return className;
+ }
+
+ /**
+  * Looks up the message serde name configured at the system level.
+  *
+  * @param name name of the system
+  * @return whatever getValue returns for the system's message serde key
+  */
+ public String getSystemSerdeName(String name) {
+   return getValue(String.format(SystemConfig.MSG_SERDE(), name));
+ }
+
+ /**
+  * Looks up the message serde name configured at the stream level.
+  *
+  * @param systemName name of the system the stream belongs to
+  * @param streamName name of the stream
+  * @return whatever getValue returns for the stream's message serde key
+  */
+ public String getStreamSerdeName(String systemName, String streamName) {
+   return getValue(String.format(StreamConfig.MSG_SERDE(), systemName, streamName));
+ }
+
+ /**
* A helper method to get the value from the config. If the config does not
* contain the key, return null.
*
http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index 9a9d648..4ef3551 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -30,8 +30,12 @@ import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.Log4jSystemConfig;
import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.config.SystemConfig;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemFactory;
@@ -55,6 +59,7 @@ public class StreamAppender extends AppenderSkeleton {
private String key = null;
private String streamName = null;
private boolean isApplicationMaster = false;
+ private Serde<LoggingEvent> serde = null;
private Logger log = Logger.getLogger(StreamAppender.class);
/**
@@ -96,6 +101,8 @@ public class StreamAppender extends AppenderSkeleton {
throw new SamzaException("Please define log4j system name and factory class");
}
+ setSerde(log4jSystemConfig, systemName, streamName);
+
systemProducer = systemFactory.getProducer(systemName, config, new MetricsRegistryMap());
systemStream = new SystemStream(systemName, streamName);
systemProducer.register(SOURCE);
@@ -111,7 +118,7 @@ public class StreamAppender extends AppenderSkeleton {
try {
recursiveCall.set(true);
OutgoingMessageEnvelope outgoingMessageEnvelope =
- new OutgoingMessageEnvelope(systemStream, key.getBytes("UTF-8"), subAppend(event).getBytes("UTF-8"));
+ new OutgoingMessageEnvelope(systemStream, key.getBytes("UTF-8"), serde.toBytes(subLog(event)));
systemProducer.send(SOURCE, outgoingMessageEnvelope);
} catch (UnsupportedEncodingException e) {
throw new SamzaException("can not send the log messages", e);
@@ -129,6 +136,12 @@ public class StreamAppender extends AppenderSkeleton {
}
}
+ /**
+  * Builds a new LoggingEvent that mirrors the given event, except that its
+  * message is replaced by the result of subAppend(event).
+  *
+  * @param event the event handed to this appender
+  * @return a new event carrying the formatted message
+  */
+ private LoggingEvent subLog(LoggingEvent event) {
+   final String formattedMessage = subAppend(event);
+   return new LoggingEvent(
+       event.getFQNOfLoggerClass(),
+       event.getLogger(),
+       event.getTimeStamp(),
+       event.getLevel(),
+       formattedMessage,
+       event.getThreadName(),
+       event.getThrowableInformation(),
+       event.getNDC(),
+       event.getLocationInformation(),
+       event.getProperties());
+ }
+
@Override
public void close() {
if (!this.closed) {
@@ -182,4 +195,34 @@ public class StreamAppender extends AppenderSkeleton {
String streamName = "__samza_" + jobName + "_" + jobId + "_logs";
return streamName.replace("-", "_");
}
+
+ /**
+  * Sets the serde for this appender. The stream-level serde name is looked up
+  * first, then the system-level one. The serde factory class resolved for that
+  * name is instantiated and asked for the serde.
+  *
+  * @param log4jSystemConfig log4jSystemConfig for this appender
+  * @param systemName name of the system
+  * @param streamName name of the stream
+  * @throws SamzaException if no serde name is configured for the stream or the
+  *         system, or if no serde class can be resolved for the serde name
+  */
+ private void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName, String streamName) {
+   String serdeName = log4jSystemConfig.getStreamSerdeName(systemName, streamName);
+   if (serdeName == null) {
+     serdeName = log4jSystemConfig.getSystemSerdeName(systemName);
+   }
+
+   if (serdeName == null) {
+     // Fill in the key templates so the message names the actual properties to set
+     // rather than printing raw "%s" placeholders.
+     throw new SamzaException("Missing serde name. Please specify the "
+         + String.format(StreamConfig.MSG_SERDE(), systemName, streamName) + " or "
+         + String.format(SystemConfig.MSG_SERDE(), systemName) + " property.");
+   }
+
+   String serdeClass = log4jSystemConfig.getSerdeClass(serdeName);
+   if (serdeClass == null) {
+     // The original message contained the typo "s%"; report the concrete key instead.
+     throw new SamzaException(String.format(
+         "Can not find serializers class. Please specify serializers.registry.%s.class property", serdeName));
+   }
+
+   SerdeFactory<LoggingEvent> serdeFactory = Util.<SerdeFactory<LoggingEvent>> getObj(serdeClass);
+   serde = serdeFactory.getSerde(systemName, config);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
new file mode 100644
index 0000000..f9a0960
--- /dev/null
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j.serializers;
+
+import java.io.UnsupportedEncodingException;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.samza.SamzaException;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * A serializer for LoggingEvent. {@link #toBytes(LoggingEvent)} serializes the
+ * {@link LoggingEvent}'s message into UTF-8 bytes. {@link #fromBytes(byte[])}
+ * creates a new LoggingEvent whose message is the string decoded from the bytes.
+ */
+public class LoggingEventStringSerde implements Serde<LoggingEvent> {
+  private static final String ENCODING = "UTF-8";
+  // Logger whose name and level are used for the events rebuilt by fromBytes.
+  final Logger logger = Logger.getLogger(LoggingEventStringSerde.class);
+
+  /**
+   * Encodes the message of a {@link LoggingEvent} as UTF-8 bytes.
+   *
+   * @param object the event to serialize
+   * @return the encoded message, or null if the event or its message is null
+   * @throws SamzaException if the encoding is not supported
+   */
+  @Override
+  public byte[] toBytes(LoggingEvent object) {
+    if (object == null) {
+      return null;
+    }
+    // An event without a message is treated like a missing event, rather than
+    // failing with a NullPointerException on toString().
+    Object message = object.getMessage();
+    if (message == null) {
+      return null;
+    }
+    try {
+      return message.toString().getBytes(ENCODING);
+    } catch (UnsupportedEncodingException e) {
+      throw new SamzaException("can not be encoded to byte[]", e);
+    }
+  }
+
+  /**
+   * Convert bytes to a {@link LoggingEvent}. The new LoggingEvent is built with
+   * the name, the logger instance and the level of this class's logger; only
+   * the message comes from the bytes.
+   *
+   * @param bytes bytes for decoding
+   * @return a new LoggingEvent, or null if bytes is null
+   * @throws SamzaException if the encoding is not supported
+   */
+  @Override
+  public LoggingEvent fromBytes(byte[] bytes) {
+    if (bytes == null) {
+      return null;
+    }
+    String log;
+    try {
+      log = new String(bytes, ENCODING);
+    } catch (UnsupportedEncodingException e) {
+      throw new SamzaException("can not decode to String", e);
+    }
+    return new LoggingEvent(logger.getName(), logger, logger.getLevel(), log, null);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerdeFactory.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerdeFactory.java
new file mode 100644
index 0000000..150c3e5
--- /dev/null
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerdeFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j.serializers;
+
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+
+/**
+ * Factory that hands out {@link LoggingEventStringSerde} instances.
+ */
+public class LoggingEventStringSerdeFactory implements SerdeFactory<LoggingEvent> {
+  /**
+   * Creates a new {@link LoggingEventStringSerde}. Neither argument is consulted.
+   *
+   * @param name serde name (unused)
+   * @param config job config (unused)
+   * @return a fresh LoggingEventStringSerde
+   */
+  @Override
+  public Serde<LoggingEvent> getSerde(String name, Config config) {
+    final Serde<LoggingEvent> stringSerde = new LoggingEventStringSerde();
+    return stringSerde;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java b/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
index 64a1e70..16ccb45 100644
--- a/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
+++ b/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.*;
import java.util.HashMap;
import java.util.Map;
+import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -64,4 +65,26 @@ public class TestLog4jSystemConfig {
exception.expect(ConfigException.class);
log4jSystemConfig.getSystemName();
}
+
+ /**
+  * With nothing configured, "log4j" resolves to the default serde factory and
+  * any other serde name resolves to null.
+  */
+ @Test
+ public void testGetSerdeClass() {
+   Map<String, String> emptyProperties = new HashMap<String, String>();
+   Log4jSystemConfig emptyConfig = new Log4jSystemConfig(new MapConfig(emptyProperties));
+   String defaultFactoryName = LoggingEventStringSerdeFactory.class.getCanonicalName();
+
+   // "log4j" falls back to the default serde factory
+   assertEquals(defaultFactoryName, emptyConfig.getSerdeClass("log4j"));
+   // any other name has no default
+   assertNull(emptyConfig.getSerdeClass("otherName"));
+ }
+
+ /**
+  * The stream-level and system-level serde names are read from their own keys.
+  */
+ @Test
+ public void testGetSerdeName() {
+   Map<String, String> properties = new HashMap<String, String>();
+   properties.put("systems.mockSystem.samza.msg.serde", "systemSerde");
+   properties.put("systems.mockSystem.streams.mockStream.samza.msg.serde", "streamSerde");
+   Log4jSystemConfig serdeConfig = new Log4jSystemConfig(new MapConfig(properties));
+
+   String streamSerdeName = serdeConfig.getStreamSerdeName("mockSystem", "mockStream");
+   assertEquals("streamSerde", streamSerdeName);
+
+   String systemSerdeName = serdeConfig.getSystemSerdeName("mockSystem");
+   assertEquals("systemSerde", systemSerdeName);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
index 46e4b8c..3e4ddc9 100644
--- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
@@ -23,10 +23,12 @@ import static org.junit.Assert.*;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory;
import org.junit.Test;
public class TestStreamAppender {
@@ -64,6 +66,7 @@ public class TestStreamAppender {
Map<String, String> map = new HashMap<String, String>();
map.put("job.name", "log4jTest");
map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
+ map.put("systems.mock.samza.msg.serde", "log4j");
return new MapConfig(map);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/test/java/org/apache/samza/logging/log4j/serializers/TestLoggingEventStringSerde.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/serializers/TestLoggingEventStringSerde.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/serializers/TestLoggingEventStringSerde.java
new file mode 100644
index 0000000..4a7aa68
--- /dev/null
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/serializers/TestLoggingEventStringSerde.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j.serializers;
+
+import static org.junit.Assert.*;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.Test;
+
+/**
+ * Tests for {@link LoggingEventStringSerde}: null handling and the round trip of
+ * the log message through bytes.
+ */
+public class TestLoggingEventStringSerde {
+
+  @Test
+  public void test() {
+    String testLog = "testing";
+    // Encode with the charset the serde uses (UTF-8) rather than the platform
+    // default, so the expectation does not depend on the JVM's file.encoding.
+    byte[] testLogBytes = testLog.getBytes(java.nio.charset.Charset.forName("UTF-8"));
+    Logger logger = Logger.getLogger(TestLoggingEventStringSerde.class);
+    LoggingEvent log = new LoggingEvent(logger.getName(), logger, logger.getLevel(), testLog, null);
+    LoggingEventStringSerde loggingEventStringSerde = new LoggingEventStringSerde();
+
+    assertNull(loggingEventStringSerde.fromBytes(null));
+    assertNull(loggingEventStringSerde.toBytes(null));
+
+    assertArrayEquals(testLogBytes, loggingEventStringSerde.toBytes(log));
+    // only the log messages are guaranteed to be equivalent
+    assertEquals(log.getMessage().toString(), loggingEventStringSerde.fromBytes(testLogBytes).getMessage().toString());
+  }
+}