You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/02/24 18:47:54 UTC

[5/7] samza git commit: SAMZA-479: make StreamAppender pluggable for different log formats

SAMZA-479: make StreamAppender pluggable for different log formats


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/11082d34
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/11082d34
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/11082d34

Branch: refs/heads/samza-sql
Commit: 11082d344d3059039fd285c0ef6752de02546977
Parents: 9ea3a52
Author: Yan Fang <ya...@gmail.com>
Authored: Wed Feb 18 18:11:37 2015 -0800
Committer: Yan Fang <ya...@gmail.com>
Committed: Wed Feb 18 18:11:37 2015 -0800

----------------------------------------------------------------------
 .../apache/samza/config/Log4jSystemConfig.java  | 27 ++++++++
 .../samza/logging/log4j/StreamAppender.java     | 45 +++++++++++-
 .../serializers/LoggingEventStringSerde.java    | 72 ++++++++++++++++++++
 .../LoggingEventStringSerdeFactory.java         | 32 +++++++++
 .../samza/config/TestLog4jSystemConfig.java     | 23 +++++++
 .../samza/logging/log4j/TestStreamAppender.java |  3 +
 .../TestLoggingEventStringSerde.java            | 44 ++++++++++++
 7 files changed, 245 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
index 659e3b6..107ddf0 100644
--- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
+++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -22,6 +22,8 @@ package org.apache.samza.config;
 import java.util.ArrayList;
 import java.util.Map;
 
+import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory;
+
 /**
  * This class contains the methods for getting properties that are needed by the
  * StreamAppender.
@@ -74,6 +76,31 @@ public class Log4jSystemConfig {
   }
 
   /**
+   * Gets the serde factory class name registered for the given serde name. If the name is
+   * "log4j" and no class is configured, the default {@link LoggingEventStringSerdeFactory} is used.
+   *
+   * @param name serde name
+   * @return serde factory class name, or null if none is configured and the name is not "log4j"
+   */
+  public String getSerdeClass(String name) {
+    String className = getValue(String.format(SerializerConfig.SERDE(), name));
+    if (className == null && "log4j".equals(name)) {
+      className = LoggingEventStringSerdeFactory.class.getCanonicalName();
+    }
+    return className;
+  }
+
+  /** Returns the system-level message serde name for the given system, or null if it is not set. */
+  public String getSystemSerdeName(String name) {
+    return getValue(String.format(SystemConfig.MSG_SERDE(), name));
+  }
+
+  /** Returns the stream-level message serde name for the given system and stream, or null if not set. */
+  public String getStreamSerdeName(String systemName, String streamName) {
+    return getValue(String.format(StreamConfig.MSG_SERDE(), systemName, streamName));
+  }
+
+  /**
    * A helper method to get the value from the config. If the config does not
    * contain the key, return null.
    *

http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index 9a9d648..4ef3551 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -30,8 +30,12 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.Log4jSystemConfig;
 import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.config.SystemConfig;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemFactory;
@@ -55,6 +59,7 @@ public class StreamAppender extends AppenderSkeleton {
   private String key = null;
   private String streamName = null;
   private boolean isApplicationMaster = false;
+  private Serde<LoggingEvent> serde = null;
   private Logger log = Logger.getLogger(StreamAppender.class);
 
   /**
@@ -96,6 +101,8 @@ public class StreamAppender extends AppenderSkeleton {
       throw new SamzaException("Please define log4j system name and factory class");
     }
 
+    setSerde(log4jSystemConfig, systemName, streamName);
+
     systemProducer = systemFactory.getProducer(systemName, config, new MetricsRegistryMap());
     systemStream = new SystemStream(systemName, streamName);
     systemProducer.register(SOURCE);
@@ -111,7 +118,7 @@ public class StreamAppender extends AppenderSkeleton {
       try {
         recursiveCall.set(true);
         OutgoingMessageEnvelope outgoingMessageEnvelope =
-            new OutgoingMessageEnvelope(systemStream, key.getBytes("UTF-8"), subAppend(event).getBytes("UTF-8"));
+            new OutgoingMessageEnvelope(systemStream, key.getBytes("UTF-8"), serde.toBytes(subLog(event)));
         systemProducer.send(SOURCE, outgoingMessageEnvelope);
       } catch (UnsupportedEncodingException e) {
         throw new SamzaException("can not send the log messages", e);
@@ -129,6 +136,12 @@ public class StreamAppender extends AppenderSkeleton {
     }
   }
 
+  /** Returns a copy of {@code event} whose message is replaced by the string from {@link #subAppend}. */
+  private LoggingEvent subLog(LoggingEvent event) {
+    return new LoggingEvent(event.getFQNOfLoggerClass(), event.getLogger(), event.getTimeStamp(), event.getLevel(),
+        subAppend(event), event.getThreadName(), event.getThrowableInformation(), event.getNDC(), event.getLocationInformation(), event.getProperties());
+  }
+
   @Override
   public void close() {
     if (!this.closed) {
@@ -182,4 +195,34 @@ public class StreamAppender extends AppenderSkeleton {
     String streamName = "__samza_" + jobName + "_" + jobId + "_logs";
     return streamName.replace("-", "_");
   }
+
+  /**
+   * Sets the serde for this appender. The stream-level serde name is used first, then the
+   * system-level one; a {@link SamzaException} is thrown if neither is configured or no class is found.
+   *
+   * @param log4jSystemConfig log4jSystemConfig for this appender
+   * @param systemName name of the system
+   * @param streamName name of the stream
+   */
+  private void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName, String streamName) {
+    String serdeName = log4jSystemConfig.getStreamSerdeName(systemName, streamName);
+
+    if (serdeName == null) {
+      serdeName = log4jSystemConfig.getSystemSerdeName(systemName);
+    }
+
+    if (serdeName == null) {
+      throw new SamzaException("Missing serde name. Please specify the " + String.format(StreamConfig.MSG_SERDE(), systemName, streamName) + " or " + String.format(SystemConfig.MSG_SERDE(), systemName) + " property.");
+    }
+
+    String serdeClass = log4jSystemConfig.getSerdeClass(serdeName);
+
+    if (serdeClass != null) {
+      SerdeFactory<LoggingEvent> serdeFactory = Util.<SerdeFactory<LoggingEvent>> getObj(serdeClass);
+      // hand the factory the serde name, consistent with how the serde class was looked up above
+      serde = serdeFactory.getSerde(serdeName, config);
+    } else {
+      throw new SamzaException(String.format("Can not find serializers class for serde %s. Please specify serializers.registry.%s.class property", serdeName, serdeName));
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
new file mode 100644
index 0000000..f9a0960
--- /dev/null
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerde.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j.serializers;
+
+import java.io.UnsupportedEncodingException;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.samza.SamzaException;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * A {@link Serde} for {@link LoggingEvent} that serializes only the event's message, as UTF-8.
+ * {@link #fromBytes(byte[])} creates a new LoggingEvent around the decoded message string.
+ */
+public class LoggingEventStringSerde implements Serde<LoggingEvent> {
+  private static final String ENCODING = "UTF-8";
+  final Logger logger = Logger.getLogger(LoggingEventStringSerde.class);
+
+  /** Encodes the event's message ({@code toString()}) as UTF-8; returns null for a null event or message. */
+  @Override
+  public byte[] toBytes(LoggingEvent object) {
+    if (object == null || object.getMessage() == null) {
+      return null;
+    }
+    try {
+      return object.getMessage().toString().getBytes(ENCODING);
+    } catch (UnsupportedEncodingException e) {
+      throw new SamzaException("can not be encoded to byte[]", e);
+    }
+  }
+
+  /**
+   * Converts bytes to a {@link LoggingEvent} whose message is the UTF-8 decoded string.
+   * The event's logger name, category and level come from this serde's own logger, not
+   * from the event that was originally serialized.
+   *
+   * @param bytes bytes for decoding; may be null
+   * @return a new LoggingEvent, or null if {@code bytes} is null
+   */
+  @Override
+  public LoggingEvent fromBytes(byte[] bytes) {
+    if (bytes == null) {
+      return null;
+    }
+    String log;
+    try {
+      log = new String(bytes, ENCODING);
+    } catch (UnsupportedEncodingException e) {
+      throw new SamzaException("can not decode to String", e);
+    }
+    // getLevel() is null unless a level is set on this exact logger; the effective level is inherited
+    return new LoggingEvent(logger.getName(), logger, logger.getEffectiveLevel(), log, null);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerdeFactory.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerdeFactory.java
new file mode 100644
index 0000000..150c3e5
--- /dev/null
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventStringSerdeFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j.serializers;
+
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+/** A {@link SerdeFactory} that ignores its name and config and returns a new {@link LoggingEventStringSerde}. */
+public class LoggingEventStringSerdeFactory implements SerdeFactory<LoggingEvent> {
+  @Override
+  public Serde<LoggingEvent> getSerde(String name, Config config) {
+    return new LoggingEventStringSerde();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java b/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
index 64a1e70..16ccb45 100644
--- a/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
+++ b/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.*;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -64,4 +65,26 @@ public class TestLog4jSystemConfig {
     exception.expect(ConfigException.class);
     log4jSystemConfig.getSystemName();
   }
+
+  @Test
+  public void testGetSerdeClass() {
+    Log4jSystemConfig emptyConfig = new Log4jSystemConfig(new MapConfig(new HashMap<String, String>()));
+    String defaultFactory = LoggingEventStringSerdeFactory.class.getCanonicalName();
+
+    // an unconfigured "log4j" serde falls back to the built-in string serde factory
+    assertEquals(defaultFactory, emptyConfig.getSerdeClass("log4j"));
+    // any other unconfigured serde name resolves to null
+    assertNull(emptyConfig.getSerdeClass("otherName"));
+  }
+
+  @Test
+  public void testGetSerdeName() {
+    Map<String, String> props = new HashMap<String, String>();
+    props.put("systems.mockSystem.samza.msg.serde", "systemSerde");
+    props.put("systems.mockSystem.streams.mockStream.samza.msg.serde", "streamSerde");
+    Log4jSystemConfig serdeConfig = new Log4jSystemConfig(new MapConfig(props));
+    // system-level and stream-level serde names are resolved independently of each other
+    assertEquals("systemSerde", serdeConfig.getSystemSerdeName("mockSystem"));
+    assertEquals("streamSerde", serdeConfig.getStreamSerdeName("mockSystem", "mockStream"));
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
index 46e4b8c..3e4ddc9 100644
--- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
@@ -23,10 +23,12 @@ import static org.junit.Assert.*;
 
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory;
 import org.junit.Test;
 
 public class TestStreamAppender {
@@ -64,6 +66,7 @@ public class TestStreamAppender {
       Map<String, String> map = new HashMap<String, String>();
       map.put("job.name", "log4jTest");
       map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
+      map.put("systems.mock.samza.msg.serde", "log4j");
       return new MapConfig(map);
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/11082d34/samza-log4j/src/test/java/org/apache/samza/logging/log4j/serializers/TestLoggingEventStringSerde.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/serializers/TestLoggingEventStringSerde.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/serializers/TestLoggingEventStringSerde.java
new file mode 100644
index 0000000..4a7aa68
--- /dev/null
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/serializers/TestLoggingEventStringSerde.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j.serializers;
+
+import static org.junit.Assert.*;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.Test;
+
+public class TestLoggingEventStringSerde {
+
+  @Test
+  public void test() throws Exception {
+    // a non-ASCII character makes the test sensitive to the UTF-8 encoding the serde uses
+    String testLog = "testing \u00e9";
+    Logger logger = Logger.getLogger(TestLoggingEventStringSerde.class);
+    LoggingEvent log = new LoggingEvent(logger.getName(), logger, logger.getLevel(), testLog, null);
+    LoggingEventStringSerde loggingEventStringSerde = new LoggingEventStringSerde();
+
+    assertNull(loggingEventStringSerde.fromBytes(null));
+    assertNull(loggingEventStringSerde.toBytes(null));
+    assertArrayEquals(testLog.getBytes("UTF-8"), loggingEventStringSerde.toBytes(log));
+    // only the log messages are guaranteed to be equivalent
+    assertEquals(testLog, loggingEventStringSerde.fromBytes(testLog.getBytes("UTF-8")).getMessage().toString());
+  }
+}