You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by de...@apache.org on 2017/08/21 10:12:17 UTC

flume git commit: FLUME-3144. Improve Log4jAppender's performance by allowing logging collection of messages

Repository: flume
Updated Branches:
  refs/heads/trunk 1fcef1a3b -> 66327aa20


FLUME-3144. Improve Log4jAppender's performance by allowing logging collection of messages

Log4jAppender treats Collection messages as a special case making it possible to log
Collection of events in one Log4j log call. The appender sends these events to the
receiving Flume instance as one batch with the rpcClient.appendBatch() method.

This closes #151

Reviewers: Ferenc Szabo, Miklos Csanady

(Denes Arvay via Denes Arvay)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/66327aa2
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/66327aa2
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/66327aa2

Branch: refs/heads/trunk
Commit: 66327aa20207ca62cd7c7a4cf9e24724b5ee0dfc
Parents: 1fcef1a
Author: Denes Arvay <de...@apache.org>
Authored: Fri Aug 11 16:22:04 2017 +0200
Committer: Denes Arvay <de...@apache.org>
Committed: Mon Aug 21 11:10:13 2017 +0200

----------------------------------------------------------------------
 flume-ng-clients/flume-ng-log4jappender/pom.xml |   6 ++
 .../clients/log4jappender/Log4jAppender.java    | 107 +++++++++++++------
 .../log4jappender/TestLog4jAppender.java        |  74 +++++++++++--
 .../TestLog4jAppenderWithAvro.java              | 102 ++++++++++++++++--
 4 files changed, 241 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/66327aa2/flume-ng-clients/flume-ng-log4jappender/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/pom.xml b/flume-ng-clients/flume-ng-log4jappender/pom.xml
index 610872a..92fe258 100644
--- a/flume-ng-clients/flume-ng-log4jappender/pom.xml
+++ b/flume-ng-clients/flume-ng-log4jappender/pom.xml
@@ -87,6 +87,12 @@ limitations under the License.
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
 

http://git-wip-us.apache.org/repos/asf/flume/blob/66327aa2/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
index 46b05e9..f5abfbc 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
@@ -23,7 +23,11 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -90,7 +94,7 @@ public class Log4jAppender extends AppenderSkeleton {
    * you must set the <tt>port</tt> and <tt>hostname</tt> and then call
    * <tt>activateOptions()</tt> before calling <tt>append()</tt>.
    */
-  public Log4jAppender(){
+  public Log4jAppender() {
   }
 
   /**
@@ -133,36 +137,17 @@ public class Log4jAppender extends AppenderSkeleton {
       reconnect();
     }
 
-    //Client created first time append is called.
-    Map<String, String> hdrs = new HashMap<String, String>();
-    hdrs.put(Log4jAvroHeaders.LOGGER_NAME.toString(), event.getLoggerName());
-    hdrs.put(Log4jAvroHeaders.TIMESTAMP.toString(),
-        String.valueOf(event.timeStamp));
-    hdrs.put(Log4jAvroHeaders.ADDRESS.toString(), clientAddress);
-    //To get the level back simply use
-    //LoggerEvent.toLevel(hdrs.get(Integer.parseInt(
-    //Log4jAvroHeaders.LOG_LEVEL.toString()))
-    hdrs.put(Log4jAvroHeaders.LOG_LEVEL.toString(),
-        String.valueOf(event.getLevel().toInt()));
-
-    Event flumeEvent;
-    Object message = event.getMessage();
-    if (message instanceof GenericRecord) {
-      GenericRecord record = (GenericRecord) message;
-      populateAvroHeaders(hdrs, record.getSchema(), message);
-      flumeEvent = EventBuilder.withBody(serialize(record, record.getSchema()), hdrs);
-    } else if (message instanceof SpecificRecord || avroReflectionEnabled) {
-      Schema schema = ReflectData.get().getSchema(message.getClass());
-      populateAvroHeaders(hdrs, schema, message);
-      flumeEvent = EventBuilder.withBody(serialize(message, schema), hdrs);
-    } else {
-      hdrs.put(Log4jAvroHeaders.MESSAGE_ENCODING.toString(), "UTF8");
-      String msg = layout != null ? layout.format(event) : message.toString();
-      flumeEvent = EventBuilder.withBody(msg, Charset.forName("UTF8"), hdrs);
-    }
-
+    List<Event> flumeEvents = parseEvents(event);
     try {
-      rpcClient.append(flumeEvent);
+      switch (flumeEvents.size()) {
+        case 0:
+          break;
+        case 1:
+          rpcClient.append(flumeEvents.get(0));
+          break;
+        default:
+          rpcClient.appendBatch(flumeEvents);
+      }
     } catch (EventDeliveryException e) {
       String msg = "Flume append() failed.";
       LogLog.error(msg);
@@ -173,13 +158,69 @@ public class Log4jAppender extends AppenderSkeleton {
     }
   }
 
+  private List<Event> parseEvents(LoggingEvent loggingEvent) {
+    Map<String, String> headers = new HashMap<>();
+    headers.put(Log4jAvroHeaders.LOGGER_NAME.toString(), loggingEvent.getLoggerName());
+    headers.put(Log4jAvroHeaders.TIMESTAMP.toString(), String.valueOf(loggingEvent.timeStamp));
+    headers.put(Log4jAvroHeaders.ADDRESS.toString(), clientAddress);
+
+    //To get the level back simply use
+    //LoggerEvent.toLevel(hdrs.get(Integer.parseInt(
+    //Log4jAvroHeaders.LOG_LEVEL.toString()))
+    headers.put(Log4jAvroHeaders.LOG_LEVEL.toString(),
+        String.valueOf(loggingEvent.getLevel().toInt()));
+
+    Map<String, String> headersWithEncoding = null;
+
+    Collection<?> messages;
+    if (loggingEvent.getMessage() instanceof Collection) {
+      messages = (Collection) loggingEvent.getMessage();
+    } else {
+      messages = Collections.singleton(loggingEvent.getMessage());
+    }
+
+    List<Event> events = new LinkedList<>();
+    for (Object message : messages) {
+      if (message instanceof GenericRecord) {
+        GenericRecord record = (GenericRecord) message;
+        populateAvroHeaders(headers, record.getSchema());
+        events.add(EventBuilder.withBody(serialize(record, record.getSchema()), headers));
+
+      } else if (message instanceof SpecificRecord || avroReflectionEnabled) {
+        Schema schema = ReflectData.get().getSchema(message.getClass());
+        populateAvroHeaders(headers, schema);
+        events.add(EventBuilder.withBody(serialize(message, schema), headers));
+
+      } else {
+        String msg;
+        if (layout != null) {
+          LoggingEvent singleLoggingEvent = new LoggingEvent(loggingEvent.getFQNOfLoggerClass(),
+              loggingEvent.getLogger(), loggingEvent.getTimeStamp(), loggingEvent.getLevel(),
+              message, loggingEvent.getThreadName(), loggingEvent.getThrowableInformation(),
+              loggingEvent.getNDC(), loggingEvent.getLocationInformation(),
+              loggingEvent.getProperties());
+          msg = layout.format(singleLoggingEvent);
+        } else {
+          msg = message.toString();
+        }
+
+        if (headersWithEncoding == null) {
+          headersWithEncoding = new HashMap<>(headers);
+          headersWithEncoding.put(Log4jAvroHeaders.MESSAGE_ENCODING.toString(), "UTF8");
+        }
+        events.add(EventBuilder.withBody(msg, Charset.forName("UTF8"), headersWithEncoding));
+      }
+    }
+
+    return events;
+  }
+
   private Schema schema;
   private ByteArrayOutputStream out;
   private DatumWriter<Object> writer;
   private BinaryEncoder encoder;
 
-  protected void populateAvroHeaders(Map<String, String> hdrs, Schema schema,
-      Object message) {
+  protected void populateAvroHeaders(Map<String, String> hdrs, Schema schema) {
     if (avroSchemaUrl != null) {
       hdrs.put(Log4jAvroHeaders.AVRO_SCHEMA_URL.toString(), avroSchemaUrl);
       return;
@@ -193,7 +234,7 @@ public class Log4jAppender extends AppenderSkeleton {
     if (schema == null || !datumSchema.equals(schema)) {
       schema = datumSchema;
       out = new ByteArrayOutputStream();
-      writer = new ReflectDatumWriter<Object>(schema);
+      writer = new ReflectDatumWriter<>(schema);
       encoder = EncoderFactory.get().binaryEncoder(out, null);
     }
     out.reset();

http://git-wip-us.apache.org/repos/asf/flume/blob/66327aa2/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
index b8663a6..25698c5 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppender.java
@@ -21,11 +21,13 @@ package org.apache.flume.clients.log4jappender;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import junit.framework.Assert;
 
@@ -41,6 +43,7 @@ import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.source.AvroSource;
+import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -48,6 +51,7 @@ import org.apache.log4j.PropertyConfigurator;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestLog4jAppender {
   private AvroSource source;
@@ -57,7 +61,7 @@ public class TestLog4jAppender {
   @Before
   public void initiate() throws Exception {
     int port = 25430;
-    source = new AvroSource();
+    source = Mockito.spy(new AvroSource());
     ch = new MemoryChannel();
     Configurables.configure(ch, new Context());
 
@@ -76,16 +80,12 @@ public class TestLog4jAppender {
   }
 
   private void configureSource() {
-    List<Channel> channels = new ArrayList<Channel>();
-    channels.add(ch);
-
     ChannelSelector rcs = new ReplicatingChannelSelector();
-    rcs.setChannels(channels);
-
+    rcs.setChannels(Collections.singletonList(ch));
     source.setChannelProcessor(new ChannelProcessor(rcs));
-
     source.start();
   }
+
   @Test
   public void testLog4jAppender() throws IOException {
     configureSource();
@@ -125,7 +125,64 @@ public class TestLog4jAppender {
       transaction.commit();
       transaction.close();
     }
+  }
+
+  private void testBatchedSending(int numEvents) {
+    configureSource();
+    PropertyConfigurator.configure(props);
+
+    Logger logger = LogManager.getLogger(getClass());
+    List<String> events = IntStream.range(0, numEvents).mapToObj(String::valueOf)
+        .collect(Collectors.toList());
+    logger.info(events);
+
+    Transaction tx = ch.getTransaction();
+    tx.begin();
+    for (String s : events) {
+      Event e = ch.take();
+      Assert.assertNotNull(e);
+      Assert.assertEquals(s, new String(e.getBody()));
+    }
+    Assert.assertNull("There should be no more events in the channel", ch.take());
+  }
 
+  @Test
+  public void testLogBatch() {
+    testBatchedSending(5);
+    Mockito.verify(source, Mockito.times(1)).appendBatch(Mockito.anyList());
+    Mockito.verify(source, Mockito.times(0)).append(Mockito.any(AvroFlumeEvent.class));
+  }
+
+  @Test
+  public void testLogSingleMessage() {
+    configureSource();
+    PropertyConfigurator.configure(props);
+
+    Logger logger = LogManager.getLogger(getClass());
+    logger.info("test");
+
+    Transaction tx = ch.getTransaction();
+    tx.begin();
+    Event e = ch.take();
+    Assert.assertNotNull(e);
+    Assert.assertEquals("test", new String(e.getBody()));
+
+    Mockito.verify(source, Mockito.times(0)).appendBatch(Mockito.anyList());
+    Mockito.verify(source, Mockito.times(1)).append(Mockito.any(AvroFlumeEvent.class));
+  }
+
+  @Test
+  public void testLogSingleMessageInCollection() {
+    testBatchedSending(1);
+    Mockito.verify(source, Mockito.times(0)).appendBatch(Mockito.anyList());
+    Mockito.verify(source, Mockito.times(1)).append(Mockito.any(AvroFlumeEvent.class));
+  }
+
+  @Test
+  public void testLogEmptyBatch() {
+    testBatchedSending(0);
+    Mockito.verify(source, Mockito.times(0)).appendBatch(Mockito.anyList());
+    Mockito.verify(source, Mockito.times(0)).append(Mockito.any(AvroFlumeEvent.class));
   }
 
   @Test
@@ -136,7 +193,6 @@ public class TestLog4jAppender {
     Logger logger = LogManager.getLogger(TestLog4jAppender.class);
     source.stop();
     sendAndAssertFail(logger);
-
   }
 
   @Test(expected = EventDeliveryException.class)

http://git-wip-us.apache.org/repos/asf/flume/blob/66327aa2/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java
index 0607e3a..7c2a964 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java
@@ -25,6 +25,7 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -75,7 +76,7 @@ public class TestLog4jAppenderWithAvro {
     context.put("bind", "localhost");
     Configurables.configure(source, context);
 
-    List<Channel> channels = new ArrayList<Channel>();
+    List<Channel> channels = new ArrayList<>();
     channels.add(ch);
 
     ChannelSelector rcs = new ReplicatingChannelSelector();
@@ -87,8 +88,7 @@ public class TestLog4jAppenderWithAvro {
   }
 
   private void loadProperties(String file) throws IOException {
-    File TESTFILE = new File(
-        TestLog4jAppenderWithAvro.class.getClassLoader()
+    File TESTFILE = new File(TestLog4jAppenderWithAvro.class.getClassLoader()
             .getResource(file).getFile());
     FileReader reader = new FileReader(TESTFILE);
     props = new Properties();
@@ -115,7 +115,7 @@ public class TestLog4jAppenderWithAvro {
     Event event = ch.take();
     Assert.assertNotNull(event);
 
-    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
+    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
     BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(event.getBody(), null);
     GenericRecord recordFromEvent = reader.read(null, decoder);
     Assert.assertEquals(msg, recordFromEvent.get("message").toString());
@@ -131,7 +131,6 @@ public class TestLog4jAppenderWithAvro {
 
     transaction.commit();
     transaction.close();
-
   }
 
   @Test
@@ -153,7 +152,7 @@ public class TestLog4jAppenderWithAvro {
 
     Schema schema = ReflectData.get().getSchema(appEvent.getClass());
 
-    ReflectDatumReader<AppEvent> reader = new ReflectDatumReader<AppEvent>(AppEvent.class);
+    ReflectDatumReader<AppEvent> reader = new ReflectDatumReader<>(AppEvent.class);
     BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(event.getBody(), null);
     AppEvent recordFromEvent = reader.read(null, decoder);
     Assert.assertEquals(msg, recordFromEvent.getMessage());
@@ -169,7 +168,77 @@ public class TestLog4jAppenderWithAvro {
 
     transaction.commit();
     transaction.close();
+  }
+
+  @Test
+  public void testDifferentEventTypesInBatchWithAvroReflect() throws IOException {
+    loadProperties("flume-log4jtest-avro-reflect.properties");
+    PropertyConfigurator.configure(props);
+    Logger logger = LogManager.getLogger(getClass());
+    List<Object> events = Arrays.asList("string", new AppEvent("appEvent"));
+    logger.info(events);
+
+    Transaction transaction = ch.getTransaction();
+    transaction.begin();
 
+    for (Object o : events) {
+      Event e = ch.take();
+      Assert.assertNotNull(e);
+      ReflectDatumReader<?> reader = new ReflectDatumReader<>(o.getClass());
+      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(e.getBody(), null);
+      Object readObject = reader.read(null, decoder);
+      Assert.assertEquals(o, readObject);
+
+      Map<String, String> hdrs = e.getHeaders();
+      Assert.assertNull(hdrs.get(Log4jAvroHeaders.MESSAGE_ENCODING.toString()));
+      Assert.assertNull("Schema URL should not be set",
+          hdrs.get(Log4jAvroHeaders.AVRO_SCHEMA_URL.toString()));
+      Assert.assertEquals("Schema string should be set",
+          ReflectData.get().getSchema(readObject.getClass()).toString(),
+          hdrs.get(Log4jAvroHeaders.AVRO_SCHEMA_LITERAL.toString()));
+    }
+    Assert.assertNull("There should be no more events in the channel", ch.take());
+  }
+
+  @Test
+  public void testDifferentEventTypesInBatchWithAvroGeneric() throws IOException {
+    loadProperties("flume-log4jtest-avro-generic.properties");
+    PropertyConfigurator.configure(props);
+    Logger logger = LogManager.getLogger(getClass());
+    String msg = "Avro log message";
+
+    Schema schema = new Schema.Parser().parse(
+        getClass().getClassLoader().getResource("myrecord.avsc").openStream());
+    GenericRecordBuilder builder = new GenericRecordBuilder(schema);
+    GenericRecord record = builder.set("message", msg).build();
+
+    List<Object> events = Arrays.asList("string", record);
+    logger.info(events);
+
+    Transaction transaction = ch.getTransaction();
+    transaction.begin();
+
+    Event event = ch.take();
+    Assert.assertNotNull(event);
+    Assert.assertEquals("string", new String(event.getBody()));
+
+    event = ch.take();
+    Assert.assertNotNull(event);
+    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(event.getBody(), null);
+    GenericRecord recordFromEvent = reader.read(null, decoder);
+    Assert.assertEquals(msg, recordFromEvent.get("message").toString());
+
+    Map<String, String> hdrs = event.getHeaders();
+    Assert.assertNull(hdrs.get(Log4jAvroHeaders.MESSAGE_ENCODING.toString()));
+
+    Assert.assertEquals("Schema URL should be set",
+        "file:///tmp/myrecord.avsc", hdrs.get(Log4jAvroHeaders.AVRO_SCHEMA_URL.toString()));
+    Assert.assertNull("Schema string should not be set",
+        hdrs.get(Log4jAvroHeaders.AVRO_SCHEMA_LITERAL.toString()));
+
+    transaction.commit();
+    transaction.close();
   }
 
   @After
@@ -182,6 +251,13 @@ public class TestLog4jAppenderWithAvro {
   public static class AppEvent {
     private String message;
 
+    public AppEvent() {
+    }
+
+    public AppEvent(String message) {
+      this.message = message;
+    }
+
     public String getMessage() {
       return message;
     }
@@ -189,6 +265,20 @@ public class TestLog4jAppenderWithAvro {
     public void setMessage(String message) {
       this.message = message;
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      AppEvent appEvent = (AppEvent) o;
+      return message != null ? message.equals(appEvent.message) : appEvent.message == null;
+    }
+
+    @Override
+    public int hashCode() {
+      return message != null ? message.hashCode() : 0;
+    }
   }
 
 }