You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by gg...@apache.org on 2015/09/14 23:26:04 UTC
[1/2] logging-log4j2 git commit: Checkstyle: do not hide field.
Repository: logging-log4j2
Updated Branches:
refs/heads/master a610ba84f -> a7f67f8bb
Checkstyle: do not hide field.
Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/5f8aaddc
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/5f8aaddc
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/5f8aaddc
Branch: refs/heads/master
Commit: 5f8aaddc9613bf0a425060238dfd20e61c9ebe34
Parents: a610ba8
Author: ggregory <gg...@apache.org>
Authored: Mon Sep 14 14:07:00 2015 -0700
Committer: ggregory <gg...@apache.org>
Committed: Mon Sep 14 14:07:00 2015 -0700
----------------------------------------------------------------------
.../flume/appender/FlumePersistentManager.java | 1754 +++++++++---------
1 file changed, 877 insertions(+), 877 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/5f8aaddc/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
index c0f8879..3383d79 100644
--- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
+++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumePersistentManager.java
@@ -1,877 +1,877 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache license, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the license for the specific language governing permissions and
- * limitations under the license.
- */
-package org.apache.logging.log4j.flume.appender;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.crypto.Cipher;
-import javax.crypto.SecretKey;
-
-import org.apache.flume.Event;
-import org.apache.flume.event.SimpleEvent;
-import org.apache.logging.log4j.LoggingException;
-import org.apache.logging.log4j.core.appender.ManagerFactory;
-import org.apache.logging.log4j.core.config.Property;
-import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
-import org.apache.logging.log4j.core.config.plugins.util.PluginType;
-import org.apache.logging.log4j.core.util.FileUtils;
-import org.apache.logging.log4j.core.util.SecretKeyProvider;
-import org.apache.logging.log4j.util.Strings;
-
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.CursorConfig;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.LockConflictException;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.StatsConfig;
-import com.sleepycat.je.Transaction;
-
-/**
- * Manager that persists data to Berkeley DB before passing it on to Flume.
- */
-public class FlumePersistentManager extends FlumeAvroManager {
-
- /** Attribute name for the key provider. */
- public static final String KEY_PROVIDER = "keyProvider";
-
- private static final Charset UTF8 = Charset.forName("UTF-8");
-
- private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
-
- private static final int SHUTDOWN_WAIT = 60;
-
- private static final int MILLIS_PER_SECOND = 1000;
-
- private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
-
- private static BDBManagerFactory factory = new BDBManagerFactory();
-
- private final Database database;
-
- private final Environment environment;
-
- private final WriterThread worker;
-
- private final Gate gate = new Gate();
-
- private final SecretKey secretKey;
-
- private final int lockTimeoutRetryCount;
-
- private final ExecutorService threadPool;
-
- private final AtomicLong dbCount = new AtomicLong();
-
- /**
- * Constructor
- * @param name The unique name of this manager.
- * @param shortName Original name for the Manager.
- * @param agents An array of Agents.
- * @param batchSize The number of events to include in a batch.
- * @param retries The number of times to retry connecting before giving up.
- * @param connectionTimeout The amount of time to wait for a connection to be established.
- * @param requestTimeout The amount of time to wair for a response to a request.
- * @param delay The amount of time to wait between retries.
- * @param database The database to write to.
- * @param environment The database environment.
- * @param secretKey The SecretKey to use for encryption.
- * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
- */
- protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
- final int batchSize, final int retries, final int connectionTimeout,
- final int requestTimeout, final int delay, final Database database,
- final Environment environment, final SecretKey secretKey,
- final int lockTimeoutRetryCount) {
- super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
- this.database = database;
- this.environment = environment;
- dbCount.set(database.count());
- this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
- lockTimeoutRetryCount);
- this.worker.start();
- this.secretKey = secretKey;
- this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
- this.lockTimeoutRetryCount = lockTimeoutRetryCount;
- }
-
-
- /**
- * Returns a FlumeAvroManager.
- * @param name The name of the manager.
- * @param agents The agents to use.
- * @param properties Properties to pass to the Manager.
- * @param batchSize The number of events to include in a batch.
- * @param retries The number of times to retry connecting before giving up.
- * @param connectionTimeout The amount of time to wait to establish a connection.
- * @param requestTimeout The amount of time to wait for a response to a request.
- * @param delayMillis Amount of time to delay before delivering a batch.
- * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
- * @param dataDir The location of the Berkeley database.
- * @return A FlumeAvroManager.
- */
- public static FlumePersistentManager getManager(final String name, final Agent[] agents,
- final Property[] properties, int batchSize, final int retries,
- final int connectionTimeout, final int requestTimeout,
- final int delayMillis, final int lockTimeoutRetryCount,
- final String dataDir) {
- if (agents == null || agents.length == 0) {
- throw new IllegalArgumentException("At least one agent is required");
- }
-
- if (batchSize <= 0) {
- batchSize = 1;
- }
- final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
-
- final StringBuilder sb = new StringBuilder("FlumePersistent[");
- boolean first = true;
- for (final Agent agent : agents) {
- if (!first) {
- sb.append(',');
- }
- sb.append(agent.getHost()).append(':').append(agent.getPort());
- first = false;
- }
- sb.append(']');
- sb.append(' ').append(dataDirectory);
- return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
- connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties));
- }
-
- @Override
- public void send(final Event event) {
- if (worker.isShutdown()) {
- throw new LoggingException("Unable to record event");
- }
-
- final Map<String, String> headers = event.getHeaders();
- final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
- try {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final DataOutputStream daos = new DataOutputStream(baos);
- daos.writeInt(event.getBody().length);
- daos.write(event.getBody(), 0, event.getBody().length);
- daos.writeInt(event.getHeaders().size());
- for (final Map.Entry<String, String> entry : headers.entrySet()) {
- daos.writeUTF(entry.getKey());
- daos.writeUTF(entry.getValue());
- }
- byte[] eventData = baos.toByteArray();
- if (secretKey != null) {
- final Cipher cipher = Cipher.getInstance("AES");
- cipher.init(Cipher.ENCRYPT_MODE, secretKey);
- eventData = cipher.doFinal(eventData);
- }
- final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
- gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
- boolean interrupted = false;
- int count = 0;
- do {
- try {
- future.get();
- } catch (final InterruptedException ie) {
- interrupted = true;
- ++count;
- }
- } while (interrupted && count <= 1);
-
- } catch (final Exception ex) {
- throw new LoggingException("Exception occurred writing log event", ex);
- }
- }
-
- @Override
- protected void releaseSub() {
- LOGGER.debug("Shutting down FlumePersistentManager");
- worker.shutdown();
- try {
- worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
- } catch (final InterruptedException ie) {
- // Ignore the exception and shutdown.
- }
- threadPool.shutdown();
- try {
- threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
- } catch (final InterruptedException ie) {
- LOGGER.warn("PersistentManager Thread pool failed to shut down");
- }
- try {
- worker.join();
- } catch (final InterruptedException ex) {
- LOGGER.debug("Interrupted while waiting for worker to complete");
- }
- try {
- LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
- database.close();
- } catch (final Exception ex) {
- LOGGER.warn("Failed to close database", ex);
- }
- try {
- environment.cleanLog();
- environment.close();
- } catch (final Exception ex) {
- LOGGER.warn("Failed to close environment", ex);
- }
- super.releaseSub();
- }
-
- private void doSend(final SimpleEvent event) {
- LOGGER.debug("Sending event to Flume");
- super.send(event);
- }
-
- /**
- * Thread for writing to Berkeley DB to avoid having interrupts close the database.
- */
- private static class BDBWriter implements Callable<Integer> {
- private final byte[] eventData;
- private final byte[] keyData;
- private final Environment environment;
- private final Database database;
- private final Gate gate;
- private final AtomicLong dbCount;
- private final long batchSize;
- private final int lockTimeoutRetryCount;
-
- public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
- final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
- final int lockTimeoutRetryCount) {
- this.keyData = keyData;
- this.eventData = eventData;
- this.environment = environment;
- this.database = database;
- this.gate = gate;
- this.dbCount = dbCount;
- this.batchSize = batchSize;
- this.lockTimeoutRetryCount = lockTimeoutRetryCount;
- }
-
- @Override
- public Integer call() throws Exception {
- final DatabaseEntry key = new DatabaseEntry(keyData);
- final DatabaseEntry data = new DatabaseEntry(eventData);
- Exception exception = null;
- for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
- Transaction txn = null;
- try {
- txn = environment.beginTransaction(null, null);
- try {
- database.put(txn, key, data);
- txn.commit();
- txn = null;
- if (dbCount.incrementAndGet() >= batchSize) {
- gate.open();
- }
- exception = null;
- break;
- } catch (final LockConflictException lce) {
- exception = lce;
- // Fall through and retry.
- } catch (final Exception ex) {
- if (txn != null) {
- txn.abort();
- }
- throw ex;
- } finally {
- if (txn != null) {
- txn.abort();
- txn = null;
- }
- }
- } catch (final LockConflictException lce) {
- exception = lce;
- if (txn != null) {
- try {
- txn.abort();
- txn = null;
- } catch (final Exception ex) {
- LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
- }
- }
-
- }
- try {
- Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
- } catch (final InterruptedException ie) {
- // Ignore the error
- }
- }
- if (exception != null) {
- throw exception;
- }
- return eventData.length;
- }
- }
-
- /**
- * Factory data.
- */
- private static class FactoryData {
- private final String name;
- private final Agent[] agents;
- private final int batchSize;
- private final String dataDir;
- private final int retries;
- private final int connectionTimeout;
- private final int requestTimeout;
- private final int delayMillis;
- private final int lockTimeoutRetryCount;
- private final Property[] properties;
-
- /**
- * Constructor.
- * @param name The name of the Appender.
- * @param agents The agents.
- * @param batchSize The number of events to include in a batch.
- * @param dataDir The directory for data.
- */
- public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
- final int connectionTimeout, final int requestTimeout, final int delayMillis,
- final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
- this.name = name;
- this.agents = agents;
- this.batchSize = batchSize;
- this.dataDir = dataDir;
- this.retries = retries;
- this.connectionTimeout = connectionTimeout;
- this.requestTimeout = requestTimeout;
- this.delayMillis = delayMillis;
- this.lockTimeoutRetryCount = lockTimeoutRetryCount;
- this.properties = properties;
- }
- }
-
- /**
- * Avro Manager Factory.
- */
- private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
-
- /**
- * Create the FlumeKratiManager.
- * @param name The name of the entity to manage.
- * @param data The data required to create the entity.
- * @return The FlumeKratiManager.
- */
- @Override
- public FlumePersistentManager createManager(final String name, final FactoryData data) {
- SecretKey secretKey = null;
- Database database = null;
- Environment environment = null;
-
- final Map<String, String> properties = new HashMap<>();
- if (data.properties != null) {
- for (final Property property : data.properties) {
- properties.put(property.getName(), property.getValue());
- }
- }
-
- try {
- final File dir = new File(data.dataDir);
- FileUtils.mkdir(dir, true);
- final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
- dbEnvConfig.setTransactional(true);
- dbEnvConfig.setAllowCreate(true);
- dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
- environment = new Environment(dir, dbEnvConfig);
- final DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setTransactional(true);
- dbConfig.setAllowCreate(true);
- database = environment.openDatabase(null, name, dbConfig);
- } catch (final Exception ex) {
- LOGGER.error("Could not create FlumePersistentManager", ex);
- // For consistency, close database as well as environment even though it should never happen since the
- // database is that last thing in the block above, but this does guard against a future line being
- // inserted at the end that would bomb (like some debug logging).
- if (database != null) {
- database.close();
- database = null;
- }
- if (environment != null) {
- environment.close();
- environment = null;
- }
- return null;
- }
-
- try {
- String key = null;
- for (final Map.Entry<String, String> entry : properties.entrySet()) {
- if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
- key = entry.getValue();
- break;
- }
- }
- if (key != null) {
- final PluginManager manager = new PluginManager("KeyProvider");
- manager.collectPlugins();
- final Map<String, PluginType<?>> plugins = manager.getPlugins();
- if (plugins != null) {
- boolean found = false;
- for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
- if (entry.getKey().equalsIgnoreCase(key)) {
- found = true;
- final Class<?> cl = entry.getValue().getPluginClass();
- try {
- final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
- secretKey = provider.getSecretKey();
- LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
- } catch (final Exception ex) {
- LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
- cl.getName());
- }
- break;
- }
- }
- if (!found) {
- LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
- }
- } else {
- LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
- }
- }
- } catch (final Exception ex) {
- LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
- }
- return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
- data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
- data.lockTimeoutRetryCount);
- }
- }
-
- /**
- * Thread that sends data to Flume and pulls it from Berkeley DB.
- */
- private static class WriterThread extends Thread {
- private volatile boolean shutdown = false;
- private final Database database;
- private final Environment environment;
- private final FlumePersistentManager manager;
- private final Gate gate;
- private final SecretKey secretKey;
- private final int batchSize;
- private final AtomicLong dbCounter;
- private final int lockTimeoutRetryCount;
-
- public WriterThread(final Database database, final Environment environment,
- final FlumePersistentManager manager, final Gate gate, final int batchsize,
- final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
- this.database = database;
- this.environment = environment;
- this.manager = manager;
- this.gate = gate;
- this.batchSize = batchsize;
- this.secretKey = secretKey;
- this.setDaemon(true);
- this.dbCounter = dbCount;
- this.lockTimeoutRetryCount = lockTimeoutRetryCount;
- }
-
- public void shutdown() {
- LOGGER.debug("Writer thread shutting down");
- this.shutdown = true;
- gate.open();
- }
-
- public boolean isShutdown() {
- return shutdown;
- }
-
- @Override
- public void run() {
- LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
- long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
- while (!shutdown) {
- final long nowMillis = System.currentTimeMillis();
- final long dbCount = database.count();
- dbCounter.set(dbCount);
- if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
- nextBatchMillis = nowMillis + manager.getDelayMillis();
- try {
- boolean errors = false;
- final DatabaseEntry key = new DatabaseEntry();
- final DatabaseEntry data = new DatabaseEntry();
-
- gate.close();
- OperationStatus status;
- if (batchSize > 1) {
- try {
- errors = sendBatch(key, data);
- } catch (final Exception ex) {
- break;
- }
- } else {
- Exception exception = null;
- for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
- exception = null;
- Transaction txn = null;
- Cursor cursor = null;
- try {
- txn = environment.beginTransaction(null, null);
- cursor = database.openCursor(txn, null);
- try {
- status = cursor.getFirst(key, data, LockMode.RMW);
- while (status == OperationStatus.SUCCESS) {
- final SimpleEvent event = createEvent(data);
- if (event != null) {
- try {
- manager.doSend(event);
- } catch (final Exception ioe) {
- errors = true;
- LOGGER.error("Error sending event", ioe);
- break;
- }
- try {
- cursor.delete();
- } catch (final Exception ex) {
- LOGGER.error("Unable to delete event", ex);
- }
- }
- status = cursor.getNext(key, data, LockMode.RMW);
- }
- if (cursor != null) {
- cursor.close();
- cursor = null;
- }
- txn.commit();
- txn = null;
- dbCounter.decrementAndGet();
- exception = null;
- break;
- } catch (final LockConflictException lce) {
- exception = lce;
- // Fall through and retry.
- } catch (final Exception ex) {
- LOGGER.error("Error reading or writing to database", ex);
- shutdown = true;
- break;
- } finally {
- if (cursor != null) {
- cursor.close();
- cursor = null;
- }
- if (txn != null) {
- txn.abort();
- txn = null;
- }
- }
- } catch (final LockConflictException lce) {
- exception = lce;
- if (cursor != null) {
- try {
- cursor.close();
- cursor = null;
- } catch (final Exception ex) {
- LOGGER.trace("Ignored exception closing cursor during lock conflict.");
- }
- }
- if (txn != null) {
- try {
- txn.abort();
- txn = null;
- } catch (final Exception ex) {
- LOGGER.trace("Ignored exception aborting tx during lock conflict.");
- }
- }
- }
- try {
- Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
- } catch (final InterruptedException ie) {
- // Ignore the error
- }
- }
- if (exception != null) {
- LOGGER.error("Unable to read or update data base", exception);
- }
- }
- if (errors) {
- Thread.sleep(manager.getDelayMillis());
- continue;
- }
- } catch (final Exception ex) {
- LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
- }
- } else {
- if (nextBatchMillis <= nowMillis) {
- nextBatchMillis = nowMillis + manager.getDelayMillis();
- }
- try {
- final long interval = nextBatchMillis - nowMillis;
- gate.waitForOpen(interval);
- } catch (final InterruptedException ie) {
- LOGGER.warn("WriterThread interrupted, continuing");
- } catch (final Exception ex) {
- LOGGER.error("WriterThread encountered an exception waiting for work", ex);
- break;
- }
- }
- }
-
- if (batchSize > 1 && database.count() > 0) {
- final DatabaseEntry key = new DatabaseEntry();
- final DatabaseEntry data = new DatabaseEntry();
- try {
- sendBatch(key, data);
- } catch (final Exception ex) {
- LOGGER.warn("Unable to write final batch");
- }
- }
- LOGGER.trace("WriterThread exiting");
- }
-
- private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
- boolean errors = false;
- OperationStatus status;
- Cursor cursor = null;
- try {
- final BatchEvent batch = new BatchEvent();
- for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
- try {
- cursor = database.openCursor(null, CursorConfig.DEFAULT);
- status = cursor.getFirst(key, data, null);
-
- for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
- final SimpleEvent event = createEvent(data);
- if (event != null) {
- batch.addEvent(event);
- }
- status = cursor.getNext(key, data, null);
- }
- break;
- } catch (final LockConflictException lce) {
- if (cursor != null) {
- try {
- cursor.close();
- cursor = null;
- } catch (final Exception ex) {
- LOGGER.trace("Ignored exception closing cursor during lock conflict.");
- }
- }
- }
- }
-
- try {
- manager.send(batch);
- } catch (final Exception ioe) {
- LOGGER.error("Error sending events", ioe);
- errors = true;
- }
- if (!errors) {
- if (cursor != null) {
- cursor.close();
- cursor = null;
- }
- Transaction txn = null;
- Exception exception = null;
- for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
- try {
- txn = environment.beginTransaction(null, null);
- try {
- for (final Event event : batch.getEvents()) {
- try {
- final Map<String, String> headers = event.getHeaders();
- key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
- database.delete(txn, key);
- } catch (final Exception ex) {
- LOGGER.error("Error deleting key from database", ex);
- }
- }
- txn.commit();
- long count = dbCounter.get();
- while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
- count = dbCounter.get();
- }
- exception = null;
- break;
- } catch (final LockConflictException lce) {
- exception = lce;
- if (cursor != null) {
- try {
- cursor.close();
- cursor = null;
- } catch (final Exception ex) {
- LOGGER.trace("Ignored exception closing cursor during lock conflict.");
- }
- }
- if (txn != null) {
- try {
- txn.abort();
- txn = null;
- } catch (final Exception ex) {
- LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
- }
- }
- } catch (final Exception ex) {
- LOGGER.error("Unable to commit transaction", ex);
- if (txn != null) {
- txn.abort();
- }
- }
- } catch (final LockConflictException lce) {
- exception = lce;
- if (cursor != null) {
- try {
- cursor.close();
- cursor = null;
- } catch (final Exception ex) {
- LOGGER.trace("Ignored exception closing cursor during lock conflict.");
- }
- }
- if (txn != null) {
- try {
- txn.abort();
- txn = null;
- } catch (final Exception ex) {
- LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
- }
- }
- } finally {
- if (cursor != null) {
- cursor.close();
- cursor = null;
- }
- if (txn != null) {
- txn.abort();
- txn = null;
- }
- }
- try {
- Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
- } catch (final InterruptedException ie) {
- // Ignore the error
- }
- }
- if (exception != null) {
- LOGGER.error("Unable to delete events from data base", exception);
- }
- }
- } catch (final Exception ex) {
- LOGGER.error("Error reading database", ex);
- shutdown = true;
- throw ex;
- } finally {
- if (cursor != null) {
- cursor.close();
- }
- }
-
- return errors;
- }
-
- private SimpleEvent createEvent(final DatabaseEntry data) {
- final SimpleEvent event = new SimpleEvent();
- try {
- byte[] eventData = data.getData();
- if (secretKey != null) {
- final Cipher cipher = Cipher.getInstance("AES");
- cipher.init(Cipher.DECRYPT_MODE, secretKey);
- eventData = cipher.doFinal(eventData);
- }
- final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
- final DataInputStream dais = new DataInputStream(bais);
- int length = dais.readInt();
- final byte[] bytes = new byte[length];
- dais.read(bytes, 0, length);
- event.setBody(bytes);
- length = dais.readInt();
- final Map<String, String> map = new HashMap<>(length);
- for (int i = 0; i < length; ++i) {
- final String headerKey = dais.readUTF();
- final String value = dais.readUTF();
- map.put(headerKey, value);
- }
- event.setHeaders(map);
- return event;
- } catch (final Exception ex) {
- LOGGER.error("Error retrieving event", ex);
- return null;
- }
- }
-
- }
-
- /**
- * Factory that creates Daemon threads that can be properly shut down.
- */
- private static class DaemonThreadFactory implements ThreadFactory {
- private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
- private final ThreadGroup group;
- private final AtomicInteger threadNumber = new AtomicInteger(1);
- private final String namePrefix;
-
- public DaemonThreadFactory() {
- final SecurityManager securityManager = System.getSecurityManager();
- group = securityManager != null ? securityManager.getThreadGroup() :
- Thread.currentThread().getThreadGroup();
- namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
- }
-
- @Override
- public Thread newThread(final Runnable r) {
- final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
- thread.setDaemon(true);
- if (thread.getPriority() != Thread.NORM_PRIORITY) {
- thread.setPriority(Thread.NORM_PRIORITY);
- }
- return thread;
- }
- }
-
- /**
- * An internal class.
- */
- private static class Gate {
-
- private boolean isOpen = false;
-
- public boolean isOpen() {
- return isOpen;
- }
-
- public synchronized void open() {
- isOpen = true;
- notifyAll();
- }
-
- public synchronized void close() {
- isOpen = false;
- }
-
- public synchronized void waitForOpen(final long timeout) throws InterruptedException {
- wait(timeout);
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.crypto.Cipher;
+import javax.crypto.SecretKey;
+
+import org.apache.flume.Event;
+import org.apache.flume.event.SimpleEvent;
+import org.apache.logging.log4j.LoggingException;
+import org.apache.logging.log4j.core.appender.ManagerFactory;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
+import org.apache.logging.log4j.core.config.plugins.util.PluginType;
+import org.apache.logging.log4j.core.util.FileUtils;
+import org.apache.logging.log4j.core.util.SecretKeyProvider;
+import org.apache.logging.log4j.util.Strings;
+
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.CursorConfig;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockConflictException;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.StatsConfig;
+import com.sleepycat.je.Transaction;
+
+/**
+ * Manager that persists data to Berkeley DB before passing it on to Flume.
+ */
+public class FlumePersistentManager extends FlumeAvroManager {
+
+ /** Attribute name for the key provider. */
+ public static final String KEY_PROVIDER = "keyProvider";
+
+ private static final Charset UTF8 = Charset.forName("UTF-8");
+
+ private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
+
+ private static final int SHUTDOWN_WAIT = 60;
+
+ private static final int MILLIS_PER_SECOND = 1000;
+
+ private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
+
+ private static BDBManagerFactory factory = new BDBManagerFactory();
+
+ private final Database database;
+
+ private final Environment environment;
+
+ private final WriterThread worker;
+
+ private final Gate gate = new Gate();
+
+ private final SecretKey secretKey;
+
+ private final int lockTimeoutRetryCount;
+
+ private final ExecutorService threadPool;
+
+ private final AtomicLong dbCount = new AtomicLong();
+
+ /**
+ * Constructor
+ * @param name The unique name of this manager.
+ * @param shortName Original name for the Manager.
+ * @param agents An array of Agents.
+ * @param batchSize The number of events to include in a batch.
+ * @param retries The number of times to retry connecting before giving up.
+ * @param connectionTimeout The amount of time to wait for a connection to be established.
+ * @param requestTimeout The amount of time to wair for a response to a request.
+ * @param delay The amount of time to wait between retries.
+ * @param database The database to write to.
+ * @param environment The database environment.
+ * @param secretKey The SecretKey to use for encryption.
+ * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
+ */
+ protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
+ final int batchSize, final int retries, final int connectionTimeout,
+ final int requestTimeout, final int delay, final Database database,
+ final Environment environment, final SecretKey secretKey,
+ final int lockTimeoutRetryCount) {
+ super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
+ this.database = database;
+ this.environment = environment;
+ dbCount.set(database.count());
+ this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
+ lockTimeoutRetryCount);
+ this.worker.start();
+ this.secretKey = secretKey;
+ this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
+ this.lockTimeoutRetryCount = lockTimeoutRetryCount;
+ }
+
+
+ /**
+ * Returns a FlumeAvroManager.
+ * @param name The name of the manager.
+ * @param agents The agents to use.
+ * @param properties Properties to pass to the Manager.
+ * @param batchSize The number of events to include in a batch.
+ * @param retries The number of times to retry connecting before giving up.
+ * @param connectionTimeout The amount of time to wait to establish a connection.
+ * @param requestTimeout The amount of time to wait for a response to a request.
+ * @param delayMillis Amount of time to delay before delivering a batch.
+ * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
+ * @param dataDir The location of the Berkeley database.
+ * @return A FlumeAvroManager.
+ */
+ public static FlumePersistentManager getManager(final String name, final Agent[] agents,
+ final Property[] properties, int batchSize, final int retries,
+ final int connectionTimeout, final int requestTimeout,
+ final int delayMillis, final int lockTimeoutRetryCount,
+ final String dataDir) {
+ if (agents == null || agents.length == 0) {
+ throw new IllegalArgumentException("At least one agent is required");
+ }
+
+ if (batchSize <= 0) {
+ batchSize = 1;
+ }
+ final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
+
+ final StringBuilder sb = new StringBuilder("FlumePersistent[");
+ boolean first = true;
+ for (final Agent agent : agents) {
+ if (!first) {
+ sb.append(',');
+ }
+ sb.append(agent.getHost()).append(':').append(agent.getPort());
+ first = false;
+ }
+ sb.append(']');
+ sb.append(' ').append(dataDirectory);
+ return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
+ connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties));
+ }
+
+ @Override
+ public void send(final Event event) {
+ if (worker.isShutdown()) {
+ throw new LoggingException("Unable to record event");
+ }
+
+ final Map<String, String> headers = event.getHeaders();
+ final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
+ try {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final DataOutputStream daos = new DataOutputStream(baos);
+ daos.writeInt(event.getBody().length);
+ daos.write(event.getBody(), 0, event.getBody().length);
+ daos.writeInt(event.getHeaders().size());
+ for (final Map.Entry<String, String> entry : headers.entrySet()) {
+ daos.writeUTF(entry.getKey());
+ daos.writeUTF(entry.getValue());
+ }
+ byte[] eventData = baos.toByteArray();
+ if (secretKey != null) {
+ final Cipher cipher = Cipher.getInstance("AES");
+ cipher.init(Cipher.ENCRYPT_MODE, secretKey);
+ eventData = cipher.doFinal(eventData);
+ }
+ final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
+ gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
+ boolean interrupted = false;
+ int ieCount = 0;
+ do {
+ try {
+ future.get();
+ } catch (final InterruptedException ie) {
+ interrupted = true;
+ ++ieCount;
+ }
+ } while (interrupted && ieCount <= 1);
+
+ } catch (final Exception ex) {
+ throw new LoggingException("Exception occurred writing log event", ex);
+ }
+ }
+
+ @Override
+ protected void releaseSub() {
+ LOGGER.debug("Shutting down FlumePersistentManager");
+ worker.shutdown();
+ try {
+ worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
+ } catch (final InterruptedException ie) {
+ // Ignore the exception and shutdown.
+ }
+ threadPool.shutdown();
+ try {
+ threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
+ } catch (final InterruptedException ie) {
+ LOGGER.warn("PersistentManager Thread pool failed to shut down");
+ }
+ try {
+ worker.join();
+ } catch (final InterruptedException ex) {
+ LOGGER.debug("Interrupted while waiting for worker to complete");
+ }
+ try {
+ LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
+ database.close();
+ } catch (final Exception ex) {
+ LOGGER.warn("Failed to close database", ex);
+ }
+ try {
+ environment.cleanLog();
+ environment.close();
+ } catch (final Exception ex) {
+ LOGGER.warn("Failed to close environment", ex);
+ }
+ super.releaseSub();
+ }
+
+ private void doSend(final SimpleEvent event) {
+ LOGGER.debug("Sending event to Flume");
+ super.send(event);
+ }
+
+ /**
+ * Thread for writing to Berkeley DB to avoid having interrupts close the database.
+ */
+ private static class BDBWriter implements Callable<Integer> {
+ private final byte[] eventData;
+ private final byte[] keyData;
+ private final Environment environment;
+ private final Database database;
+ private final Gate gate;
+ private final AtomicLong dbCount;
+ private final long batchSize;
+ private final int lockTimeoutRetryCount;
+
+ public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
+ final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
+ final int lockTimeoutRetryCount) {
+ this.keyData = keyData;
+ this.eventData = eventData;
+ this.environment = environment;
+ this.database = database;
+ this.gate = gate;
+ this.dbCount = dbCount;
+ this.batchSize = batchSize;
+ this.lockTimeoutRetryCount = lockTimeoutRetryCount;
+ }
+
+ @Override
+ public Integer call() throws Exception {
+ final DatabaseEntry key = new DatabaseEntry(keyData);
+ final DatabaseEntry data = new DatabaseEntry(eventData);
+ Exception exception = null;
+ for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
+ Transaction txn = null;
+ try {
+ txn = environment.beginTransaction(null, null);
+ try {
+ database.put(txn, key, data);
+ txn.commit();
+ txn = null;
+ if (dbCount.incrementAndGet() >= batchSize) {
+ gate.open();
+ }
+ exception = null;
+ break;
+ } catch (final LockConflictException lce) {
+ exception = lce;
+ // Fall through and retry.
+ } catch (final Exception ex) {
+ if (txn != null) {
+ txn.abort();
+ }
+ throw ex;
+ } finally {
+ if (txn != null) {
+ txn.abort();
+ txn = null;
+ }
+ }
+ } catch (final LockConflictException lce) {
+ exception = lce;
+ if (txn != null) {
+ try {
+ txn.abort();
+ txn = null;
+ } catch (final Exception ex) {
+ LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
+ }
+ }
+
+ }
+ try {
+ Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
+ } catch (final InterruptedException ie) {
+ // Ignore the error
+ }
+ }
+ if (exception != null) {
+ throw exception;
+ }
+ return eventData.length;
+ }
+ }
+
+ /**
+ * Factory data.
+ */
+ private static class FactoryData {
+ private final String name;
+ private final Agent[] agents;
+ private final int batchSize;
+ private final String dataDir;
+ private final int retries;
+ private final int connectionTimeout;
+ private final int requestTimeout;
+ private final int delayMillis;
+ private final int lockTimeoutRetryCount;
+ private final Property[] properties;
+
+ /**
+ * Constructor.
+ * @param name The name of the Appender.
+ * @param agents The agents.
+ * @param batchSize The number of events to include in a batch.
+ * @param dataDir The directory for data.
+ */
+ public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
+ final int connectionTimeout, final int requestTimeout, final int delayMillis,
+ final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
+ this.name = name;
+ this.agents = agents;
+ this.batchSize = batchSize;
+ this.dataDir = dataDir;
+ this.retries = retries;
+ this.connectionTimeout = connectionTimeout;
+ this.requestTimeout = requestTimeout;
+ this.delayMillis = delayMillis;
+ this.lockTimeoutRetryCount = lockTimeoutRetryCount;
+ this.properties = properties;
+ }
+ }
+
+ /**
+ * Avro Manager Factory.
+ */
+ private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
+
+ /**
+ * Create the FlumeKratiManager.
+ * @param name The name of the entity to manage.
+ * @param data The data required to create the entity.
+ * @return The FlumeKratiManager.
+ */
+ @Override
+ public FlumePersistentManager createManager(final String name, final FactoryData data) {
+ SecretKey secretKey = null;
+ Database database = null;
+ Environment environment = null;
+
+ final Map<String, String> properties = new HashMap<>();
+ if (data.properties != null) {
+ for (final Property property : data.properties) {
+ properties.put(property.getName(), property.getValue());
+ }
+ }
+
+ try {
+ final File dir = new File(data.dataDir);
+ FileUtils.mkdir(dir, true);
+ final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
+ dbEnvConfig.setTransactional(true);
+ dbEnvConfig.setAllowCreate(true);
+ dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
+ environment = new Environment(dir, dbEnvConfig);
+ final DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+ database = environment.openDatabase(null, name, dbConfig);
+ } catch (final Exception ex) {
+ LOGGER.error("Could not create FlumePersistentManager", ex);
+ // For consistency, close database as well as environment even though it should never happen since the
+ // database is that last thing in the block above, but this does guard against a future line being
+ // inserted at the end that would bomb (like some debug logging).
+ if (database != null) {
+ database.close();
+ database = null;
+ }
+ if (environment != null) {
+ environment.close();
+ environment = null;
+ }
+ return null;
+ }
+
+ try {
+ String key = null;
+ for (final Map.Entry<String, String> entry : properties.entrySet()) {
+ if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
+ key = entry.getValue();
+ break;
+ }
+ }
+ if (key != null) {
+ final PluginManager manager = new PluginManager("KeyProvider");
+ manager.collectPlugins();
+ final Map<String, PluginType<?>> plugins = manager.getPlugins();
+ if (plugins != null) {
+ boolean found = false;
+ for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
+ if (entry.getKey().equalsIgnoreCase(key)) {
+ found = true;
+ final Class<?> cl = entry.getValue().getPluginClass();
+ try {
+ final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
+ secretKey = provider.getSecretKey();
+ LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
+ } catch (final Exception ex) {
+ LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
+ cl.getName());
+ }
+ break;
+ }
+ }
+ if (!found) {
+ LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
+ }
+ } else {
+ LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
+ }
+ }
+ } catch (final Exception ex) {
+ LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
+ }
+ return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
+ data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
+ data.lockTimeoutRetryCount);
+ }
+ }
+
+ /**
+ * Thread that sends data to Flume and pulls it from Berkeley DB.
+ */
+ private static class WriterThread extends Thread {
+ private volatile boolean shutdown = false;
+ private final Database database;
+ private final Environment environment;
+ private final FlumePersistentManager manager;
+ private final Gate gate;
+ private final SecretKey secretKey;
+ private final int batchSize;
+ private final AtomicLong dbCounter;
+ private final int lockTimeoutRetryCount;
+
+ public WriterThread(final Database database, final Environment environment,
+ final FlumePersistentManager manager, final Gate gate, final int batchsize,
+ final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
+ this.database = database;
+ this.environment = environment;
+ this.manager = manager;
+ this.gate = gate;
+ this.batchSize = batchsize;
+ this.secretKey = secretKey;
+ this.setDaemon(true);
+ this.dbCounter = dbCount;
+ this.lockTimeoutRetryCount = lockTimeoutRetryCount;
+ }
+
+ public void shutdown() {
+ LOGGER.debug("Writer thread shutting down");
+ this.shutdown = true;
+ gate.open();
+ }
+
+ public boolean isShutdown() {
+ return shutdown;
+ }
+
+ @Override
+ public void run() {
+ LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
+ long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
+ while (!shutdown) {
+ final long nowMillis = System.currentTimeMillis();
+ final long dbCount = database.count();
+ dbCounter.set(dbCount);
+ if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
+ nextBatchMillis = nowMillis + manager.getDelayMillis();
+ try {
+ boolean errors = false;
+ final DatabaseEntry key = new DatabaseEntry();
+ final DatabaseEntry data = new DatabaseEntry();
+
+ gate.close();
+ OperationStatus status;
+ if (batchSize > 1) {
+ try {
+ errors = sendBatch(key, data);
+ } catch (final Exception ex) {
+ break;
+ }
+ } else {
+ Exception exception = null;
+ for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
+ exception = null;
+ Transaction txn = null;
+ Cursor cursor = null;
+ try {
+ txn = environment.beginTransaction(null, null);
+ cursor = database.openCursor(txn, null);
+ try {
+ status = cursor.getFirst(key, data, LockMode.RMW);
+ while (status == OperationStatus.SUCCESS) {
+ final SimpleEvent event = createEvent(data);
+ if (event != null) {
+ try {
+ manager.doSend(event);
+ } catch (final Exception ioe) {
+ errors = true;
+ LOGGER.error("Error sending event", ioe);
+ break;
+ }
+ try {
+ cursor.delete();
+ } catch (final Exception ex) {
+ LOGGER.error("Unable to delete event", ex);
+ }
+ }
+ status = cursor.getNext(key, data, LockMode.RMW);
+ }
+ if (cursor != null) {
+ cursor.close();
+ cursor = null;
+ }
+ txn.commit();
+ txn = null;
+ dbCounter.decrementAndGet();
+ exception = null;
+ break;
+ } catch (final LockConflictException lce) {
+ exception = lce;
+ // Fall through and retry.
+ } catch (final Exception ex) {
+ LOGGER.error("Error reading or writing to database", ex);
+ shutdown = true;
+ break;
+ } finally {
+ if (cursor != null) {
+ cursor.close();
+ cursor = null;
+ }
+ if (txn != null) {
+ txn.abort();
+ txn = null;
+ }
+ }
+ } catch (final LockConflictException lce) {
+ exception = lce;
+ if (cursor != null) {
+ try {
+ cursor.close();
+ cursor = null;
+ } catch (final Exception ex) {
+ LOGGER.trace("Ignored exception closing cursor during lock conflict.");
+ }
+ }
+ if (txn != null) {
+ try {
+ txn.abort();
+ txn = null;
+ } catch (final Exception ex) {
+ LOGGER.trace("Ignored exception aborting tx during lock conflict.");
+ }
+ }
+ }
+ try {
+ Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
+ } catch (final InterruptedException ie) {
+ // Ignore the error
+ }
+ }
+ if (exception != null) {
+ LOGGER.error("Unable to read or update data base", exception);
+ }
+ }
+ if (errors) {
+ Thread.sleep(manager.getDelayMillis());
+ continue;
+ }
+ } catch (final Exception ex) {
+ LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
+ }
+ } else {
+ if (nextBatchMillis <= nowMillis) {
+ nextBatchMillis = nowMillis + manager.getDelayMillis();
+ }
+ try {
+ final long interval = nextBatchMillis - nowMillis;
+ gate.waitForOpen(interval);
+ } catch (final InterruptedException ie) {
+ LOGGER.warn("WriterThread interrupted, continuing");
+ } catch (final Exception ex) {
+ LOGGER.error("WriterThread encountered an exception waiting for work", ex);
+ break;
+ }
+ }
+ }
+
+ if (batchSize > 1 && database.count() > 0) {
+ final DatabaseEntry key = new DatabaseEntry();
+ final DatabaseEntry data = new DatabaseEntry();
+ try {
+ sendBatch(key, data);
+ } catch (final Exception ex) {
+ LOGGER.warn("Unable to write final batch");
+ }
+ }
+ LOGGER.trace("WriterThread exiting");
+ }
+
+ private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
+ boolean errors = false;
+ OperationStatus status;
+ Cursor cursor = null;
+ try {
+ final BatchEvent batch = new BatchEvent();
+ for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
+ try {
+ cursor = database.openCursor(null, CursorConfig.DEFAULT);
+ status = cursor.getFirst(key, data, null);
+
+ for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
+ final SimpleEvent event = createEvent(data);
+ if (event != null) {
+ batch.addEvent(event);
+ }
+ status = cursor.getNext(key, data, null);
+ }
+ break;
+ } catch (final LockConflictException lce) {
+ if (cursor != null) {
+ try {
+ cursor.close();
+ cursor = null;
+ } catch (final Exception ex) {
+ LOGGER.trace("Ignored exception closing cursor during lock conflict.");
+ }
+ }
+ }
+ }
+
+ try {
+ manager.send(batch);
+ } catch (final Exception ioe) {
+ LOGGER.error("Error sending events", ioe);
+ errors = true;
+ }
+ if (!errors) {
+ if (cursor != null) {
+ cursor.close();
+ cursor = null;
+ }
+ Transaction txn = null;
+ Exception exception = null;
+ for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
+ try {
+ txn = environment.beginTransaction(null, null);
+ try {
+ for (final Event event : batch.getEvents()) {
+ try {
+ final Map<String, String> headers = event.getHeaders();
+ key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
+ database.delete(txn, key);
+ } catch (final Exception ex) {
+ LOGGER.error("Error deleting key from database", ex);
+ }
+ }
+ txn.commit();
+ long count = dbCounter.get();
+ while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
+ count = dbCounter.get();
+ }
+ exception = null;
+ break;
+ } catch (final LockConflictException lce) {
+ exception = lce;
+ if (cursor != null) {
+ try {
+ cursor.close();
+ cursor = null;
+ } catch (final Exception ex) {
+ LOGGER.trace("Ignored exception closing cursor during lock conflict.");
+ }
+ }
+ if (txn != null) {
+ try {
+ txn.abort();
+ txn = null;
+ } catch (final Exception ex) {
+ LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
+ }
+ }
+ } catch (final Exception ex) {
+ LOGGER.error("Unable to commit transaction", ex);
+ if (txn != null) {
+ txn.abort();
+ }
+ }
+ } catch (final LockConflictException lce) {
+ exception = lce;
+ if (cursor != null) {
+ try {
+ cursor.close();
+ cursor = null;
+ } catch (final Exception ex) {
+ LOGGER.trace("Ignored exception closing cursor during lock conflict.");
+ }
+ }
+ if (txn != null) {
+ try {
+ txn.abort();
+ txn = null;
+ } catch (final Exception ex) {
+ LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
+ }
+ }
+ } finally {
+ if (cursor != null) {
+ cursor.close();
+ cursor = null;
+ }
+ if (txn != null) {
+ txn.abort();
+ txn = null;
+ }
+ }
+ try {
+ Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
+ } catch (final InterruptedException ie) {
+ // Ignore the error
+ }
+ }
+ if (exception != null) {
+ LOGGER.error("Unable to delete events from data base", exception);
+ }
+ }
+ } catch (final Exception ex) {
+ LOGGER.error("Error reading database", ex);
+ shutdown = true;
+ throw ex;
+ } finally {
+ if (cursor != null) {
+ cursor.close();
+ }
+ }
+
+ return errors;
+ }
+
+ private SimpleEvent createEvent(final DatabaseEntry data) {
+ final SimpleEvent event = new SimpleEvent();
+ try {
+ byte[] eventData = data.getData();
+ if (secretKey != null) {
+ final Cipher cipher = Cipher.getInstance("AES");
+ cipher.init(Cipher.DECRYPT_MODE, secretKey);
+ eventData = cipher.doFinal(eventData);
+ }
+ final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
+ final DataInputStream dais = new DataInputStream(bais);
+ int length = dais.readInt();
+ final byte[] bytes = new byte[length];
+ dais.read(bytes, 0, length);
+ event.setBody(bytes);
+ length = dais.readInt();
+ final Map<String, String> map = new HashMap<>(length);
+ for (int i = 0; i < length; ++i) {
+ final String headerKey = dais.readUTF();
+ final String value = dais.readUTF();
+ map.put(headerKey, value);
+ }
+ event.setHeaders(map);
+ return event;
+ } catch (final Exception ex) {
+ LOGGER.error("Error retrieving event", ex);
+ return null;
+ }
+ }
+
+ }
+
+ /**
+ * Factory that creates Daemon threads that can be properly shut down.
+ */
+ private static class DaemonThreadFactory implements ThreadFactory {
+ private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
+ private final ThreadGroup group;
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+ private final String namePrefix;
+
+ public DaemonThreadFactory() {
+ final SecurityManager securityManager = System.getSecurityManager();
+ group = securityManager != null ? securityManager.getThreadGroup() :
+ Thread.currentThread().getThreadGroup();
+ namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
+ }
+
+ @Override
+ public Thread newThread(final Runnable r) {
+ final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
+ thread.setDaemon(true);
+ if (thread.getPriority() != Thread.NORM_PRIORITY) {
+ thread.setPriority(Thread.NORM_PRIORITY);
+ }
+ return thread;
+ }
+ }
+
+ /**
+ * An internal class.
+ */
+ private static class Gate {
+
+ private boolean isOpen = false;
+
+ public boolean isOpen() {
+ return isOpen;
+ }
+
+ public synchronized void open() {
+ isOpen = true;
+ notifyAll();
+ }
+
+ public synchronized void close() {
+ isOpen = false;
+ }
+
+ public synchronized void waitForOpen(final long timeout) throws InterruptedException {
+ wait(timeout);
+ }
+ }
+}
[2/2] logging-log4j2 git commit: Checkstyle: do not hide field.
Posted by gg...@apache.org.
Checkstyle: do not hide field.
Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo
Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/a7f67f8b
Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/a7f67f8b
Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/a7f67f8b
Branch: refs/heads/master
Commit: a7f67f8bb989c36525904a2388043c0037489462
Parents: 5f8aadd
Author: ggregory <gg...@apache.org>
Authored: Mon Sep 14 14:08:10 2015 -0700
Committer: ggregory <gg...@apache.org>
Committed: Mon Sep 14 14:08:10 2015 -0700
----------------------------------------------------------------------
.../log4j/flume/appender/FlumeAvroManager.java | 664 +++++++++----------
1 file changed, 332 insertions(+), 332 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/a7f67f8b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
----------------------------------------------------------------------
diff --git a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
index d6e0d34..ef3234a 100644
--- a/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
+++ b/log4j-flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
@@ -1,332 +1,332 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache license, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the license for the specific language governing permissions and
- * limitations under the license.
- */
-package org.apache.logging.log4j.flume.appender;
-
-import java.util.Properties;
-
-import org.apache.flume.Event;
-import org.apache.flume.api.RpcClient;
-import org.apache.flume.api.RpcClientFactory;
-import org.apache.logging.log4j.core.appender.AppenderLoggingException;
-import org.apache.logging.log4j.core.appender.ManagerFactory;
-
-/**
- * Manager for FlumeAvroAppenders.
- */
-public class FlumeAvroManager extends AbstractFlumeManager {
-
- private static final int MAX_RECONNECTS = 3;
- private static final int MINIMUM_TIMEOUT = 1000;
-
- private static AvroManagerFactory factory = new AvroManagerFactory();
-
- private final Agent[] agents;
-
- private final int batchSize;
-
- private final long delayNanos;
- private final int delayMillis;
-
- private final int retries;
-
- private final int connectTimeoutMillis;
-
- private final int requestTimeoutMillis;
-
- private final int current = 0;
-
- private RpcClient rpcClient = null;
-
- private BatchEvent batchEvent = new BatchEvent();
- private long nextSend = 0;
-
- /**
- * Constructor
- * @param name The unique name of this manager.
- * @param agents An array of Agents.
- * @param batchSize The number of events to include in a batch.
- * @param retries The number of times to retry connecting before giving up.
- * @param connectTimeout The connection timeout in ms.
- * @param requestTimeout The request timeout in ms.
- *
- */
- protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
- final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) {
- super(name);
- this.agents = agents;
- this.batchSize = batchSize;
- this.delayMillis = delayMillis;
- this.delayNanos = delayMillis * 1000000;
- this.retries = retries;
- this.connectTimeoutMillis = connectTimeout;
- this.requestTimeoutMillis = requestTimeout;
- this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
- }
-
- /**
- * Returns a FlumeAvroManager.
- * @param name The name of the manager.
- * @param agents The agents to use.
- * @param batchSize The number of events to include in a batch.
- * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
- * @param retries The number of times to retry connecting before giving up.
- * @param connectTimeoutMillis The connection timeout in ms.
- * @param requestTimeoutMillis The request timeout in ms.
- * @return A FlumeAvroManager.
- */
- public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, final int delayMillis,
- final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
- if (agents == null || agents.length == 0) {
- throw new IllegalArgumentException("At least one agent is required");
- }
-
- if (batchSize <= 0) {
- batchSize = 1;
- }
-
- final StringBuilder sb = new StringBuilder("FlumeAvro[");
- boolean first = true;
- for (final Agent agent : agents) {
- if (!first) {
- sb.append(',');
- }
- sb.append(agent.getHost()).append(':').append(agent.getPort());
- first = false;
- }
- sb.append(']');
- return getManager(sb.toString(), factory,
- new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis));
- }
-
- /**
- * Returns the agents.
- * @return The agent array.
- */
- public Agent[] getAgents() {
- return agents;
- }
-
- /**
- * Returns the index of the current agent.
- * @return The index for the current agent.
- */
- public int getCurrent() {
- return current;
- }
-
- public int getRetries() {
- return retries;
- }
-
- public int getConnectTimeoutMillis() {
- return connectTimeoutMillis;
- }
-
- public int getRequestTimeoutMillis() {
- return requestTimeoutMillis;
- }
-
- public int getBatchSize() {
- return batchSize;
- }
-
- public int getDelayMillis() {
- return delayMillis;
- }
-
- public synchronized void send(final BatchEvent events) {
- if (rpcClient == null) {
- rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
- }
-
- if (rpcClient != null) {
- try {
- LOGGER.trace("Sending batch of {} events", events.getEvents().size());
- rpcClient.appendBatch(events.getEvents());
- } catch (final Exception ex) {
- rpcClient.close();
- rpcClient = null;
- final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
- agents[current].getPort();
- LOGGER.warn(msg, ex);
- throw new AppenderLoggingException("No Flume agents are available");
- }
- } else {
- final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
- agents[current].getPort();
- LOGGER.warn(msg);
- throw new AppenderLoggingException("No Flume agents are available");
- }
- }
-
- @Override
- public synchronized void send(final Event event) {
- if (batchSize == 1) {
- if (rpcClient == null) {
- rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
- }
-
- if (rpcClient != null) {
- try {
- rpcClient.append(event);
- } catch (final Exception ex) {
- rpcClient.close();
- rpcClient = null;
- final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
- agents[current].getPort();
- LOGGER.warn(msg, ex);
- throw new AppenderLoggingException("No Flume agents are available");
- }
- } else {
- final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
- agents[current].getPort();
- LOGGER.warn(msg);
- throw new AppenderLoggingException("No Flume agents are available");
- }
- } else {
- batchEvent.addEvent(event);
- final int count = batchEvent.getEvents().size();
- if (count == 1) {
- nextSend = System.nanoTime() + delayNanos;
- }
- if (count >= batchSize || System.nanoTime() >= nextSend) {
- send(batchEvent);
- batchEvent = new BatchEvent();
- }
- }
- }
-
- /**
- * There is a very good chance that this will always return the first agent even if it isn't available.
- * @param agents The list of agents to choose from
- * @return The FlumeEventAvroServer.
- */
- private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
- try {
- final Properties props = new Properties();
-
- props.put("client.type", "default_failover");
-
- int count = 1;
- final StringBuilder sb = new StringBuilder();
- for (final Agent agent : agents) {
- if (sb.length() > 0) {
- sb.append(' ');
- }
- final String hostName = "host" + count++;
- props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
- sb.append(hostName);
- }
- props.put("hosts", sb.toString());
- if (batchSize > 0) {
- props.put("batch-size", Integer.toString(batchSize));
- }
- if (retries > 1) {
- if (retries > MAX_RECONNECTS) {
- retries = MAX_RECONNECTS;
- }
- props.put("max-attempts", Integer.toString(retries * agents.length));
- }
- if (requestTimeoutMillis >= MINIMUM_TIMEOUT) {
- props.put("request-timeout", Integer.toString(requestTimeoutMillis));
- }
- if (connectTimeoutMillis >= MINIMUM_TIMEOUT) {
- props.put("connect-timeout", Integer.toString(connectTimeoutMillis));
- }
- return RpcClientFactory.getInstance(props);
- } catch (final Exception ex) {
- LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
- return null;
- }
- }
-
- @Override
- protected void releaseSub() {
- if (rpcClient != null) {
- try {
- synchronized(this) {
- try {
- if (batchSize > 1 && batchEvent.getEvents().size() > 0) {
- send(batchEvent);
- }
- } catch (final Exception ex) {
- LOGGER.error("Error sending final batch: {}", ex.getMessage());
- }
- }
- rpcClient.close();
- } catch (final Exception ex) {
- LOGGER.error("Attempt to close RPC client failed", ex);
- }
- }
- rpcClient = null;
- }
-
- /**
- * Factory data.
- */
- private static class FactoryData {
- private final String name;
- private final Agent[] agents;
- private final int batchSize;
- private final int delayMillis;
- private final int retries;
- private final int conntectTimeoutMillis;
- private final int requestTimeoutMillis;
-
- /**
- * Constructor.
- * @param name The name of the Appender.
- * @param agents The agents.
- * @param batchSize The number of events to include in a batch.
- */
- public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis,
- final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
- this.name = name;
- this.agents = agents;
- this.batchSize = batchSize;
- this.delayMillis = delayMillis;
- this.retries = retries;
- this.conntectTimeoutMillis = connectTimeoutMillis;
- this.requestTimeoutMillis = requestTimeoutMillis;
- }
- }
-
- /**
- * Avro Manager Factory.
- */
- private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
-
- /**
- * Create the FlumeAvroManager.
- * @param name The name of the entity to manage.
- * @param data The data required to create the entity.
- * @return The FlumeAvroManager.
- */
- @Override
- public FlumeAvroManager createManager(final String name, final FactoryData data) {
- try {
-
- return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis,
- data.retries, data.conntectTimeoutMillis, data.requestTimeoutMillis);
- } catch (final Exception ex) {
- LOGGER.error("Could not create FlumeAvroManager", ex);
- }
- return null;
- }
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.flume.appender;
+
+import java.util.Properties;
+
+import org.apache.flume.Event;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.logging.log4j.core.appender.AppenderLoggingException;
+import org.apache.logging.log4j.core.appender.ManagerFactory;
+
+/**
+ * Manager for FlumeAvroAppenders.
+ */
+public class FlumeAvroManager extends AbstractFlumeManager {
+
+ private static final int MAX_RECONNECTS = 3;
+ private static final int MINIMUM_TIMEOUT = 1000;
+
+ private static AvroManagerFactory factory = new AvroManagerFactory();
+
+ private final Agent[] agents;
+
+ private final int batchSize;
+
+ private final long delayNanos;
+ private final int delayMillis;
+
+ private final int retries;
+
+ private final int connectTimeoutMillis;
+
+ private final int requestTimeoutMillis;
+
+ private final int current = 0;
+
+ private RpcClient rpcClient = null;
+
+ private BatchEvent batchEvent = new BatchEvent();
+ private long nextSend = 0;
+
+ /**
+ * Constructor
+ * @param name The unique name of this manager.
+ * @param agents An array of Agents.
+ * @param batchSize The number of events to include in a batch.
+ * @param retries The number of times to retry connecting before giving up.
+ * @param connectTimeout The connection timeout in ms.
+ * @param requestTimeout The request timeout in ms.
+ *
+ */
+ protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
+ final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) {
+ super(name);
+ this.agents = agents;
+ this.batchSize = batchSize;
+ this.delayMillis = delayMillis;
+ this.delayNanos = delayMillis * 1000000;
+ this.retries = retries;
+ this.connectTimeoutMillis = connectTimeout;
+ this.requestTimeoutMillis = requestTimeout;
+ this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
+ }
+
+ /**
+ * Returns a FlumeAvroManager.
+ * @param name The name of the manager.
+ * @param agents The agents to use.
+ * @param batchSize The number of events to include in a batch.
+ * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
+ * @param retries The number of times to retry connecting before giving up.
+ * @param connectTimeoutMillis The connection timeout in ms.
+ * @param requestTimeoutMillis The request timeout in ms.
+ * @return A FlumeAvroManager.
+ */
+ public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, final int delayMillis,
+ final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
+ if (agents == null || agents.length == 0) {
+ throw new IllegalArgumentException("At least one agent is required");
+ }
+
+ if (batchSize <= 0) {
+ batchSize = 1;
+ }
+
+ final StringBuilder sb = new StringBuilder("FlumeAvro[");
+ boolean first = true;
+ for (final Agent agent : agents) {
+ if (!first) {
+ sb.append(',');
+ }
+ sb.append(agent.getHost()).append(':').append(agent.getPort());
+ first = false;
+ }
+ sb.append(']');
+ return getManager(sb.toString(), factory,
+ new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis));
+ }
+
+ /**
+ * Returns the agents.
+ * @return The agent array.
+ */
+ public Agent[] getAgents() {
+ return agents;
+ }
+
+ /**
+ * Returns the index of the current agent.
+ * @return The index for the current agent.
+ */
+ public int getCurrent() {
+ return current;
+ }
+
+ public int getRetries() {
+ return retries;
+ }
+
+ public int getConnectTimeoutMillis() {
+ return connectTimeoutMillis;
+ }
+
+ public int getRequestTimeoutMillis() {
+ return requestTimeoutMillis;
+ }
+
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ public int getDelayMillis() {
+ return delayMillis;
+ }
+
+ public synchronized void send(final BatchEvent events) {
+ if (rpcClient == null) {
+ rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
+ }
+
+ if (rpcClient != null) {
+ try {
+ LOGGER.trace("Sending batch of {} events", events.getEvents().size());
+ rpcClient.appendBatch(events.getEvents());
+ } catch (final Exception ex) {
+ rpcClient.close();
+ rpcClient = null;
+ final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
+ agents[current].getPort();
+ LOGGER.warn(msg, ex);
+ throw new AppenderLoggingException("No Flume agents are available");
+ }
+ } else {
+ final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
+ agents[current].getPort();
+ LOGGER.warn(msg);
+ throw new AppenderLoggingException("No Flume agents are available");
+ }
+ }
+
+ @Override
+ public synchronized void send(final Event event) {
+ if (batchSize == 1) {
+ if (rpcClient == null) {
+ rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
+ }
+
+ if (rpcClient != null) {
+ try {
+ rpcClient.append(event);
+ } catch (final Exception ex) {
+ rpcClient.close();
+ rpcClient = null;
+ final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
+ agents[current].getPort();
+ LOGGER.warn(msg, ex);
+ throw new AppenderLoggingException("No Flume agents are available");
+ }
+ } else {
+ final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
+ agents[current].getPort();
+ LOGGER.warn(msg);
+ throw new AppenderLoggingException("No Flume agents are available");
+ }
+ } else {
+ batchEvent.addEvent(event);
+ final int eventCount = batchEvent.getEvents().size();
+ if (eventCount == 1) {
+ nextSend = System.nanoTime() + delayNanos;
+ }
+ if (eventCount >= batchSize || System.nanoTime() >= nextSend) {
+ send(batchEvent);
+ batchEvent = new BatchEvent();
+ }
+ }
+ }
+
+ /**
+ * There is a very good chance that this will always return the first agent even if it isn't available.
+ * @param agents The list of agents to choose from
+ * @return The FlumeEventAvroServer.
+ */
+ private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
+ try {
+ final Properties props = new Properties();
+
+ props.put("client.type", "default_failover");
+
+ int agentCount = 1;
+ final StringBuilder sb = new StringBuilder();
+ for (final Agent agent : agents) {
+ if (sb.length() > 0) {
+ sb.append(' ');
+ }
+ final String hostName = "host" + agentCount++;
+ props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
+ sb.append(hostName);
+ }
+ props.put("hosts", sb.toString());
+ if (batchSize > 0) {
+ props.put("batch-size", Integer.toString(batchSize));
+ }
+ if (retries > 1) {
+ if (retries > MAX_RECONNECTS) {
+ retries = MAX_RECONNECTS;
+ }
+ props.put("max-attempts", Integer.toString(retries * agents.length));
+ }
+ if (requestTimeoutMillis >= MINIMUM_TIMEOUT) {
+ props.put("request-timeout", Integer.toString(requestTimeoutMillis));
+ }
+ if (connectTimeoutMillis >= MINIMUM_TIMEOUT) {
+ props.put("connect-timeout", Integer.toString(connectTimeoutMillis));
+ }
+ return RpcClientFactory.getInstance(props);
+ } catch (final Exception ex) {
+ LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
+ return null;
+ }
+ }
+
+ @Override
+ protected void releaseSub() {
+ if (rpcClient != null) {
+ try {
+ synchronized(this) {
+ try {
+ if (batchSize > 1 && batchEvent.getEvents().size() > 0) {
+ send(batchEvent);
+ }
+ } catch (final Exception ex) {
+ LOGGER.error("Error sending final batch: {}", ex.getMessage());
+ }
+ }
+ rpcClient.close();
+ } catch (final Exception ex) {
+ LOGGER.error("Attempt to close RPC client failed", ex);
+ }
+ }
+ rpcClient = null;
+ }
+
+ /**
+ * Factory data.
+ */
+ private static class FactoryData {
+ private final String name;
+ private final Agent[] agents;
+ private final int batchSize;
+ private final int delayMillis;
+ private final int retries;
+ private final int conntectTimeoutMillis;
+ private final int requestTimeoutMillis;
+
+ /**
+ * Constructor.
+ * @param name The name of the Appender.
+ * @param agents The agents.
+ * @param batchSize The number of events to include in a batch.
+ */
+ public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis,
+ final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
+ this.name = name;
+ this.agents = agents;
+ this.batchSize = batchSize;
+ this.delayMillis = delayMillis;
+ this.retries = retries;
+ this.conntectTimeoutMillis = connectTimeoutMillis;
+ this.requestTimeoutMillis = requestTimeoutMillis;
+ }
+ }
+
+ /**
+ * Avro Manager Factory.
+ */
+ private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
+
+ /**
+ * Create the FlumeAvroManager.
+ * @param name The name of the entity to manage.
+ * @param data The data required to create the entity.
+ * @return The FlumeAvroManager.
+ */
+ @Override
+ public FlumeAvroManager createManager(final String name, final FactoryData data) {
+ try {
+
+ return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis,
+ data.retries, data.conntectTimeoutMillis, data.requestTimeoutMillis);
+ } catch (final Exception ex) {
+ LOGGER.error("Could not create FlumeAvroManager", ex);
+ }
+ return null;
+ }
+ }
+
+}