You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by rg...@apache.org on 2013/01/17 00:36:57 UTC
svn commit: r1434494 - in /logging/log4j/log4j2/trunk: flume-ng/
flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/
flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/ src/changes/
Author: rgoers
Date: Wed Jan 16 23:36:57 2013
New Revision: 1434494
URL: http://svn.apache.org/viewvc?rev=1434494&view=rev
Log:
Allow FlumeAvroManager to initialize even if it cannot connect to an agent.
Modified:
logging/log4j/log4j2/trunk/flume-ng/pom.xml
logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
logging/log4j/log4j2/trunk/src/changes/changes.xml
Modified: logging/log4j/log4j2/trunk/flume-ng/pom.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/pom.xml?rev=1434494&r1=1434493&r2=1434494&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/pom.xml (original)
+++ logging/log4j/log4j2/trunk/flume-ng/pom.xml Wed Jan 16 23:36:57 2013
@@ -32,7 +32,7 @@
<log4jParentDir>${basedir}/..</log4jParentDir>
<docLabel>Flume Documentation</docLabel>
<projectDir>/flume-ng</projectDir>
- <flumeVersion>1.2.0</flumeVersion>
+ <flumeVersion>1.3.1</flumeVersion>
</properties>
<dependencies>
<dependency>
Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java?rev=1434494&r1=1434493&r2=1434494&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java Wed Jan 16 23:36:57 2013
@@ -125,74 +125,80 @@ public class FlumeAvroManager extends Ab
if (retries == 0) {
retries = DEFAULT_RECONNECTS;
}
- final AvroFlumeEvent avroEvent = new AvroFlumeEvent();
- avroEvent.body = ByteBuffer.wrap(event.getBody());
- avroEvent.headers = new HashMap<CharSequence, CharSequence>();
-
- for (final Map.Entry<String, String> entry : event.getHeaders().entrySet()) {
- avroEvent.headers.put(entry.getKey(), entry.getValue());
- }
-
- final List<AvroFlumeEvent> batch = batchSize > 1 ? events.addAndGet(avroEvent, batchSize) : null;
- if (batch == null && batchSize > 1) {
- return;
+ if (client == null) {
+ client = connect(agents);
}
+ String msg = "No Flume agents are available";
+ if (client != null) {
+ final AvroFlumeEvent avroEvent = new AvroFlumeEvent();
+ avroEvent.body = ByteBuffer.wrap(event.getBody());
+ avroEvent.headers = new HashMap<CharSequence, CharSequence>();
- int i = 0;
-
- String msg = "Error writing to " + getName();
+ for (final Map.Entry<String, String> entry : event.getHeaders().entrySet()) {
+ avroEvent.headers.put(entry.getKey(), entry.getValue());
+ }
- do {
- try {
- final Status status = (batch == null) ? client.append(avroEvent) : client.appendBatch(batch);
- if (!status.equals(Status.OK)) {
- throw new AvroRemoteException("RPC communication failed to " + agents[current].getHost() +
- ":" + agents[current].getPort());
- }
+ final List<AvroFlumeEvent> batch = batchSize > 1 ? events.addAndGet(avroEvent, batchSize) : null;
+ if (batch == null && batchSize > 1) {
return;
- } catch (final Exception ex) {
- if (i == retries - 1) {
- msg = "Error writing to " + getName() + " at " + agents[current].getHost() + ":" +
- agents[current].getPort();
- LOGGER.warn(msg, ex);
- break;
- }
- sleep(delay);
}
- } while (++i < retries);
- for (int index = 0; index < agents.length; ++index) {
- if (index == current) {
- continue;
- }
- final Agent agent = agents[index];
- i = 0;
+ int i = 0;
+
+ msg = "Error writing to " + getName();
+
do {
try {
- transceiver = null;
- final AvroSourceProtocol c = connect(agent.getHost(), agent.getPort());
- final Status status = (batch == null) ? c.append(avroEvent) : c.appendBatch(batch);
+ final Status status = (batch == null) ? client.append(avroEvent) : client.appendBatch(batch);
if (!status.equals(Status.OK)) {
- if (i == retries - 1) {
- final String warnMsg = "RPC communication failed to " + getName() + " at " +
- agent.getHost() + ":" + agent.getPort();
- LOGGER.warn(warnMsg);
- }
- continue;
+ throw new AvroRemoteException("RPC communication failed to " + agents[current].getHost() +
+ ":" + agents[current].getPort());
}
- client = c;
- current = i;
return;
} catch (final Exception ex) {
if (i == retries - 1) {
- final String warnMsg = "Error writing to " + getName() + " at " + agent.getHost() + ":" +
- agent.getPort();
- LOGGER.warn(warnMsg, ex);
+ msg = "Error writing to " + getName() + " at " + agents[current].getHost() + ":" +
+ agents[current].getPort();
+ LOGGER.warn(msg, ex);
break;
}
sleep(delay);
}
} while (++i < retries);
+
+ for (int index = 0; index < agents.length; ++index) {
+ if (index == current) {
+ continue;
+ }
+ final Agent agent = agents[index];
+ i = 0;
+ do {
+ try {
+ transceiver = null;
+ final AvroSourceProtocol c = connect(agent.getHost(), agent.getPort());
+ final Status status = (batch == null) ? c.append(avroEvent) : c.appendBatch(batch);
+ if (!status.equals(Status.OK)) {
+ if (i == retries - 1) {
+ final String warnMsg = "RPC communication failed to " + getName() + " at " +
+ agent.getHost() + ":" + agent.getPort();
+ LOGGER.warn(warnMsg);
+ }
+ continue;
+ }
+ client = c;
+ current = i;
+ return;
+ } catch (final Exception ex) {
+ if (i == retries - 1) {
+ final String warnMsg = "Error writing to " + getName() + " at " + agent.getHost() + ":" +
+ agent.getPort();
+ LOGGER.warn(warnMsg, ex);
+ break;
+ }
+ sleep(delay);
+ }
+ } while (++i < retries);
+ }
}
throw new AppenderRuntimeException(msg);
@@ -222,7 +228,8 @@ public class FlumeAvroManager extends Ab
}
++i;
}
- throw new AppenderRuntimeException("Unable to connect to any agents");
+ LOGGER.error("Flume manager " + getName() + " was unable to connect to any agents");
+ return null;
}
private AvroSourceProtocol connect(final String hostname, final int port) {
Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java?rev=1434494&r1=1434493&r2=1434494&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java Wed Jan 16 23:36:57 2013
@@ -274,6 +274,50 @@ public class FlumeAppenderTest {
Assert.assertTrue(caughtException);
}
+ @Test
+ public void testNotConnected() throws Exception {
+ eventSource.stop();
+ final String altPort = Integer.toString(Integer.parseInt(testPort) + 1);
+ final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort),
+ Agent.createAgent("localhost", altPort)};
+ final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
+ "false", null, null, null, null, null, "true", "1", null, null, null);
+ avroAppender.start();
+ Assert.assertTrue("Appender Not started", avroAppender.isStarted());
+ avroLogger.addAppender(avroAppender);
+ avroLogger.setLevel(Level.ALL);
+
+ try {
+ avroLogger.info("Test message");
+ Assert.fail("Exception should have been thrown");
+ } catch (Exception ex) {
+
+ }
+
+ try {
+ final Context context = new Context();
+ context.put("port", altPort);
+ context.put("bind", "0.0.0.0");
+
+ Configurables.configure(eventSource, context);
+
+ eventSource.start();
+ } catch (final ChannelException e) {
+ Assert.fail("Caught exception while resetting port to " + altPort + " : " + e.getMessage());
+ }
+
+ avroLogger.info("Test message 2");
+
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+
+ Event event = channel.take();
+ Assert.assertNotNull(event);
+ Assert.assertTrue("Channel contained event, but not expected message",
+ getBody(event).endsWith("Test message 2"));
+ transaction.commit();
+ transaction.close();
+ }
@Test
Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java?rev=1434494&r1=1434493&r2=1434494&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java Wed Jan 16 23:36:57 2013
@@ -75,6 +75,10 @@ public class FlumeEmbeddedAgentTest {
@BeforeClass
public static void setupClass() {
// System.setProperty(DefaultConfiguration.DEFAULT_LEVEL, Level.DEBUG.toString());
+ final File file = new File("target/file-channel");
+ if (!deleteFiles(file)) {
+ System.err.println("Warning - unable to delete target/file-channel. Test errors may occur");
+ }
}
@AfterClass
@@ -268,7 +272,7 @@ public class FlumeEmbeddedAgentTest {
}
- private boolean deleteFiles(final File file) {
+ private static boolean deleteFiles(final File file) {
boolean result = true;
if (file.isDirectory()) {
@@ -278,7 +282,7 @@ public class FlumeEmbeddedAgentTest {
}
} else if (!file.exists()) {
- return false;
+ return true;
}
return result &= file.delete();
Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java?rev=1434494&r1=1434493&r2=1434494&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java Wed Jan 16 23:36:57 2013
@@ -75,6 +75,10 @@ public class FlumeEmbeddedAppenderTest {
@BeforeClass
public static void setupClass() {
// System.setProperty(DefaultConfiguration.DEFAULT_LEVEL, Level.DEBUG.toString());
+ final File file = new File("target/file-channel");
+ if (!deleteFiles(file)) {
+ System.err.println("Warning - unable to delete target/file-channel. Test errors may occur");
+ }
}
@AfterClass
@@ -269,7 +273,7 @@ public class FlumeEmbeddedAppenderTest {
}
- private boolean deleteFiles(final File file) {
+ private static boolean deleteFiles(final File file) {
boolean result = true;
if (file.isDirectory()) {
@@ -279,7 +283,7 @@ public class FlumeEmbeddedAppenderTest {
}
} else if (!file.exists()) {
- return false;
+ return true;
}
return result &= file.delete();
Modified: logging/log4j/log4j2/trunk/src/changes/changes.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/src/changes/changes.xml?rev=1434494&r1=1434493&r2=1434494&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/src/changes/changes.xml (original)
+++ logging/log4j/log4j2/trunk/src/changes/changes.xml Wed Jan 16 23:36:57 2013
@@ -23,6 +23,9 @@
<body>
<release version="2.0-beta4" date="TBD" description="Bug fixes and enhancements">
+ <action dev="rgoers" type="fix">
+ Allow FlumeAvroManager to initialize even if it cannot connect to an agent.
+ </action>
<action issue="LOG4J2-149" dev="rgoers" type="fix" due-to="Scott Severtson">
SMTPAppender will only cache filtered events.
</action>