You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by gg...@apache.org on 2015/09/14 23:26:04 UTC

[1/2] logging-log4j2 git commit: Checkstyle: do not hide field.

Repository: logging-log4j2
Updated Branches:
  refs/heads/master a610ba84f -> a7f67f8bb


Checkstyle: do not hide field.

Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/5f8aaddc
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/5f8aaddc
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/5f8aaddc

Branch: refs/heads/master
Commit: 5f8aaddc9613bf0a425060238dfd20e61c9ebe34
Parents: a610ba8
Author: ggregory <gg...@apache.org>
Authored: Mon Sep 14 14:07:00 2015 -0700
Committer: ggregory <gg...@apache.org>
Committed: Mon Sep 14 14:07:00 2015 -0700

----------------------------------------------------------------------
 .../flume/appender/FlumePersistentManager.java  | 1754 +++++++++---------
 1 file changed, 877 insertions(+), 877 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/5f8aaddc/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
index c0f8879..3383d79 100644
--- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
+++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
@@ -1,877 +1,877 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache license, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the license for the specific language governing permissions and
- * limitations under the license.
- */
-package org.apache.logging.log4j.flume.appender;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.crypto.Cipher;
-import javax.crypto.SecretKey;
-
-import org.apache.flume.Event;
-import org.apache.flume.event.SimpleEvent;
-import org.apache.logging.log4j.LoggingException;
-import org.apache.logging.log4j.core.appender.ManagerFactory;
-import org.apache.logging.log4j.core.config.Property;
-import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
-import org.apache.logging.log4j.core.config.plugins.util.PluginType;
-import org.apache.logging.log4j.core.util.FileUtils;
-import org.apache.logging.log4j.core.util.SecretKeyProvider;
-import org.apache.logging.log4j.util.Strings;
-
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.CursorConfig;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.LockConflictException;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.StatsConfig;
-import com.sleepycat.je.Transaction;
-
-/**
- * Manager that persists data to Berkeley DB before passing it on to Flume.
- */
-public class FlumePersistentManager extends FlumeAvroManager {
-
-    /** Attribute name for the key provider. */
-    public static final String KEY_PROVIDER = "keyProvider";
-
-    private static final Charset UTF8 = Charset.forName("UTF-8");
-
-    private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
-
-    private static final int SHUTDOWN_WAIT = 60;
-
-    private static final int MILLIS_PER_SECOND = 1000;
-
-    private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
-
-    private static BDBManagerFactory factory = new BDBManagerFactory();
-
-    private final Database database;
-
-    private final Environment environment;
-
-    private final WriterThread worker;
-
-    private final Gate gate = new Gate();
-
-    private final SecretKey secretKey;
-
-    private final int lockTimeoutRetryCount;
-
-    private final ExecutorService threadPool;
-
-    private final AtomicLong dbCount = new AtomicLong();
-
-    /**
-     * Constructor
-     * @param name The unique name of this manager.
-     * @param shortName Original name for the Manager.
-     * @param agents An array of Agents.
-     * @param batchSize The number of events to include in a batch.
-     * @param retries The number of times to retry connecting before giving up.
-     * @param connectionTimeout The amount of time to wait for a connection to be established.
-     * @param requestTimeout The amount of time to wair for a response to a request.
-     * @param delay The amount of time to wait between retries.
-     * @param database The database to write to.
-     * @param environment The database environment.
-     * @param secretKey The SecretKey to use for encryption.
-     * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
-     */
-    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
-                                     final int batchSize, final int retries, final int connectionTimeout,
-                                     final int requestTimeout, final int delay, final Database database,
-                                     final Environment environment, final SecretKey secretKey,
-                                     final int lockTimeoutRetryCount) {
-        super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
-        this.database = database;
-        this.environment = environment;
-        dbCount.set(database.count());
-        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
-            lockTimeoutRetryCount);
-        this.worker.start();
-        this.secretKey = secretKey;
-        this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
-        this.lockTimeoutRetryCount = lockTimeoutRetryCount;
-    }
-
-
-    /**
-     * Returns a FlumeAvroManager.
-     * @param name The name of the manager.
-     * @param agents The agents to use.
-     * @param properties Properties to pass to the Manager.
-     * @param batchSize The number of events to include in a batch.
-     * @param retries The number of times to retry connecting before giving up.
-     * @param connectionTimeout The amount of time to wait to establish a connection.
-     * @param requestTimeout The amount of time to wait for a response to a request.
-     * @param delayMillis Amount of time to delay before delivering a batch.
-     * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
-     * @param dataDir The location of the Berkeley database.
-     * @return A FlumeAvroManager.
-     */
-    public static FlumePersistentManager getManager(final String name, final Agent[] agents,
-                                                    final Property[] properties, int batchSize, final int retries,
-                                                    final int connectionTimeout, final int requestTimeout,
-                                                    final int delayMillis, final int lockTimeoutRetryCount,
-                                                    final String dataDir) {
-        if (agents == null || agents.length == 0) {
-            throw new IllegalArgumentException("At least one agent is required");
-        }
-
-        if (batchSize <= 0) {
-            batchSize = 1;
-        }
-        final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
-
-        final StringBuilder sb = new StringBuilder("FlumePersistent[");
-        boolean first = true;
-        for (final Agent agent : agents) {
-            if (!first) {
-                sb.append(',');
-            }
-            sb.append(agent.getHost()).append(':').append(agent.getPort());
-            first = false;
-        }
-        sb.append(']');
-        sb.append(' ').append(dataDirectory);
-        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
-            connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties));
-    }
-
-    @Override
-    public void send(final Event event)  {
-        if (worker.isShutdown()) {
-            throw new LoggingException("Unable to record event");
-        }
-
-        final Map<String, String> headers = event.getHeaders();
-        final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
-        try {
-            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            final DataOutputStream daos = new DataOutputStream(baos);
-            daos.writeInt(event.getBody().length);
-            daos.write(event.getBody(), 0, event.getBody().length);
-            daos.writeInt(event.getHeaders().size());
-            for (final Map.Entry<String, String> entry : headers.entrySet()) {
-                daos.writeUTF(entry.getKey());
-                daos.writeUTF(entry.getValue());
-            }
-            byte[] eventData = baos.toByteArray();
-            if (secretKey != null) {
-                final Cipher cipher = Cipher.getInstance("AES");
-                cipher.init(Cipher.ENCRYPT_MODE, secretKey);
-                eventData = cipher.doFinal(eventData);
-            }
-            final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
-                gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
-            boolean interrupted = false;
-            int count = 0;
-            do {
-                try {
-                    future.get();
-                } catch (final InterruptedException ie) {
-                    interrupted = true;
-                    ++count;
-                }
-            } while (interrupted && count <= 1);
-
-        } catch (final Exception ex) {
-            throw new LoggingException("Exception occurred writing log event", ex);
-        }
-    }
-
-    @Override
-    protected void releaseSub() {
-        LOGGER.debug("Shutting down FlumePersistentManager");
-        worker.shutdown();
-        try {
-            worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
-        } catch (final InterruptedException ie) {
-            // Ignore the exception and shutdown.
-        }
-        threadPool.shutdown();
-        try {
-            threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
-        } catch (final InterruptedException ie) {
-            LOGGER.warn("PersistentManager Thread pool failed to shut down");
-        }
-        try {
-            worker.join();
-        } catch (final InterruptedException ex) {
-            LOGGER.debug("Interrupted while waiting for worker to complete");
-        }
-        try {
-            LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
-            database.close();
-        } catch (final Exception ex) {
-            LOGGER.warn("Failed to close database", ex);
-        }
-        try {
-            environment.cleanLog();
-            environment.close();
-        } catch (final Exception ex) {
-            LOGGER.warn("Failed to close environment", ex);
-        }
-        super.releaseSub();
-    }
-
    /**
     * Forwards a single event to Flume via the parent FlumeAvroManager, bypassing
     * this class's send() override so the event is not persisted a second time.
     * Called by the WriterThread after reading an event back from Berkeley DB.
     * @param event the event to deliver to Flume
     */
    private void doSend(final SimpleEvent event) {
        LOGGER.debug("Sending event to Flume");
        super.send(event);
    }
-
    /**
     * Thread for writing to Berkeley DB to avoid having interrupts close the database.
     */
    private static class BDBWriter implements Callable<Integer> {
        // Serialized event payload (possibly AES-encrypted by the caller).
        private final byte[] eventData;
        // Record key: the bytes of the event's GUID header.
        private final byte[] keyData;
        private final Environment environment;
        private final Database database;
        // Opened when enough records are queued for the WriterThread to send a batch.
        private final Gate gate;
        // Shared running count of records in the database.
        private final AtomicLong dbCount;
        private final long batchSize;
        private final int lockTimeoutRetryCount;

        public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
                         final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
                         final int lockTimeoutRetryCount) {
            this.keyData = keyData;
            this.eventData = eventData;
            this.environment = environment;
            this.database = database;
            this.gate = gate;
            this.dbCount = dbCount;
            this.batchSize = batchSize;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
        }

        /**
         * Writes one record inside a transaction, retrying up to lockTimeoutRetryCount
         * times on lock conflicts.
         * @return the length of the stored event data
         * @throws Exception the last LockConflictException if every retry failed, or any
         *         other database error immediately (after aborting the transaction)
         */
        @Override
        public Integer call() throws Exception {
            final DatabaseEntry key = new DatabaseEntry(keyData);
            final DatabaseEntry data = new DatabaseEntry(eventData);
            Exception exception = null;
            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                Transaction txn = null;
                try {
                    txn = environment.beginTransaction(null, null);
                    try {
                        database.put(txn, key, data);
                        txn.commit();
                        // Null out so the finally block does not abort the committed transaction.
                        txn = null;
                        if (dbCount.incrementAndGet() >= batchSize) {
                            // Wake the WriterThread: a full batch is now available.
                            gate.open();
                        }
                        exception = null;
                        break;
                    } catch (final LockConflictException lce) {
                        exception = lce;
                        // Fall through and retry.
                    } catch (final Exception ex) {
                        if (txn != null) {
                            txn.abort();
                        }
                        throw ex;
                    } finally {
                        // Any path that did not commit aborts the transaction here.
                        if (txn != null) {
                            txn.abort();
                            txn = null;
                        }
                    }
                } catch (final LockConflictException lce) {
                    // beginTransaction or the abort above hit a lock conflict; record and retry.
                    exception = lce;
                    if (txn != null) {
                        try {
                            txn.abort();
                            txn = null;
                        } catch (final Exception ex) {
                            LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
                        }
                    }

                }
                // Brief back-off before retrying after a lock conflict.
                try {
                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                } catch (final InterruptedException ie) {
                    // Ignore the error
                }
            }
            if (exception != null) {
                throw exception;
            }
            return eventData.length;
        }
    }
-
    /**
     * Factory data: the configuration bundle passed from getManager() to the
     * BDBManagerFactory when a new manager instance must be created.
     */
    private static class FactoryData {
        private final String name;
        private final Agent[] agents;
        private final int batchSize;
        private final String dataDir;
        private final int retries;
        private final int connectionTimeout;
        private final int requestTimeout;
        private final int delayMillis;
        private final int lockTimeoutRetryCount;
        private final Property[] properties;

        /**
         * Constructor.
         * @param name The name of the Appender.
         * @param agents The agents.
         * @param batchSize The number of events to include in a batch.
         * @param retries The number of times to retry connecting before giving up.
         * @param connectionTimeout The amount of time to wait to establish a connection.
         * @param requestTimeout The amount of time to wait for a response to a request.
         * @param delayMillis Amount of time to delay before delivering a batch.
         * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
         * @param dataDir The directory for data.
         * @param properties Properties to pass to the Manager (e.g. the keyProvider name).
         */
        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
                           final int connectionTimeout, final int requestTimeout, final int delayMillis,
                           final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
            this.name = name;
            this.agents = agents;
            this.batchSize = batchSize;
            this.dataDir = dataDir;
            this.retries = retries;
            this.connectionTimeout = connectionTimeout;
            this.requestTimeout = requestTimeout;
            this.delayMillis = delayMillis;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
            this.properties = properties;
        }
    }
-
-    /**
-     * Avro Manager Factory.
-     */
-    private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
-
-        /**
-         * Create the FlumeKratiManager.
-         * @param name The name of the entity to manage.
-         * @param data The data required to create the entity.
-         * @return The FlumeKratiManager.
-         */
-        @Override
-        public FlumePersistentManager createManager(final String name, final FactoryData data) {
-            SecretKey secretKey = null;
-            Database database = null;
-            Environment environment = null;
-
-            final Map<String, String> properties = new HashMap<>();
-            if (data.properties != null) {
-                for (final Property property : data.properties) {
-                    properties.put(property.getName(), property.getValue());
-                }
-            }
-
-            try {
-                final File dir = new File(data.dataDir);
-                FileUtils.mkdir(dir, true);
-                final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
-                dbEnvConfig.setTransactional(true);
-                dbEnvConfig.setAllowCreate(true);
-                dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
-                environment = new Environment(dir, dbEnvConfig);
-                final DatabaseConfig dbConfig = new DatabaseConfig();
-                dbConfig.setTransactional(true);
-                dbConfig.setAllowCreate(true);
-                database = environment.openDatabase(null, name, dbConfig);
-            } catch (final Exception ex) {
-                LOGGER.error("Could not create FlumePersistentManager", ex);
-                // For consistency, close database as well as environment even though it should never happen since the
-                // database is that last thing in the block above, but this does guard against a future line being
-                // inserted at the end that would bomb (like some debug logging).
-                if (database != null) {
-                    database.close();
-                    database = null;
-                }
-                if (environment != null) {
-                    environment.close();
-                    environment = null;
-                }
-                return null;
-            }
-
-            try {
-                String key = null;
-                for (final Map.Entry<String, String> entry : properties.entrySet()) {
-                    if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
-                        key = entry.getValue();
-                        break;
-                    }
-                }
-                if (key != null) {
-                    final PluginManager manager = new PluginManager("KeyProvider");
-                    manager.collectPlugins();
-                    final Map<String, PluginType<?>> plugins = manager.getPlugins();
-                    if (plugins != null) {
-                        boolean found = false;
-                        for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
-                            if (entry.getKey().equalsIgnoreCase(key)) {
-                                found = true;
-                                final Class<?> cl = entry.getValue().getPluginClass();
-                                try {
-                                    final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
-                                    secretKey = provider.getSecretKey();
-                                    LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
-                                } catch (final Exception ex) {
-                                    LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
-                                        cl.getName());
-                                }
-                                break;
-                            }
-                        }
-                        if (!found) {
-                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
-                        }
-                    } else {
-                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
-                    }
-                }
-            } catch (final Exception ex) {
-                LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
-            }
-            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
-                data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
-                data.lockTimeoutRetryCount);
-        }
-    }
-
-    /**
-     * Thread that sends data to Flume and pulls it from Berkeley DB.
-     */
-    private static class WriterThread extends Thread  {
-        private volatile boolean shutdown = false;
-        private final Database database;
-        private final Environment environment;
-        private final FlumePersistentManager manager;
-        private final Gate gate;
-        private final SecretKey secretKey;
-        private final int batchSize;
-        private final AtomicLong dbCounter;
-        private final int lockTimeoutRetryCount;
-
        /**
         * Constructs the writer thread as a daemon so it cannot prevent JVM exit.
         * @param database the Berkeley DB database holding queued events
         * @param environment the database environment used for transactions
         * @param manager the owning manager, used to forward events to Flume
         * @param batchsize number of events per batch (parameter named to avoid hiding the batchSize field)
         * @param gate opened by writers when a full batch is available
         * @param secretKey key for decrypting stored events, or null when not encrypted
         * @param dbCount shared count of records currently in the database
         * @param lockTimeoutRetryCount times to retry after a Berkeley DB lock timeout
         */
        public WriterThread(final Database database, final Environment environment,
                            final FlumePersistentManager manager, final Gate gate, final int batchsize,
                            final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
            this.database = database;
            this.environment = environment;
            this.manager = manager;
            this.gate = gate;
            this.batchSize = batchsize;
            this.secretKey = secretKey;
            this.setDaemon(true);
            this.dbCounter = dbCount;
            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
        }
-
        /**
         * Requests shutdown and opens the gate so a run() loop blocked in
         * waitForOpen() wakes up promptly and observes the flag.
         */
        public void shutdown() {
            LOGGER.debug("Writer thread shutting down");
            this.shutdown = true;
            gate.open();
        }
-
        /**
         * Returns true once shutdown has been requested (or a fatal database
         * error made the run() loop stop); the field is volatile, so callers on
         * other threads see the update.
         */
        public boolean isShutdown() {
            return shutdown;
        }
-
        /**
         * Main loop: waits until a batch is due (by record count or elapsed delay),
         * reads events back from Berkeley DB, sends them to Flume, and deletes them
         * on success. Exits when shutdown() is called, after draining a final batch.
         */
        @Override
        public void run() {
            LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
            long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
            while (!shutdown) {
                final long nowMillis = System.currentTimeMillis();
                final long dbCount = database.count();
                dbCounter.set(dbCount);
                // Send when a full batch is queued, or when anything is queued and the delay elapsed.
                if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
                    nextBatchMillis = nowMillis + manager.getDelayMillis();
                    try {
                        boolean errors = false;
                        final DatabaseEntry key = new DatabaseEntry();
                        final DatabaseEntry data = new DatabaseEntry();

                        // Close the gate so writers must refill it before the next wake-up.
                        gate.close();
                        OperationStatus status;
                        if (batchSize > 1) {
                            try {
                                errors = sendBatch(key, data);
                            } catch (final Exception ex) {
                                // sendBatch only throws on unrecoverable failures; stop the loop.
                                break;
                            }
                        } else {
                            // Batch size 1: send events one at a time inside a single RMW
                            // transaction, retrying the whole pass on lock conflicts.
                            Exception exception = null;
                            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                                exception = null;
                                Transaction txn = null;
                                Cursor cursor = null;
                                try {
                                    txn = environment.beginTransaction(null, null);
                                    cursor = database.openCursor(txn, null);
                                    try {
                                        status = cursor.getFirst(key, data, LockMode.RMW);
                                        while (status == OperationStatus.SUCCESS) {
                                            final SimpleEvent event = createEvent(data);
                                            if (event != null) {
                                                try {
                                                    manager.doSend(event);
                                                } catch (final Exception ioe) {
                                                    errors = true;
                                                    LOGGER.error("Error sending event", ioe);
                                                    break;
                                                }
                                                try {
                                                    // Delete only after the event was sent successfully.
                                                    cursor.delete();
                                                } catch (final Exception ex) {
                                                    LOGGER.error("Unable to delete event", ex);
                                                }
                                            }
                                            status = cursor.getNext(key, data, LockMode.RMW);
                                        }
                                        if (cursor != null) {
                                            // Close the cursor before committing the transaction.
                                            cursor.close();
                                            cursor = null;
                                        }
                                        txn.commit();
                                        // Null out so the finally block does not abort the commit.
                                        txn = null;
                                        dbCounter.decrementAndGet();
                                        exception = null;
                                        break;
                                    } catch (final LockConflictException lce) {
                                        exception = lce;
                                        // Fall through and retry.
                                    } catch (final Exception ex) {
                                        // Unrecoverable database error: stop the writer entirely.
                                        LOGGER.error("Error reading or writing to database", ex);
                                        shutdown = true;
                                        break;
                                    } finally {
                                        // Any path that did not commit cleans up here.
                                        if (cursor != null) {
                                            cursor.close();
                                            cursor = null;
                                        }
                                        if (txn != null) {
                                            txn.abort();
                                            txn = null;
                                        }
                                    }
                                } catch (final LockConflictException lce) {
                                    // beginTransaction/openCursor (or cleanup) hit a lock
                                    // conflict; release resources and retry.
                                    exception = lce;
                                    if (cursor != null) {
                                        try {
                                            cursor.close();
                                            cursor = null;
                                        } catch (final Exception ex) {
                                            LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                        }
                                    }
                                    if (txn != null) {
                                        try {
                                            txn.abort();
                                            txn = null;
                                        } catch (final Exception ex) {
                                            LOGGER.trace("Ignored exception aborting tx during lock conflict.");
                                        }
                                    }
                                }
                                // Brief back-off before the next retry.
                                try {
                                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                                } catch (final InterruptedException ie) {
                                    // Ignore the error
                                }
                            }
                            if (exception != null) {
                                LOGGER.error("Unable to read or update data base", exception);
                            }
                        }
                        if (errors) {
                            // Sending failed; pause before retrying so the agent can recover.
                            Thread.sleep(manager.getDelayMillis());
                            continue;
                        }
                    } catch (final Exception ex) {
                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
                    }
                } else {
                    if (nextBatchMillis <= nowMillis) {
                        nextBatchMillis = nowMillis + manager.getDelayMillis();
                    }
                    try {
                        // Sleep until a writer opens the gate or the batch delay expires.
                        final long interval = nextBatchMillis - nowMillis;
                        gate.waitForOpen(interval);
                    } catch (final InterruptedException ie) {
                        LOGGER.warn("WriterThread interrupted, continuing");
                    } catch (final Exception ex) {
                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
                        break;
                    }
                }
            }

            // Drain remaining events on shutdown so they are not stranded in the database.
            if (batchSize > 1 && database.count() > 0) {
                final DatabaseEntry key = new DatabaseEntry();
                final DatabaseEntry data = new DatabaseEntry();
                try {
                    sendBatch(key, data);
                } catch (final Exception ex) {
                    LOGGER.warn("Unable to write final batch");
                }
            }
            LOGGER.trace("WriterThread exiting");
        }
-
-        private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
-            boolean errors = false;
-            OperationStatus status;
-            Cursor cursor = null;
-            try {
-            	final BatchEvent batch = new BatchEvent();
-            	for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
-            		try {
-            			cursor = database.openCursor(null, CursorConfig.DEFAULT);
-            			status = cursor.getFirst(key, data, null);
-
-            			for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
-            				final SimpleEvent event = createEvent(data);
-            				if (event != null) {
-            					batch.addEvent(event);
-            				}
-            				status = cursor.getNext(key, data, null);
-            			}
-            			break;
-            		} catch (final LockConflictException lce) {
-            			if (cursor != null) {
-            				try {
-                                cursor.close();
-                                cursor = null;
-                            } catch (final Exception ex) {
-                                LOGGER.trace("Ignored exception closing cursor during lock conflict.");
-                            }
-                        }
-                    }
-            	}
-
-                try {
-                    manager.send(batch);
-                } catch (final Exception ioe) {
-                    LOGGER.error("Error sending events", ioe);
-                    errors = true;
-                }
-                if (!errors) {
-                	if (cursor != null) {
-	                    cursor.close();
-	                    cursor = null;
-                	}
-                    Transaction txn = null;
-                    Exception exception = null;
-                    for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
-                        try {
-                            txn = environment.beginTransaction(null, null);
-                            try {
-                                for (final Event event : batch.getEvents()) {
-                                    try {
-                                        final Map<String, String> headers = event.getHeaders();
-                                        key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
-                                        database.delete(txn, key);
-                                    } catch (final Exception ex) {
-                                        LOGGER.error("Error deleting key from database", ex);
-                                    }
-                                }
-                                txn.commit();
-                                long count = dbCounter.get();
-                                while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
-                                    count = dbCounter.get();
-                                }
-                                exception = null;
-                                break;
-                            } catch (final LockConflictException lce) {
-                                exception = lce;
-                                if (cursor != null) {
-                                    try {
-                                        cursor.close();
-                                        cursor = null;
-                                    } catch (final Exception ex) {
-                                        LOGGER.trace("Ignored exception closing cursor during lock conflict.");
-                                    }
-                                }
-                                if (txn != null) {
-                                    try {
-                                        txn.abort();
-                                        txn = null;
-                                    } catch (final Exception ex) {
-                                        LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
-                                    }
-                                }
-                            } catch (final Exception ex) {
-                                LOGGER.error("Unable to commit transaction", ex);
-                                if (txn != null) {
-                                    txn.abort();
-                                }
-                            }
-                        } catch (final LockConflictException lce) {
-                            exception = lce;
-                            if (cursor != null) {
-                                try {
-                                    cursor.close();
-                                    cursor = null;
-                                } catch (final Exception ex) {
-                                    LOGGER.trace("Ignored exception closing cursor during lock conflict.");
-                                }
-                            }
-                            if (txn != null) {
-                                try {
-                                    txn.abort();
-                                    txn = null;
-                                } catch (final Exception ex) {
-                                    LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
-                                }
-                            }
-                        } finally {
-                            if (cursor != null) {
-                                cursor.close();
-                                cursor = null;
-                            }
-                            if (txn != null) {
-                                txn.abort();
-                                txn = null;
-                            }
-                        }
-                        try {
-                            Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
-                        } catch (final InterruptedException ie) {
-                            // Ignore the error
-                        }
-                    }
-                    if (exception != null) {
-                        LOGGER.error("Unable to delete events from data base", exception);
-                    }
-                }
-            } catch (final Exception ex) {
-                LOGGER.error("Error reading database", ex);
-                shutdown = true;
-                throw ex;
-            } finally {
-                if (cursor != null) {
-                    cursor.close();
-                }
-            }
-
-            return errors;
-        }
-
-        private SimpleEvent createEvent(final DatabaseEntry data) {
-            final SimpleEvent event = new SimpleEvent();
-            try {
-                byte[] eventData = data.getData();
-                if (secretKey != null) {
-                    final Cipher cipher = Cipher.getInstance("AES");
-                    cipher.init(Cipher.DECRYPT_MODE, secretKey);
-                    eventData = cipher.doFinal(eventData);
-                }
-                final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
-                final DataInputStream dais = new DataInputStream(bais);
-                int length = dais.readInt();
-                final byte[] bytes = new byte[length];
-                dais.read(bytes, 0, length);
-                event.setBody(bytes);
-                length = dais.readInt();
-                final Map<String, String> map = new HashMap<>(length);
-                for (int i = 0; i < length; ++i) {
-                    final String headerKey = dais.readUTF();
-                    final String value = dais.readUTF();
-                    map.put(headerKey, value);
-                }
-                event.setHeaders(map);
-                return event;
-            } catch (final Exception ex) {
-                LOGGER.error("Error retrieving event", ex);
-                return null;
-            }
-        }
-
-    }
-
-    /**
-     * Factory that creates Daemon threads that can be properly shut down.
-     */
-    private static class DaemonThreadFactory implements ThreadFactory {
-        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
-        private final ThreadGroup group;
-        private final AtomicInteger threadNumber = new AtomicInteger(1);
-        private final String namePrefix;
-
-        public DaemonThreadFactory() {
-            final SecurityManager securityManager = System.getSecurityManager();
-            group = securityManager != null ? securityManager.getThreadGroup() :
-                Thread.currentThread().getThreadGroup();
-            namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
-        }
-
-        @Override
-        public Thread newThread(final Runnable r) {
-            final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
-            thread.setDaemon(true);
-            if (thread.getPriority() != Thread.NORM_PRIORITY) {
-                thread.setPriority(Thread.NORM_PRIORITY);
-            }
-            return thread;
-        }
-    }
-
-    /**
-     * An internal class.
-     */
-    private static class Gate {
-
-        private boolean isOpen = false;
-
-        public boolean isOpen() {
-            return isOpen;
-        }
-
-        public synchronized void open() {
-            isOpen = true;
-            notifyAll();
-        }
-
-        public synchronized void close() {
-            isOpen = false;
-        }
-
-        public synchronized void waitForOpen(final long timeout) throws InterruptedException {
-            wait(timeout);
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.crypto.Cipher;
+import javax.crypto.SecretKey;
+
+import org.apache.flume.Event;
+import org.apache.flume.event.SimpleEvent;
+import org.apache.logging.log4j.LoggingException;
+import org.apache.logging.log4j.core.appender.ManagerFactory;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
+import org.apache.logging.log4j.core.config.plugins.util.PluginType;
+import org.apache.logging.log4j.core.util.FileUtils;
+import org.apache.logging.log4j.core.util.SecretKeyProvider;
+import org.apache.logging.log4j.util.Strings;
+
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.CursorConfig;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockConflictException;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.StatsConfig;
+import com.sleepycat.je.Transaction;
+
+/**
+ * Manager that persists data to Berkeley DB before passing it on to Flume.
+ */
+public class FlumePersistentManager extends FlumeAvroManager {
+
+    /** Attribute name for the key provider. */
+    public static final String KEY_PROVIDER = "keyProvider";
+
+    private static final Charset UTF8 = Charset.forName("UTF-8");
+
+    private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
+
+    private static final int SHUTDOWN_WAIT = 60;
+
+    private static final int MILLIS_PER_SECOND = 1000;
+
+    private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
+
+    private static BDBManagerFactory factory = new BDBManagerFactory();
+
+    private final Database database;
+
+    private final Environment environment;
+
+    private final WriterThread worker;
+
+    private final Gate gate = new Gate();
+
+    private final SecretKey secretKey;
+
+    private final int lockTimeoutRetryCount;
+
+    private final ExecutorService threadPool;
+
+    private final AtomicLong dbCount = new AtomicLong();
+
+    /**
+     * Constructor
+     * @param name The unique name of this manager.
+     * @param shortName Original name for the Manager.
+     * @param agents An array of Agents.
+     * @param batchSize The number of events to include in a batch.
+     * @param retries The number of times to retry connecting before giving up.
+     * @param connectionTimeout The amount of time to wait for a connection to be established.
+     * @param requestTimeout The amount of time to wait for a response to a request.
+     * @param delay The amount of time to wait between retries.
+     * @param database The database to write to.
+     * @param environment The database environment.
+     * @param secretKey The SecretKey to use for encryption.
+     * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
+     */
+    protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
+                                     final int batchSize, final int retries, final int connectionTimeout,
+                                     final int requestTimeout, final int delay, final Database database,
+                                     final Environment environment, final SecretKey secretKey,
+                                     final int lockTimeoutRetryCount) {
+        super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
+        this.database = database;
+        this.environment = environment;
+        // Seed the in-memory event counter with whatever is already persisted in the database.
+        dbCount.set(database.count());
+        // NOTE(review): the worker thread is created and started from within the constructor,
+        // publishing a not-yet-fully-constructed "this" (secretKey, threadPool and
+        // lockTimeoutRetryCount are assigned afterwards) - confirm the WriterThread only uses
+        // the values passed to it explicitly plus superclass state set by super(...) above.
+        this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
+            lockTimeoutRetryCount);
+        this.worker.start();
+        this.secretKey = secretKey;
+        this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
+        this.lockTimeoutRetryCount = lockTimeoutRetryCount;
+    }
+
+
+    /**
+     * Returns a FlumeAvroManager.
+     * @param name The name of the manager.
+     * @param agents The agents to use.
+     * @param properties Properties to pass to the Manager.
+     * @param batchSize The number of events to include in a batch.
+     * @param retries The number of times to retry connecting before giving up.
+     * @param connectionTimeout The amount of time to wait to establish a connection.
+     * @param requestTimeout The amount of time to wait for a response to a request.
+     * @param delayMillis Amount of time to delay before delivering a batch.
+     * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
+     * @param dataDir The location of the Berkeley database.
+     * @return A FlumeAvroManager.
+     */
+    public static FlumePersistentManager getManager(final String name, final Agent[] agents,
+                                                    final Property[] properties, int batchSize, final int retries,
+                                                    final int connectionTimeout, final int requestTimeout,
+                                                    final int delayMillis, final int lockTimeoutRetryCount,
+                                                    final String dataDir) {
+        if (agents == null || agents.length == 0) {
+            throw new IllegalArgumentException("At least one agent is required");
+        }
+
+        // A non-positive batch size would never trigger delivery, so clamp it to 1.
+        if (batchSize <= 0) {
+            batchSize = 1;
+        }
+        final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
+
+        // Build the unique manager key from the agent host:port list plus the data directory.
+        final StringBuilder sb = new StringBuilder("FlumePersistent[");
+        boolean first = true;
+        for (final Agent agent : agents) {
+            if (!first) {
+                sb.append(',');
+            }
+            sb.append(agent.getHost()).append(':').append(agent.getPort());
+            first = false;
+        }
+        sb.append(']');
+        sb.append(' ').append(dataDirectory);
+        // NOTE(review): the defaulted dataDirectory is used only in the manager key above; the raw
+        // (possibly empty) dataDir is what reaches FactoryData and, later, createManager's
+        // new File(data.dataDir) - confirm whether dataDirectory should be passed through instead.
+        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
+            connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties));
+    }
+
+    /**
+     * Persists the event to Berkeley DB (keyed by the event's GUID header) via the thread pool;
+     * the background WriterThread later reads it back and forwards it to Flume.
+     * @param event The event to persist.
+     * @throws LoggingException if the writer thread has shut down or the event cannot be written.
+     */
+    @Override
+    public void send(final Event event)  {
+        if (worker.isShutdown()) {
+            throw new LoggingException("Unable to record event");
+        }
+
+        final Map<String, String> headers = event.getHeaders();
+        final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
+        try {
+            // Serialize as: body length, body bytes, header count, then modified-UTF key/value pairs.
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            final DataOutputStream daos = new DataOutputStream(baos);
+            daos.writeInt(event.getBody().length);
+            daos.write(event.getBody(), 0, event.getBody().length);
+            daos.writeInt(event.getHeaders().size());
+            for (final Map.Entry<String, String> entry : headers.entrySet()) {
+                daos.writeUTF(entry.getKey());
+                daos.writeUTF(entry.getValue());
+            }
+            byte[] eventData = baos.toByteArray();
+            // Optionally encrypt the serialized payload before it is written to disk.
+            if (secretKey != null) {
+                final Cipher cipher = Cipher.getInstance("AES");
+                cipher.init(Cipher.ENCRYPT_MODE, secretKey);
+                eventData = cipher.doFinal(eventData);
+            }
+            // Perform the database write on a daemon pool thread so an interrupt of the calling
+            // thread cannot abort the write and close the database mid-operation.
+            final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
+                gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
+            boolean interrupted = false;
+            int ieCount = 0;
+            do {
+                try {
+                    future.get();
+                } catch (final InterruptedException ie) {
+                    // Retry the wait once so a single interrupt does not lose the event.
+                    // NOTE(review): the interrupt status is swallowed here and never restored via
+                    // Thread.currentThread().interrupt() - confirm this is intentional.
+                    interrupted = true;
+                    ++ieCount;
+                }
+            } while (interrupted && ieCount <= 1);
+
+        } catch (final Exception ex) {
+            throw new LoggingException("Exception occurred writing log event", ex);
+        }
+    }
+
+    /**
+     * Shuts down the writer thread and thread pool, then closes the database and its environment.
+     */
+    @Override
+    protected void releaseSub() {
+        LOGGER.debug("Shutting down FlumePersistentManager");
+        // Signal the writer thread to stop, then give it a bounded time to drain its work.
+        worker.shutdown();
+        try {
+            worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
+        } catch (final InterruptedException ie) {
+            // Ignore the exception and shutdown.
+        }
+        threadPool.shutdown();
+        try {
+            threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
+        } catch (final InterruptedException ie) {
+            LOGGER.warn("PersistentManager Thread pool failed to shut down");
+        }
+        try {
+            // Second, unbounded join: wait for the worker to finish sending its final batch.
+            worker.join();
+        } catch (final InterruptedException ex) {
+            LOGGER.debug("Interrupted while waiting for worker to complete");
+        }
+        try {
+            LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
+            database.close();
+        } catch (final Exception ex) {
+            LOGGER.warn("Failed to close database", ex);
+        }
+        try {
+            // Clean obsolete log files before closing so disk space is reclaimed.
+            environment.cleanLog();
+            environment.close();
+        } catch (final Exception ex) {
+            LOGGER.warn("Failed to close environment", ex);
+        }
+        super.releaseSub();
+    }
+
+    /** Forwards a single event to Flume using the superclass (Avro) transport. */
+    private void doSend(final SimpleEvent event) {
+        LOGGER.debug("Sending event to Flume");
+        super.send(event);
+    }
+
+    /**
+     * Thread for writing to Berkeley DB to avoid having interrupts close the database.
+     */
+    private static class BDBWriter implements Callable<Integer> {
+        private final byte[] eventData;
+        private final byte[] keyData;
+        private final Environment environment;
+        private final Database database;
+        private final Gate gate;
+        private final AtomicLong dbCount;
+        private final long batchSize;
+        private final int lockTimeoutRetryCount;
+
+        public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
+                         final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
+                         final int lockTimeoutRetryCount) {
+            this.keyData = keyData;
+            this.eventData = eventData;
+            this.environment = environment;
+            this.database = database;
+            this.gate = gate;
+            this.dbCount = dbCount;
+            this.batchSize = batchSize;
+            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
+        }
+
+        /**
+         * Writes the event under its key in a transaction, retrying on lock conflicts up to
+         * lockTimeoutRetryCount times and opening the gate when the batch threshold is reached.
+         * @return the length of the serialized event data.
+         * @throws Exception the last lock conflict if all retries fail, or any other write failure.
+         */
+        @Override
+        public Integer call() throws Exception {
+            final DatabaseEntry key = new DatabaseEntry(keyData);
+            final DatabaseEntry data = new DatabaseEntry(eventData);
+            Exception exception = null;
+            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
+                Transaction txn = null;
+                try {
+                    txn = environment.beginTransaction(null, null);
+                    try {
+                        database.put(txn, key, data);
+                        txn.commit();
+                        // Null out txn so the finally block does not abort the committed transaction.
+                        txn = null;
+                        // Wake the WriterThread once enough events are queued for a full batch.
+                        if (dbCount.incrementAndGet() >= batchSize) {
+                            gate.open();
+                        }
+                        exception = null;
+                        break;
+                    } catch (final LockConflictException lce) {
+                        exception = lce;
+                        // Fall through and retry.
+                    } catch (final Exception ex) {
+                        // NOTE(review): txn is aborted here but not nulled, so the finally block
+                        // below aborts it a second time - confirm a double abort is safe in BDB JE.
+                        if (txn != null) {
+                            txn.abort();
+                        }
+                        throw ex;
+                    } finally {
+                        if (txn != null) {
+                            txn.abort();
+                            txn = null;
+                        }
+                    }
+                } catch (final LockConflictException lce) {
+                    // beginTransaction itself (or the abort path) hit a lock conflict; record and retry.
+                    exception = lce;
+                    if (txn != null) {
+                        try {
+                            txn.abort();
+                            txn = null;
+                        } catch (final Exception ex) {
+                            LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
+                        }
+                    }
+
+                }
+                // Back off briefly before retrying after a lock conflict.
+                try {
+                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
+                } catch (final InterruptedException ie) {
+                    // Ignore the error
+                }
+            }
+            if (exception != null) {
+                throw exception;
+            }
+            return eventData.length;
+        }
+    }
+
+    /**
+     * Factory data.
+     */
+    private static class FactoryData {
+        private final String name;
+        private final Agent[] agents;
+        private final int batchSize;
+        private final String dataDir;
+        private final int retries;
+        private final int connectionTimeout;
+        private final int requestTimeout;
+        private final int delayMillis;
+        private final int lockTimeoutRetryCount;
+        private final Property[] properties;
+
+        /**
+         * Constructor.
+         * @param name The name of the Appender.
+         * @param agents The agents.
+         * @param batchSize The number of events to include in a batch.
+         * @param retries The number of times to retry connecting before giving up.
+         * @param connectionTimeout The amount of time to wait to establish a connection.
+         * @param requestTimeout The amount of time to wait for a response to a request.
+         * @param delayMillis Amount of time to delay before delivering a batch.
+         * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
+         * @param dataDir The directory for data.
+         * @param properties Properties to pass to the Manager.
+         */
+        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
+                           final int connectionTimeout, final int requestTimeout, final int delayMillis,
+                           final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
+            this.name = name;
+            this.agents = agents;
+            this.batchSize = batchSize;
+            this.dataDir = dataDir;
+            this.retries = retries;
+            this.connectionTimeout = connectionTimeout;
+            this.requestTimeout = requestTimeout;
+            this.delayMillis = delayMillis;
+            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
+            this.properties = properties;
+        }
+    }
+
+    /**
+     * Berkeley DB Manager Factory.
+     */
+    private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
+
+        /**
+         * Create the FlumePersistentManager, including its Berkeley DB environment, database and
+         * optional SecretKey for payload encryption.
+         * @param name The name of the entity to manage.
+         * @param data The data required to create the entity.
+         * @return The FlumePersistentManager, or null if the database could not be created.
+         */
+        @Override
+        public FlumePersistentManager createManager(final String name, final FactoryData data) {
+            SecretKey secretKey = null;
+            Database database = null;
+            Environment environment = null;
+
+            final Map<String, String> properties = new HashMap<>();
+            if (data.properties != null) {
+                for (final Property property : data.properties) {
+                    properties.put(property.getName(), property.getValue());
+                }
+            }
+
+            try {
+                // NOTE(review): data.dataDir is used verbatim; the DEFAULT_DATA_DIR fallback is
+                // applied only in getManager's key string - verify an empty dataDir is impossible here.
+                final File dir = new File(data.dataDir);
+                FileUtils.mkdir(dir, true);
+                final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
+                dbEnvConfig.setTransactional(true);
+                dbEnvConfig.setAllowCreate(true);
+                dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
+                environment = new Environment(dir, dbEnvConfig);
+                final DatabaseConfig dbConfig = new DatabaseConfig();
+                dbConfig.setTransactional(true);
+                dbConfig.setAllowCreate(true);
+                database = environment.openDatabase(null, name, dbConfig);
+            } catch (final Exception ex) {
+                LOGGER.error("Could not create FlumePersistentManager", ex);
+                // For consistency, close database as well as environment even though it should never happen since the
+                // database is that last thing in the block above, but this does guard against a future line being
+                // inserted at the end that would bomb (like some debug logging).
+                if (database != null) {
+                    database.close();
+                    database = null;
+                }
+                if (environment != null) {
+                    environment.close();
+                    environment = null;
+                }
+                return null;
+            }
+
+            try {
+                // Look up the (case-insensitive) keyProvider property naming a SecretKeyProvider plugin.
+                String key = null;
+                for (final Map.Entry<String, String> entry : properties.entrySet()) {
+                    if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
+                        key = entry.getValue();
+                        break;
+                    }
+                }
+                if (key != null) {
+                    final PluginManager manager = new PluginManager("KeyProvider");
+                    manager.collectPlugins();
+                    final Map<String, PluginType<?>> plugins = manager.getPlugins();
+                    if (plugins != null) {
+                        boolean found = false;
+                        for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
+                            if (entry.getKey().equalsIgnoreCase(key)) {
+                                found = true;
+                                final Class<?> cl = entry.getValue().getPluginClass();
+                                try {
+                                    final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
+                                    secretKey = provider.getSecretKey();
+                                    LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
+                                } catch (final Exception ex) {
+                                    LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
+                                        cl.getName());
+                                }
+                                break;
+                            }
+                        }
+                        if (!found) {
+                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
+                        }
+                    } else {
+                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
+                    }
+                }
+            } catch (final Exception ex) {
+                // Encryption is best-effort: any failure here disables it but does not fail manager creation.
+                LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
+            }
+            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
+                data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
+                data.lockTimeoutRetryCount);
+        }
+    }
+
+    /**
+     * Thread that sends data to Flume and pulls it from Berkeley DB.
+     */
+    private static class WriterThread extends Thread  {
+        private volatile boolean shutdown = false;
+        private final Database database;
+        private final Environment environment;
+        private final FlumePersistentManager manager;
+        private final Gate gate;
+        private final SecretKey secretKey;
+        private final int batchSize;
+        private final AtomicLong dbCounter;
+        private final int lockTimeoutRetryCount;
+
+        /**
+         * Constructor.
+         * @param database The database to read events from.
+         * @param environment The database environment.
+         * @param manager The owning manager, used to send batches and obtain the delay.
+         * @param gate The gate used to signal that a batch is ready.
+         * @param batchsize The number of events per batch.
+         * @param secretKey The key for decrypting persisted events, or null for no encryption.
+         * @param dbCount Shared counter of events currently persisted in the database.
+         * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
+         */
+        public WriterThread(final Database database, final Environment environment,
+                            final FlumePersistentManager manager, final Gate gate, final int batchsize,
+                            final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
+            this.database = database;
+            this.environment = environment;
+            this.manager = manager;
+            this.gate = gate;
+            this.batchSize = batchsize;
+            this.secretKey = secretKey;
+            // Daemon so this thread never prevents JVM shutdown.
+            this.setDaemon(true);
+            this.dbCounter = dbCount;
+            this.lockTimeoutRetryCount = lockTimeoutRetryCount;
+        }
+
+        /**
+         * Signals this thread to stop and opens the gate so a run() loop waiting on it wakes up
+         * immediately instead of sleeping out its delay.
+         */
+        public void shutdown() {
+            LOGGER.debug("Writer thread shutting down");
+            this.shutdown = true;
+            gate.open();
+        }
+
+        public boolean isShutdown() {
+            return shutdown;
+        }
+
+        @Override
+        public void run() {
+            LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
+            long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
+            while (!shutdown) {
+                final long nowMillis = System.currentTimeMillis();
+                final long dbCount = database.count();
+                dbCounter.set(dbCount);
+                if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
+                    nextBatchMillis = nowMillis + manager.getDelayMillis();
+                    try {
+                        boolean errors = false;
+                        final DatabaseEntry key = new DatabaseEntry();
+                        final DatabaseEntry data = new DatabaseEntry();
+
+                        gate.close();
+                        OperationStatus status;
+                        if (batchSize > 1) {
+                            try {
+                                errors = sendBatch(key, data);
+                            } catch (final Exception ex) {
+                                break;
+                            }
+                        } else {
+                            Exception exception = null;
+                            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
+                                exception = null;
+                                Transaction txn = null;
+                                Cursor cursor = null;
+                                try {
+                                    txn = environment.beginTransaction(null, null);
+                                    cursor = database.openCursor(txn, null);
+                                    try {
+                                        status = cursor.getFirst(key, data, LockMode.RMW);
+                                        while (status == OperationStatus.SUCCESS) {
+                                            final SimpleEvent event = createEvent(data);
+                                            if (event != null) {
+                                                try {
+                                                    manager.doSend(event);
+                                                } catch (final Exception ioe) {
+                                                    errors = true;
+                                                    LOGGER.error("Error sending event", ioe);
+                                                    break;
+                                                }
+                                                try {
+                                                    cursor.delete();
+                                                } catch (final Exception ex) {
+                                                    LOGGER.error("Unable to delete event", ex);
+                                                }
+                                            }
+                                            status = cursor.getNext(key, data, LockMode.RMW);
+                                        }
+                                        if (cursor != null) {
+                                            cursor.close();
+                                            cursor = null;
+                                        }
+                                        txn.commit();
+                                        txn = null;
+                                        dbCounter.decrementAndGet();
+                                        exception = null;
+                                        break;
+                                    } catch (final LockConflictException lce) {
+                                        exception = lce;
+                                        // Fall through and retry.
+                                    } catch (final Exception ex) {
+                                        LOGGER.error("Error reading or writing to database", ex);
+                                        shutdown = true;
+                                        break;
+                                    } finally {
+                                        if (cursor != null) {
+                                            cursor.close();
+                                            cursor = null;
+                                        }
+                                        if (txn != null) {
+                                            txn.abort();
+                                            txn = null;
+                                        }
+                                    }
+                                } catch (final LockConflictException lce) {
+                                    exception = lce;
+                                    if (cursor != null) {
+                                        try {
+                                            cursor.close();
+                                            cursor = null;
+                                        } catch (final Exception ex) {
+                                            LOGGER.trace("Ignored exception closing cursor during lock conflict.");
+                                        }
+                                    }
+                                    if (txn != null) {
+                                        try {
+                                            txn.abort();
+                                            txn = null;
+                                        } catch (final Exception ex) {
+                                            LOGGER.trace("Ignored exception aborting tx during lock conflict.");
+                                        }
+                                    }
+                                }
+                                try {
+                                    Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
+                                } catch (final InterruptedException ie) {
+                                    // Ignore the error
+                                }
+                            }
+                            if (exception != null) {
+                                LOGGER.error("Unable to read or update data base", exception);
+                            }
+                        }
+                        if (errors) {
+                            Thread.sleep(manager.getDelayMillis());
+                            continue;
+                        }
+                    } catch (final Exception ex) {
+                        LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
+                    }
+                } else {
+                    if (nextBatchMillis <= nowMillis) {
+                        nextBatchMillis = nowMillis + manager.getDelayMillis();
+                    }
+                    try {
+                        final long interval = nextBatchMillis - nowMillis;
+                        gate.waitForOpen(interval);
+                    } catch (final InterruptedException ie) {
+                        LOGGER.warn("WriterThread interrupted, continuing");
+                    } catch (final Exception ex) {
+                        LOGGER.error("WriterThread encountered an exception waiting for work", ex);
+                        break;
+                    }
+                }
+            }
+
+            if (batchSize > 1 && database.count() > 0) {
+                final DatabaseEntry key = new DatabaseEntry();
+                final DatabaseEntry data = new DatabaseEntry();
+                try {
+                    sendBatch(key, data);
+                } catch (final Exception ex) {
+                    LOGGER.warn("Unable to write final batch");
+                }
+            }
+            LOGGER.trace("WriterThread exiting");
+        }
+
+        private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
+            boolean errors = false;
+            OperationStatus status;
+            Cursor cursor = null;
+            try {
+            	final BatchEvent batch = new BatchEvent();
+            	for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
+            		try {
+            			cursor = database.openCursor(null, CursorConfig.DEFAULT);
+            			status = cursor.getFirst(key, data, null);
+
+            			for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
+            				final SimpleEvent event = createEvent(data);
+            				if (event != null) {
+            					batch.addEvent(event);
+            				}
+            				status = cursor.getNext(key, data, null);
+            			}
+            			break;
+            		} catch (final LockConflictException lce) {
+            			if (cursor != null) {
+            				try {
+                                cursor.close();
+                                cursor = null;
+                            } catch (final Exception ex) {
+                                LOGGER.trace("Ignored exception closing cursor during lock conflict.");
+                            }
+                        }
+                    }
+            	}
+
+                try {
+                    manager.send(batch);
+                } catch (final Exception ioe) {
+                    LOGGER.error("Error sending events", ioe);
+                    errors = true;
+                }
+                if (!errors) {
+                	if (cursor != null) {
+	                    cursor.close();
+	                    cursor = null;
+                	}
+                    Transaction txn = null;
+                    Exception exception = null;
+                    for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
+                        try {
+                            txn = environment.beginTransaction(null, null);
+                            try {
+                                for (final Event event : batch.getEvents()) {
+                                    try {
+                                        final Map<String, String> headers = event.getHeaders();
+                                        key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
+                                        database.delete(txn, key);
+                                    } catch (final Exception ex) {
+                                        LOGGER.error("Error deleting key from database", ex);
+                                    }
+                                }
+                                txn.commit();
+                                long count = dbCounter.get();
+                                while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
+                                    count = dbCounter.get();
+                                }
+                                exception = null;
+                                break;
+                            } catch (final LockConflictException lce) {
+                                exception = lce;
+                                if (cursor != null) {
+                                    try {
+                                        cursor.close();
+                                        cursor = null;
+                                    } catch (final Exception ex) {
+                                        LOGGER.trace("Ignored exception closing cursor during lock conflict.");
+                                    }
+                                }
+                                if (txn != null) {
+                                    try {
+                                        txn.abort();
+                                        txn = null;
+                                    } catch (final Exception ex) {
+                                        LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
+                                    }
+                                }
+                            } catch (final Exception ex) {
+                                LOGGER.error("Unable to commit transaction", ex);
+                                if (txn != null) {
+                                    txn.abort();
+                                }
+                            }
+                        } catch (final LockConflictException lce) {
+                            exception = lce;
+                            if (cursor != null) {
+                                try {
+                                    cursor.close();
+                                    cursor = null;
+                                } catch (final Exception ex) {
+                                    LOGGER.trace("Ignored exception closing cursor during lock conflict.");
+                                }
+                            }
+                            if (txn != null) {
+                                try {
+                                    txn.abort();
+                                    txn = null;
+                                } catch (final Exception ex) {
+                                    LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
+                                }
+                            }
+                        } finally {
+                            if (cursor != null) {
+                                cursor.close();
+                                cursor = null;
+                            }
+                            if (txn != null) {
+                                txn.abort();
+                                txn = null;
+                            }
+                        }
+                        try {
+                            Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
+                        } catch (final InterruptedException ie) {
+                            // Ignore the error
+                        }
+                    }
+                    if (exception != null) {
+                        LOGGER.error("Unable to delete events from data base", exception);
+                    }
+                }
+            } catch (final Exception ex) {
+                LOGGER.error("Error reading database", ex);
+                shutdown = true;
+                throw ex;
+            } finally {
+                if (cursor != null) {
+                    cursor.close();
+                }
+            }
+
+            return errors;
+        }
+
+        private SimpleEvent createEvent(final DatabaseEntry data) {
+            final SimpleEvent event = new SimpleEvent();
+            try {
+                byte[] eventData = data.getData();
+                if (secretKey != null) {
+                    final Cipher cipher = Cipher.getInstance("AES");
+                    cipher.init(Cipher.DECRYPT_MODE, secretKey);
+                    eventData = cipher.doFinal(eventData);
+                }
+                final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
+                final DataInputStream dais = new DataInputStream(bais);
+                int length = dais.readInt();
+                final byte[] bytes = new byte[length];
+                dais.read(bytes, 0, length);
+                event.setBody(bytes);
+                length = dais.readInt();
+                final Map<String, String> map = new HashMap<>(length);
+                for (int i = 0; i < length; ++i) {
+                    final String headerKey = dais.readUTF();
+                    final String value = dais.readUTF();
+                    map.put(headerKey, value);
+                }
+                event.setHeaders(map);
+                return event;
+            } catch (final Exception ex) {
+                LOGGER.error("Error retrieving event", ex);
+                return null;
+            }
+        }
+
+    }
+
+    /**
+     * Factory that creates Daemon threads that can be properly shut down.
+     */
+    private static class DaemonThreadFactory implements ThreadFactory {
+        // Distinguishes thread names across factory instances.
+        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
+        private final ThreadGroup group;
+        // Per-factory counter used to make each thread name unique.
+        private final AtomicInteger threadNumber = new AtomicInteger(1);
+        private final String namePrefix;
+
+        public DaemonThreadFactory() {
+            // Prefer the SecurityManager's thread group when one is installed.
+            final SecurityManager securityManager = System.getSecurityManager();
+            group = securityManager != null ? securityManager.getThreadGroup() :
+                Thread.currentThread().getThreadGroup();
+            namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
+        }
+
+        /**
+         * Creates a new daemon thread with normal priority and a pool-unique name.
+         *
+         * @param r the task the thread will run
+         * @return the new (not yet started) daemon thread
+         */
+        @Override
+        public Thread newThread(final Runnable r) {
+            final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
+            thread.setDaemon(true);
+            if (thread.getPriority() != Thread.NORM_PRIORITY) {
+                thread.setPriority(Thread.NORM_PRIORITY);
+            }
+            return thread;
+        }
+    }
+
+    /**
+     * An internal class. A simple monitor-based gate: open() wakes any thread
+     * blocked in waitForOpen(timeout).
+     */
+    private static class Gate {
+
+        // NOTE(review): read without synchronization in isOpen(); a stale value may
+        // be observed — confirm callers tolerate that.
+        private boolean isOpen = false;
+
+        public boolean isOpen() {
+            return isOpen;
+        }
+
+        // Opens the gate and wakes all threads blocked in waitForOpen().
+        public synchronized void open() {
+            isOpen = true;
+            notifyAll();
+        }
+
+        public synchronized void close() {
+            isOpen = false;
+        }
+
+        // Waits up to timeout ms; may also return early on notifyAll() or a
+        // spurious wakeup (the flag is not rechecked in a loop here — callers loop).
+        public synchronized void waitForOpen(final long timeout) throws InterruptedException {
+            wait(timeout);
+        }
+    }
+}


[2/2] logging-log4j2 git commit: Checkstyle: do not hide field.

Posted by gg...@apache.org.
Checkstyle: do not hide field.

Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/a7f67f8b
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/a7f67f8b
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/a7f67f8b

Branch: refs/heads/master
Commit: a7f67f8bb989c36525904a2388043c0037489462
Parents: 5f8aadd
Author: ggregory <gg...@apache.org>
Authored: Mon Sep 14 14:08:10 2015 -0700
Committer: ggregory <gg...@apache.org>
Committed: Mon Sep 14 14:08:10 2015 -0700

----------------------------------------------------------------------
 .../log4j/flume/appender/FlumeAvroManager.java  | 664 +++++++++----------
 1 file changed, 332 insertions(+), 332 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/a7f67f8b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
index d6e0d34..ef3234a 100644
--- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
+++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
@@ -1,332 +1,332 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache license, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the license for the specific language governing permissions and
- * limitations under the license.
- */
-package org.apache.logging.log4j.flume.appender;
-
-import java.util.Properties;
-
-import org.apache.flume.Event;
-import org.apache.flume.api.RpcClient;
-import org.apache.flume.api.RpcClientFactory;
-import org.apache.logging.log4j.core.appender.AppenderLoggingException;
-import org.apache.logging.log4j.core.appender.ManagerFactory;
-
-/**
- * Manager for FlumeAvroAppenders.
- */
-public class FlumeAvroManager extends AbstractFlumeManager {
-
-    private static final int MAX_RECONNECTS = 3;
-    private static final int MINIMUM_TIMEOUT = 1000;
-
-    private static AvroManagerFactory factory = new AvroManagerFactory();
-
-    private final Agent[] agents;
-
-    private final int batchSize;
-
-    // Delay window for flushing an incomplete batch, in nanoseconds and milliseconds.
-    private final long delayNanos;
-    private final int delayMillis;
-
-    private final int retries;
-
-    private final int connectTimeoutMillis;
-
-    private final int requestTimeoutMillis;
-
-    // Index of the agent currently in use; never reassigned here (always 0).
-    private final int current = 0;
-
-    // Lazily (re)created failover RPC client; null when disconnected.
-    private RpcClient rpcClient = null;
-
-    // Buffer of pending events when batchSize > 1, and the flush deadline (nanoTime).
-    private BatchEvent batchEvent = new BatchEvent();
-    private long nextSend = 0;
-
-    /**
-     * Constructor
-     * @param name The unique name of this manager.
-     * @param agents An array of Agents.
-     * @param batchSize The number of events to include in a batch.
-     * @param retries The number of times to retry connecting before giving up.
-     * @param connectTimeout The connection timeout in ms.
-     * @param requestTimeout The request timeout in ms.
-     *
-     */
-    protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
-                               final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) {
-        super(name);
-        this.agents = agents;
-        this.batchSize = batchSize;
-        this.delayMillis = delayMillis;
-        // NOTE(review): int multiplication can overflow before widening to long
-        // when delayMillis > ~2147 — confirm the expected range of delayMillis.
-        this.delayNanos = delayMillis * 1000000;
-        this.retries = retries;
-        this.connectTimeoutMillis = connectTimeout;
-        this.requestTimeoutMillis = requestTimeout;
-        this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
-    }
-
-    /**
-     * Returns a FlumeAvroManager.
-     * @param name The name of the manager.
-     * @param agents The agents to use.
-     * @param batchSize The number of events to include in a batch.
-     * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
-     * @param retries The number of times to retry connecting before giving up.
-     * @param connectTimeoutMillis The connection timeout in ms.
-     * @param requestTimeoutMillis The request timeout in ms.
-     * @return A FlumeAvroManager.
-     */
-    public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, final int delayMillis,
-                                              final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
-        if (agents == null || agents.length == 0) {
-            throw new IllegalArgumentException("At least one agent is required");
-        }
-
-        if (batchSize <= 0) {
-            batchSize = 1;
-        }
-
-        // Build a manager key of the form "FlumeAvro[host:port,host:port,...]".
-        final StringBuilder sb = new StringBuilder("FlumeAvro[");
-        boolean first = true;
-        for (final Agent agent : agents) {
-            if (!first) {
-                sb.append(',');
-            }
-            sb.append(agent.getHost()).append(':').append(agent.getPort());
-            first = false;
-        }
-        sb.append(']');
-        return getManager(sb.toString(), factory,
-                new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis));
-    }
-
-    /**
-     * Returns the agents.
-     * @return The agent array.
-     */
-    public Agent[] getAgents() {
-        return agents;
-    }
-
-    /**
-     * Returns the index of the current agent.
-     * @return The index for the current agent.
-     */
-    public int getCurrent() {
-        return current;
-    }
-
-    public int getRetries() {
-        return retries;
-    }
-
-    public int getConnectTimeoutMillis() {
-        return connectTimeoutMillis;
-    }
-
-    public int getRequestTimeoutMillis() {
-        return requestTimeoutMillis;
-    }
-
-    public int getBatchSize() {
-        return batchSize;
-    }
-
-    public int getDelayMillis() {
-        return delayMillis;
-    }
-
-    /**
-     * Sends a batch of events, lazily (re)connecting if needed.
-     * @param events The batch to send.
-     * @throws AppenderLoggingException if no Flume agent is reachable or the send fails.
-     */
-    public synchronized void send(final BatchEvent events) {
-        if (rpcClient == null) {
-            rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
-        }
-
-        if (rpcClient != null) {
-            try {
-                LOGGER.trace("Sending batch of {} events", events.getEvents().size());
-                rpcClient.appendBatch(events.getEvents());
-            } catch (final Exception ex) {
-                rpcClient.close();
-                rpcClient = null;
-                final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
-                    agents[current].getPort();
-                LOGGER.warn(msg, ex);
-                // NOTE(review): the original cause is not chained to the thrown exception.
-                throw new AppenderLoggingException("No Flume agents are available");
-            }
-        }  else {
-            final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
-                agents[current].getPort();
-            LOGGER.warn(msg);
-            throw new AppenderLoggingException("No Flume agents are available");
-        }
-    }
-
-    /**
-     * Sends a single event immediately when batchSize is 1; otherwise buffers it
-     * and flushes once the batch fills or the delay window expires.
-     * @param event The event to send.
-     */
-    @Override
-    public synchronized void send(final Event event)  {
-        if (batchSize == 1) {
-            if (rpcClient == null) {
-                rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
-            }
-
-            if (rpcClient != null) {
-                try {
-                    rpcClient.append(event);
-                } catch (final Exception ex) {
-                    rpcClient.close();
-                    rpcClient = null;
-                    final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
-                            agents[current].getPort();
-                    LOGGER.warn(msg, ex);
-                    throw new AppenderLoggingException("No Flume agents are available");
-                }
-            } else {
-                final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
-                        agents[current].getPort();
-                LOGGER.warn(msg);
-                throw new AppenderLoggingException("No Flume agents are available");
-            }
-        } else {
-            batchEvent.addEvent(event);
-            final int count = batchEvent.getEvents().size();
-            if (count == 1) {
-                // First event of a new batch starts the delay window.
-                nextSend = System.nanoTime() + delayNanos;
-            }
-            if (count >= batchSize || System.nanoTime() >= nextSend) {
-                send(batchEvent);
-                batchEvent = new BatchEvent();
-            }
-        }
-    }
-
-    /**
-     * There is a very good chance that this will always return the first agent even if it isn't available.
-     * @param agents The list of agents to choose from
-     * @return The FlumeEventAvroServer.
-     */
-    private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
-        try {
-            // Configure Flume's default_failover client with one "hostN" entry per agent.
-            final Properties props = new Properties();
-
-            props.put("client.type", "default_failover");
-
-            int count = 1;
-            final StringBuilder sb = new StringBuilder();
-            for (final Agent agent : agents) {
-                if (sb.length() > 0) {
-                    sb.append(' ');
-                }
-                final String hostName = "host" + count++;
-                props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
-                sb.append(hostName);
-            }
-            props.put("hosts", sb.toString());
-            if (batchSize > 0) {
-                props.put("batch-size", Integer.toString(batchSize));
-            }
-            if (retries > 1) {
-                // Cap reconnect attempts per agent at MAX_RECONNECTS.
-                if (retries > MAX_RECONNECTS) {
-                    retries = MAX_RECONNECTS;
-                }
-                props.put("max-attempts", Integer.toString(retries * agents.length));
-            }
-            if (requestTimeoutMillis >= MINIMUM_TIMEOUT) {
-                props.put("request-timeout", Integer.toString(requestTimeoutMillis));
-            }
-            if (connectTimeoutMillis >= MINIMUM_TIMEOUT) {
-                props.put("connect-timeout", Integer.toString(connectTimeoutMillis));
-            }
-            return RpcClientFactory.getInstance(props);
-        } catch (final Exception ex) {
-            LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
-            return null;
-        }
-    }
-
-    // Flushes any buffered batch and closes the RPC client when the manager is released.
-    @Override
-    protected void releaseSub() {
-        if (rpcClient != null) {
-            try {
-                synchronized(this) {
-                    try {
-                        if (batchSize > 1 && batchEvent.getEvents().size() > 0) {
-                            send(batchEvent);
-                        }
-                    } catch (final Exception ex) {
-                        LOGGER.error("Error sending final batch: {}", ex.getMessage());
-                    }
-                }
-                rpcClient.close();
-            } catch (final Exception ex) {
-                LOGGER.error("Attempt to close RPC client failed", ex);
-            }
-        }
-        rpcClient = null;
-    }
-
-    /**
-     * Factory data.
-     */
-    private static class FactoryData {
-        private final String name;
-        private final Agent[] agents;
-        private final int batchSize;
-        private final int delayMillis;
-        private final int retries;
-        // NOTE(review): field name is misspelled ("conntect"); usage below matches.
-        private final int conntectTimeoutMillis;
-        private final int requestTimeoutMillis;
-
-        /**
-         * Constructor.
-         * @param name The name of the Appender.
-         * @param agents The agents.
-         * @param batchSize The number of events to include in a batch.
-         */
-        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis,
-                final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
-            this.name = name;
-            this.agents = agents;
-            this.batchSize = batchSize;
-            this.delayMillis = delayMillis;
-            this.retries = retries;
-            this.conntectTimeoutMillis = connectTimeoutMillis;
-            this.requestTimeoutMillis = requestTimeoutMillis;
-        }
-    }
-
-    /**
-     * Avro Manager Factory.
-     */
-    private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
-
-        /**
-         * Create the FlumeAvroManager.
-         * @param name The name of the entity to manage.
-         * @param data The data required to create the entity.
-         * @return The FlumeAvroManager.
-         */
-        @Override
-        public FlumeAvroManager createManager(final String name, final FactoryData data) {
-            try {
-
-                return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis,
-                        data.retries, data.conntectTimeoutMillis, data.requestTimeoutMillis);
-            } catch (final Exception ex) {
-                LOGGER.error("Could not create FlumeAvroManager", ex);
-            }
-            return null;
-        }
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import java.util.Properties;
+
+import org.apache.flume.Event;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.logging.log4j.core.appender.AppenderLoggingException;
+import org.apache.logging.log4j.core.appender.ManagerFactory;
+
+/**
+ * Manager for FlumeAvroAppenders.
+ */
+public class FlumeAvroManager extends AbstractFlumeManager {
+
+    private static final int MAX_RECONNECTS = 3;
+    private static final int MINIMUM_TIMEOUT = 1000;
+
+    private static AvroManagerFactory factory = new AvroManagerFactory();
+
+    private final Agent[] agents;
+
+    private final int batchSize;
+
+    private final long delayNanos;
+    private final int delayMillis;
+
+    private final int retries;
+
+    private final int connectTimeoutMillis;
+
+    private final int requestTimeoutMillis;
+
+    private final int current = 0;
+
+    private RpcClient rpcClient = null;
+
+    private BatchEvent batchEvent = new BatchEvent();
+    private long nextSend = 0;
+
+    /**
+     * Constructor
+     * @param name The unique name of this manager.
+     * @param agents An array of Agents.
+     * @param batchSize The number of events to include in a batch.
+     * @param retries The number of times to retry connecting before giving up.
+     * @param connectTimeout The connection timeout in ms.
+     * @param requestTimeout The request timeout in ms.
+     *
+     */
+    protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
+                               final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) {
+        super(name);
+        this.agents = agents;
+        this.batchSize = batchSize;
+        this.delayMillis = delayMillis;
+        this.delayNanos = delayMillis * 1000000;
+        this.retries = retries;
+        this.connectTimeoutMillis = connectTimeout;
+        this.requestTimeoutMillis = requestTimeout;
+        this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
+    }
+
+    /**
+     * Returns a FlumeAvroManager.
+     * @param name The name of the manager.
+     * @param agents The agents to use.
+     * @param batchSize The number of events to include in a batch.
+     * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
+     * @param retries The number of times to retry connecting before giving up.
+     * @param connectTimeoutMillis The connection timeout in ms.
+     * @param requestTimeoutMillis The request timeout in ms.
+     * @return A FlumeAvroManager.
+     */
+    public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, final int delayMillis,
+                                              final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
+        if (agents == null || agents.length == 0) {
+            throw new IllegalArgumentException("At least one agent is required");
+        }
+
+        if (batchSize <= 0) {
+            batchSize = 1;
+        }
+
+        final StringBuilder sb = new StringBuilder("FlumeAvro[");
+        boolean first = true;
+        for (final Agent agent : agents) {
+            if (!first) {
+                sb.append(',');
+            }
+            sb.append(agent.getHost()).append(':').append(agent.getPort());
+            first = false;
+        }
+        sb.append(']');
+        return getManager(sb.toString(), factory,
+                new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis));
+    }
+
+    /**
+     * Returns the agents.
+     * @return The agent array.
+     */
+    public Agent[] getAgents() {
+        return agents;
+    }
+
+    /**
+     * Returns the index of the current agent.
+     * @return The index for the current agent.
+     */
+    public int getCurrent() {
+        return current;
+    }
+
+    public int getRetries() {
+        return retries;
+    }
+
+    public int getConnectTimeoutMillis() {
+        return connectTimeoutMillis;
+    }
+
+    public int getRequestTimeoutMillis() {
+        return requestTimeoutMillis;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public int getDelayMillis() {
+        return delayMillis;
+    }
+
+    public synchronized void send(final BatchEvent events) {
+        if (rpcClient == null) {
+            rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
+        }
+
+        if (rpcClient != null) {
+            try {
+                LOGGER.trace("Sending batch of {} events", events.getEvents().size());
+                rpcClient.appendBatch(events.getEvents());
+            } catch (final Exception ex) {
+                rpcClient.close();
+                rpcClient = null;
+                final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
+                    agents[current].getPort();
+                LOGGER.warn(msg, ex);
+                throw new AppenderLoggingException("No Flume agents are available");
+            }
+        }  else {
+            final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
+                agents[current].getPort();
+            LOGGER.warn(msg);
+            throw new AppenderLoggingException("No Flume agents are available");
+        }
+    }
+
+    @Override
+    public synchronized void send(final Event event)  {
+        if (batchSize == 1) {
+            if (rpcClient == null) {
+                rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
+            }
+
+            if (rpcClient != null) {
+                try {
+                    rpcClient.append(event);
+                } catch (final Exception ex) {
+                    rpcClient.close();
+                    rpcClient = null;
+                    final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
+                            agents[current].getPort();
+                    LOGGER.warn(msg, ex);
+                    throw new AppenderLoggingException("No Flume agents are available");
+                }
+            } else {
+                final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
+                        agents[current].getPort();
+                LOGGER.warn(msg);
+                throw new AppenderLoggingException("No Flume agents are available");
+            }
+        } else {
+            batchEvent.addEvent(event);
+            final int eventCount = batchEvent.getEvents().size();
+            if (eventCount == 1) {
+                nextSend = System.nanoTime() + delayNanos;
+            }
+            if (eventCount >= batchSize || System.nanoTime() >= nextSend) {
+                send(batchEvent);
+                batchEvent = new BatchEvent();
+            }
+        }
+    }
+
+    /**
+     * There is a very good chance that this will always return the first agent even if it isn't available.
+     * @param agents The list of agents to choose from
+     * @return The FlumeEventAvroServer.
+     */
+    private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
+        try {
+            final Properties props = new Properties();
+
+            props.put("client.type", "default_failover");
+
+            int agentCount = 1;
+            final StringBuilder sb = new StringBuilder();
+            for (final Agent agent : agents) {
+                if (sb.length() > 0) {
+                    sb.append(' ');
+                }
+                final String hostName = "host" + agentCount++;
+                props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
+                sb.append(hostName);
+            }
+            props.put("hosts", sb.toString());
+            if (batchSize > 0) {
+                props.put("batch-size", Integer.toString(batchSize));
+            }
+            if (retries > 1) {
+                if (retries > MAX_RECONNECTS) {
+                    retries = MAX_RECONNECTS;
+                }
+                props.put("max-attempts", Integer.toString(retries * agents.length));
+            }
+            if (requestTimeoutMillis >= MINIMUM_TIMEOUT) {
+                props.put("request-timeout", Integer.toString(requestTimeoutMillis));
+            }
+            if (connectTimeoutMillis >= MINIMUM_TIMEOUT) {
+                props.put("connect-timeout", Integer.toString(connectTimeoutMillis));
+            }
+            return RpcClientFactory.getInstance(props);
+        } catch (final Exception ex) {
+            LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
+            return null;
+        }
+    }
+
+    @Override
+    protected void releaseSub() {
+        if (rpcClient != null) {
+            try {
+                synchronized(this) {
+                    try {
+                        if (batchSize > 1 && batchEvent.getEvents().size() > 0) {
+                            send(batchEvent);
+                        }
+                    } catch (final Exception ex) {
+                        LOGGER.error("Error sending final batch: {}", ex.getMessage());
+                    }
+                }
+                rpcClient.close();
+            } catch (final Exception ex) {
+                LOGGER.error("Attempt to close RPC client failed", ex);
+            }
+        }
+        rpcClient = null;
+    }
+
+    /**
+     * Factory data.
+     */
+    private static class FactoryData {
+        private final String name;
+        private final Agent[] agents;
+        private final int batchSize;
+        private final int delayMillis;
+        private final int retries;
+        private final int conntectTimeoutMillis;
+        private final int requestTimeoutMillis;
+
+        /**
+         * Constructor.
+         * @param name The name of the Appender.
+         * @param agents The agents.
+         * @param batchSize The number of events to include in a batch.
+         */
+        public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis,
+                final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
+            this.name = name;
+            this.agents = agents;
+            this.batchSize = batchSize;
+            this.delayMillis = delayMillis;
+            this.retries = retries;
+            this.conntectTimeoutMillis = connectTimeoutMillis;
+            this.requestTimeoutMillis = requestTimeoutMillis;
+        }
+    }
+
+    /**
+     * Avro Manager Factory.
+     */
+    private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
+
+        /**
+         * Create the FlumeAvroManager.
+         * @param name The name of the entity to manage.
+         * @param data The data required to create the entity.
+         * @return The FlumeAvroManager.
+         */
+        @Override
+        public FlumeAvroManager createManager(final String name, final FactoryData data) {
+            try {
+
+                return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis,
+                        data.retries, data.conntectTimeoutMillis, data.requestTimeoutMillis);
+            } catch (final Exception ex) {
+                LOGGER.error("Could not create FlumeAvroManager", ex);
+            }
+            return null;
+        }
+    }
+
+}