You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by rg...@apache.org on 2015/06/15 03:17:37 UTC

logging-log4j2 git commit: LOG4J2-1044 - Support batchSize in FlumeAvroManager.

Repository: logging-log4j2
Updated Branches:
  refs/heads/master 4c6d636b6 -> 3780d00c0


LOG4J2-1044 - Support batchSize in FlumeAvroManager.


Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/3780d00c
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/3780d00c
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/3780d00c

Branch: refs/heads/master
Commit: 3780d00c03770e4357d9def9a0460d6b665c90e3
Parents: 4c6d636
Author: Ralph Goers <rg...@nextiva.com>
Authored: Sun Jun 14 18:17:10 2015 -0700
Committer: Ralph Goers <rg...@nextiva.com>
Committed: Sun Jun 14 18:17:10 2015 -0700

----------------------------------------------------------------------
 .../log4j/flume/appender/FlumeAppender.java     |  4 +-
 .../log4j/flume/appender/FlumeAvroManager.java  | 71 ++++++++++++++------
 .../flume/appender/FlumePersistentManager.java  | 15 ++---
 .../log4j/flume/appender/FlumeAppenderTest.java | 41 +++++++++++
 src/changes/changes.xml                         |  3 +
 5 files changed, 101 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/3780d00c/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
index c37ea1d..1c466ef 100644
--- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
+++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAppender.java
@@ -247,7 +247,7 @@ public final class FlumeAppender extends AbstractAppender implements FlumeEventF
                     LOGGER.debug("No agents provided, using defaults");
                     agents = new Agent[] {Agent.createAgent(null, null)};
                 }
-                manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeoutMillis, reqTimeoutMillis);
+                manager = FlumeAvroManager.getManager(name, agents, batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis);
                 break;
             case PERSISTENT:
                 if (agents == null || agents.length == 0) {
@@ -263,7 +263,7 @@ public final class FlumeAppender extends AbstractAppender implements FlumeEventF
                     LOGGER.debug("No agents provided, using defaults");
                     agents = new Agent[] {Agent.createAgent(null, null)};
                 }
-                manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeoutMillis, reqTimeoutMillis);
+                manager = FlumeAvroManager.getManager(name, agents, batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis);
         }
 
         if (manager == null) {

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/3780d00c/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
index a4d52b6..4f5bd1a 100644
--- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
+++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
@@ -38,6 +38,9 @@ public class FlumeAvroManager extends AbstractFlumeManager {
 
     private final int batchSize;
 
+    private final long delayNanos;
+    private final int delayMillis;
+
     private final int retries;
 
     private final int connectTimeoutMillis;
@@ -48,6 +51,9 @@ public class FlumeAvroManager extends AbstractFlumeManager {
 
     private RpcClient rpcClient = null;
 
+    private BatchEvent batchEvent = new BatchEvent();
+    private long nextSend = 0;
+
     /**
      * Constructor
      * @param name The unique name of this manager.
@@ -59,10 +65,12 @@ public class FlumeAvroManager extends AbstractFlumeManager {
      *
      */
     protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
-                               final int retries, final int connectTimeout, final int requestTimeout) {
+                               final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) {
         super(name);
         this.agents = agents;
         this.batchSize = batchSize;
+        this.delayMillis = delayMillis;
+        this.delayNanos = delayMillis * 1000000;
         this.retries = retries;
         this.connectTimeoutMillis = connectTimeout;
         this.requestTimeoutMillis = requestTimeout;
@@ -74,12 +82,13 @@ public class FlumeAvroManager extends AbstractFlumeManager {
      * @param name The name of the manager.
      * @param agents The agents to use.
      * @param batchSize The number of events to include in a batch.
+     * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
      * @param retries The number of times to retry connecting before giving up.
      * @param connectTimeoutMillis The connection timeout in ms.
      * @param requestTimeoutMillis The request timeout in ms.
      * @return A FlumeAvroManager.
      */
-    public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize,
+    public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, int delayMillis,
                                               final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
         if (agents == null || agents.length == 0) {
             throw new IllegalArgumentException("At least one agent is required");
@@ -100,7 +109,7 @@ public class FlumeAvroManager extends AbstractFlumeManager {
         }
         sb.append(']');
         return getManager(sb.toString(), factory,
-                new FactoryData(name, agents, batchSize, retries, connectTimeoutMillis, requestTimeoutMillis));
+                new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis));
     }
 
     /**
@@ -135,6 +144,10 @@ public class FlumeAvroManager extends AbstractFlumeManager {
         return batchSize;
     }
 
+    public int getDelayMillis() {
+        return delayMillis;
+    }
+
     public synchronized void send(final BatchEvent events) {
         if (rpcClient == null) {
             rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
@@ -162,26 +175,38 @@ public class FlumeAvroManager extends AbstractFlumeManager {
 
     @Override
     public synchronized void send(final Event event)  {
-        if (rpcClient == null) {
-            rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
-        }
+        if (batchSize == 1) {
+            if (rpcClient == null) {
+                rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
+            }
 
-        if (rpcClient != null) {
-            try {
-                rpcClient.append(event);
-            } catch (final Exception ex) {
-                rpcClient.close();
-                rpcClient = null;
+            if (rpcClient != null) {
+                try {
+                    rpcClient.append(event);
+                } catch (final Exception ex) {
+                    rpcClient.close();
+                    rpcClient = null;
+                    final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
+                            agents[current].getPort();
+                    LOGGER.warn(msg, ex);
+                    throw new AppenderLoggingException("No Flume agents are available");
+                }
+            } else {
                 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
-                    agents[current].getPort();
-                LOGGER.warn(msg, ex);
+                        agents[current].getPort();
+                LOGGER.warn(msg);
                 throw new AppenderLoggingException("No Flume agents are available");
             }
         } else {
-            final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
-                agents[current].getPort();
-            LOGGER.warn(msg);
-            throw new AppenderLoggingException("No Flume agents are available");
+            batchEvent.addEvent(event);
+            int count = batchEvent.getEvents().size();
+            if (count == 1) {
+                nextSend = System.nanoTime() + delayNanos;
+            }
+            if (count >= batchSize || System.nanoTime() >= nextSend) {
+                send(batchEvent);
+                batchEvent = new BatchEvent();
+            }
         }
     }
 
@@ -248,6 +273,7 @@ public class FlumeAvroManager extends AbstractFlumeManager {
         private final String name;
         private final Agent[] agents;
         private final int batchSize;
+        private final int delayMillis;
         private final int retries;
         private final int conntectTimeoutMillis;
         private final int requestTimeoutMillis;
@@ -258,11 +284,12 @@ public class FlumeAvroManager extends AbstractFlumeManager {
          * @param agents The agents.
          * @param batchSize The number of events to include in a batch.
          */
-        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
-                           final int connectTimeoutMillis, final int requestTimeoutMillis) {
+        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis,
+                final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
             this.name = name;
             this.agents = agents;
             this.batchSize = batchSize;
+            this.delayMillis = delayMillis;
             this.retries = retries;
             this.conntectTimeoutMillis = connectTimeoutMillis;
             this.requestTimeoutMillis = requestTimeoutMillis;
@@ -284,8 +311,8 @@ public class FlumeAvroManager extends AbstractFlumeManager {
         public FlumeAvroManager createManager(final String name, final FactoryData data) {
             try {
 
-                return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.retries,
-                    data.conntectTimeoutMillis, data.requestTimeoutMillis);
+                return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis,
+                        data.retries, data.conntectTimeoutMillis, data.requestTimeoutMillis);
             } catch (final Exception ex) {
                 LOGGER.error("Could not create FlumeAvroManager", ex);
             }

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/3780d00c/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
index e643d31..c0f8879 100644
--- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
+++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
@@ -90,8 +90,6 @@ public class FlumePersistentManager extends FlumeAvroManager {
 
     private final SecretKey secretKey;
 
-    private final int delayMillis;
-
     private final int lockTimeoutRetryCount;
 
     private final ExecutorService threadPool;
@@ -118,8 +116,7 @@ public class FlumePersistentManager extends FlumeAvroManager {
                                      final int requestTimeout, final int delay, final Database database,
                                      final Environment environment, final SecretKey secretKey,
                                      final int lockTimeoutRetryCount) {
-        super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
-        this.delayMillis = delay;
+        super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
         this.database = database;
         this.environment = environment;
         dbCount.set(database.count());
@@ -515,14 +512,14 @@ public class FlumePersistentManager extends FlumeAvroManager {
 
         @Override
         public void run() {
-            LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.delayMillis);
-            long nextBatchMillis = System.currentTimeMillis() + manager.delayMillis;
+            LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
+            long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
             while (!shutdown) {
                 final long nowMillis = System.currentTimeMillis();
                 final long dbCount = database.count();
                 dbCounter.set(dbCount);
                 if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
-                    nextBatchMillis = nowMillis + manager.delayMillis;
+                    nextBatchMillis = nowMillis + manager.getDelayMillis();
                     try {
                         boolean errors = false;
                         final DatabaseEntry key = new DatabaseEntry();
@@ -621,7 +618,7 @@ public class FlumePersistentManager extends FlumeAvroManager {
                             }
                         }
                         if (errors) {
-                            Thread.sleep(manager.delayMillis);
+                            Thread.sleep(manager.getDelayMillis());
                             continue;
                         }
                     } catch (final Exception ex) {
@@ -629,7 +626,7 @@ public class FlumePersistentManager extends FlumeAvroManager {
                     }
                 } else {
                     if (nextBatchMillis <= nowMillis) {
-                        nextBatchMillis = nowMillis + manager.delayMillis;
+                        nextBatchMillis = nowMillis + manager.getDelayMillis();
                     }
                     try {
                         final long interval = nextBatchMillis - nowMillis;

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/3780d00c/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
index 7c2e9b9..2715abe 100644
--- a/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
+++ b/log4j-flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
@@ -217,6 +217,47 @@ public class FlumeAppenderTest {
         eventSource.stop();
     }
 
+
+    @Test
+    public void testIncompleteBatch() throws IOException {
+        final Agent[] agents = new Agent[] { Agent.createAgent("localhost",
+                testPort) };
+        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents,
+                null, "false", "Avro", null, "1000", "1000", "1", "500",
+                "avro", "false", null, null, null, null, null, "true", "10",
+                null, null, null, null);
+        avroAppender.start();
+        avroLogger.addAppender(avroAppender);
+        avroLogger.setLevel(Level.ALL);
+
+        Assert.assertNotNull(avroLogger);
+
+        avroLogger.info("Test message 0");
+
+        final Transaction transaction = channel.getTransaction();
+        transaction.begin();
+
+        Event event = channel.take();
+        Assert.assertNull("Received event", event);
+
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException ie) {
+        }
+
+        avroLogger.info("Test message 1");
+        for (int i = 0; i < 2; ++i) {
+            event = channel.take();
+            Assert.assertNotNull("No event for item " + i, event);
+            Assert.assertTrue("Channel contained event, but not expected message",
+                    getBody(event).endsWith("Test message " + i));
+        }
+        transaction.commit();
+        transaction.close();
+
+        eventSource.stop();
+    }
+
     @Test
     public void testBatch() throws IOException {
         final Agent[] agents = new Agent[] { Agent.createAgent("localhost",

http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/3780d00c/src/changes/changes.xml
----------------------------------------------------------------------
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 849457b..61cd933 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -24,6 +24,9 @@
   </properties>
   <body>
     <release version="2.4" date="2015-MM-DD" description="GA Release 2.4">
+      <action issue="LOG4J2=1044" dev="rgoers" type="update">
+          Support batchSize in FlumeAvroManager.
+      </action>
       <action issue="LOG4J2-767" dev="ggregory" type="add" due-to="Mikael Ståldal">
         New module for Liquibase integration.
       </action>