You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@logging.apache.org by rg...@apache.org on 2013/01/17 00:36:57 UTC

svn commit: r1434494 - in /logging/log4j/log4j2/trunk: flume-ng/ flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/ flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/ src/changes/

Author: rgoers
Date: Wed Jan 16 23:36:57 2013
New Revision: 1434494

URL: http://svn.apache.org/viewvc?rev=1434494&view=rev
Log:
Allow FlumeAvroManager to initialize even if it cannot connect to an agent.

Modified:
    logging/log4j/log4j2/trunk/flume-ng/pom.xml
    logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
    logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
    logging/log4j/log4j2/trunk/src/changes/changes.xml

Modified: logging/log4j/log4j2/trunk/flume-ng/pom.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/pom.xml?rev=1434494&r1=1434493&r2=1434494&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/pom.xml (original)
+++ logging/log4j/log4j2/trunk/flume-ng/pom.xml Wed Jan 16 23:36:57 2013
@@ -32,7 +32,7 @@
     <log4jParentDir>${basedir}/..</log4jParentDir>
     <docLabel>Flume Documentation</docLabel>
     <projectDir>/flume-ng</projectDir>
-    <flumeVersion>1.2.0</flumeVersion>
+    <flumeVersion>1.3.1</flumeVersion>
   </properties>
   <dependencies>
     <dependency>

Modified: logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java?rev=1434494&r1=1434493&r2=1434494&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/main/java/org/apache/logging/log4j/flume/appender/FlumeAvroManager.java Wed Jan 16 23:36:57 2013
@@ -125,74 +125,80 @@ public class FlumeAvroManager extends Ab
         if (retries == 0) {
             retries = DEFAULT_RECONNECTS;
         }
-        final AvroFlumeEvent avroEvent = new AvroFlumeEvent();
-        avroEvent.body = ByteBuffer.wrap(event.getBody());
-        avroEvent.headers = new HashMap<CharSequence, CharSequence>();
-
-        for (final Map.Entry<String, String> entry : event.getHeaders().entrySet()) {
-          avroEvent.headers.put(entry.getKey(), entry.getValue());
-        }
-
-        final List<AvroFlumeEvent> batch = batchSize > 1 ? events.addAndGet(avroEvent, batchSize) : null;
-        if (batch == null && batchSize > 1) {
-            return;
+        if (client == null) {
+            client = connect(agents);
         }
+        String msg = "No Flume agents are available";
+        if (client != null) {
+            final AvroFlumeEvent avroEvent = new AvroFlumeEvent();
+            avroEvent.body = ByteBuffer.wrap(event.getBody());
+            avroEvent.headers = new HashMap<CharSequence, CharSequence>();
 
-        int i = 0;
-
-        String msg = "Error writing to " + getName();
+            for (final Map.Entry<String, String> entry : event.getHeaders().entrySet()) {
+                avroEvent.headers.put(entry.getKey(), entry.getValue());
+            }
 
-        do {
-            try {
-                final Status status = (batch == null) ? client.append(avroEvent) : client.appendBatch(batch);
-                if (!status.equals(Status.OK)) {
-                    throw new AvroRemoteException("RPC communication failed to " + agents[current].getHost() +
-                        ":" + agents[current].getPort());
-                }
+            final List<AvroFlumeEvent> batch = batchSize > 1 ? events.addAndGet(avroEvent, batchSize) : null;
+            if (batch == null && batchSize > 1) {
                 return;
-            } catch (final Exception ex) {
-                if (i == retries - 1) {
-                    msg = "Error writing to " + getName() + " at " + agents[current].getHost() + ":" +
-                        agents[current].getPort();
-                    LOGGER.warn(msg, ex);
-                    break;
-                }
-                sleep(delay);
             }
-        } while (++i < retries);
 
-        for (int index = 0; index < agents.length; ++index) {
-            if (index == current) {
-                continue;
-            }
-            final Agent agent = agents[index];
-            i = 0;
+            int i = 0;
+
+            msg = "Error writing to " + getName();
+
             do {
                 try {
-                    transceiver = null;
-                    final AvroSourceProtocol c = connect(agent.getHost(), agent.getPort());
-                    final Status status = (batch == null) ? c.append(avroEvent) : c.appendBatch(batch);
+                    final Status status = (batch == null) ? client.append(avroEvent) : client.appendBatch(batch);
                     if (!status.equals(Status.OK)) {
-                        if (i == retries - 1) {
-                            final String warnMsg = "RPC communication failed to " + getName() + " at " +
-                                agent.getHost() + ":" + agent.getPort();
-                            LOGGER.warn(warnMsg);
-                        }
-                        continue;
+                        throw new AvroRemoteException("RPC communication failed to " + agents[current].getHost() +
+                            ":" + agents[current].getPort());
                     }
-                    client = c;
-                    current = i;
                     return;
                 } catch (final Exception ex) {
                     if (i == retries - 1) {
-                        final String warnMsg = "Error writing to " + getName() + " at " + agent.getHost() + ":" +
-                            agent.getPort();
-                        LOGGER.warn(warnMsg, ex);
+                        msg = "Error writing to " + getName() + " at " + agents[current].getHost() + ":" +
+                            agents[current].getPort();
+                        LOGGER.warn(msg, ex);
                         break;
                     }
                     sleep(delay);
                 }
             } while (++i < retries);
+
+            for (int index = 0; index < agents.length; ++index) {
+                if (index == current) {
+                    continue;
+                }
+                final Agent agent = agents[index];
+                i = 0;
+                do {
+                    try {
+                        transceiver = null;
+                        final AvroSourceProtocol c = connect(agent.getHost(), agent.getPort());
+                        final Status status = (batch == null) ? c.append(avroEvent) : c.appendBatch(batch);
+                        if (!status.equals(Status.OK)) {
+                            if (i == retries - 1) {
+                                final String warnMsg = "RPC communication failed to " + getName() + " at " +
+                                    agent.getHost() + ":" + agent.getPort();
+                                LOGGER.warn(warnMsg);
+                            }
+                            continue;
+                        }
+                        client = c;
+                        current = i;
+                        return;
+                    } catch (final Exception ex) {
+                        if (i == retries - 1) {
+                            final String warnMsg = "Error writing to " + getName() + " at " + agent.getHost() + ":" +
+                                agent.getPort();
+                            LOGGER.warn(warnMsg, ex);
+                            break;
+                        }
+                        sleep(delay);
+                    }
+                } while (++i < retries);
+            }
         }
 
         throw new AppenderRuntimeException(msg);
@@ -222,7 +228,8 @@ public class FlumeAvroManager extends Ab
             }
             ++i;
         }
-        throw new AppenderRuntimeException("Unable to connect to any agents");
+        LOGGER.error("Flume manager " + getName() + " was unable to connect to any agents");
+        return null;
     }
 
     private AvroSourceProtocol connect(final String hostname, final int port) {

Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java?rev=1434494&r1=1434493&r2=1434494&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeAppenderTest.java Wed Jan 16 23:36:57 2013
@@ -274,6 +274,50 @@ public class FlumeAppenderTest {
         Assert.assertTrue(caughtException);
     }
 
+    @Test
+    public void testNotConnected() throws Exception {
+        eventSource.stop();
+        final String altPort = Integer.toString(Integer.parseInt(testPort) + 1);
+        final Agent[] agents = new Agent[] {Agent.createAgent("localhost", testPort),
+            Agent.createAgent("localhost", altPort)};
+        final FlumeAppender avroAppender = FlumeAppender.createAppender(agents, null, "false", null, "100", "3", "avro",
+            "false", null, null, null, null, null, "true", "1", null, null, null);
+        avroAppender.start();
+        Assert.assertTrue("Appender Not started", avroAppender.isStarted());
+        avroLogger.addAppender(avroAppender);
+        avroLogger.setLevel(Level.ALL);
+
+        try {
+            avroLogger.info("Test message");
+            Assert.fail("Exception should have been thrown");
+        } catch (Exception ex) {
+
+        }
+
+        try {
+            final Context context = new Context();
+            context.put("port", altPort);
+            context.put("bind", "0.0.0.0");
+
+            Configurables.configure(eventSource, context);
+
+            eventSource.start();
+        } catch (final ChannelException e) {
+            Assert.fail("Caught exception while resetting port to " + altPort + " : " + e.getMessage());
+        }
+
+        avroLogger.info("Test message 2");
+
+        Transaction transaction = channel.getTransaction();
+        transaction.begin();
+
+        Event event = channel.take();
+        Assert.assertNotNull(event);
+        Assert.assertTrue("Channel contained event, but not expected message",
+            getBody(event).endsWith("Test message 2"));
+        transaction.commit();
+        transaction.close();
+    }
 
 
     @Test

Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java?rev=1434494&r1=1434493&r2=1434494&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAgentTest.java Wed Jan 16 23:36:57 2013
@@ -75,6 +75,10 @@ public class FlumeEmbeddedAgentTest {
     @BeforeClass
     public static void setupClass() {
         // System.setProperty(DefaultConfiguration.DEFAULT_LEVEL, Level.DEBUG.toString());
+        final File file = new File("target/file-channel");
+        if (!deleteFiles(file)) {
+            System.err.println("Warning - unable to delete target/file-channel. Test errors may occur");
+        }
     }
 
     @AfterClass
@@ -268,7 +272,7 @@ public class FlumeEmbeddedAgentTest {
 
     }
 
-    private boolean deleteFiles(final File file) {
+    private static boolean deleteFiles(final File file) {
         boolean result = true;
         if (file.isDirectory()) {
 
@@ -278,7 +282,7 @@ public class FlumeEmbeddedAgentTest {
             }
 
         } else if (!file.exists()) {
-            return false;
+            return true;
         }
 
         return result &= file.delete();

Modified: logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java?rev=1434494&r1=1434493&r2=1434494&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java (original)
+++ logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java Wed Jan 16 23:36:57 2013
@@ -75,6 +75,10 @@ public class FlumeEmbeddedAppenderTest {
     @BeforeClass
     public static void setupClass() {
         // System.setProperty(DefaultConfiguration.DEFAULT_LEVEL, Level.DEBUG.toString());
+        final File file = new File("target/file-channel");
+        if (!deleteFiles(file)) {
+            System.err.println("Warning - unable to delete target/file-channel. Test errors may occur");
+        }
     }
 
     @AfterClass
@@ -269,7 +273,7 @@ public class FlumeEmbeddedAppenderTest {
 
     }
 
-    private boolean deleteFiles(final File file) {
+    private static boolean deleteFiles(final File file) {
         boolean result = true;
         if (file.isDirectory()) {
 
@@ -279,7 +283,7 @@ public class FlumeEmbeddedAppenderTest {
             }
 
         } else if (!file.exists()) {
-            return false;
+            return true;
         }
 
         return result &= file.delete();

Modified: logging/log4j/log4j2/trunk/src/changes/changes.xml
URL: http://svn.apache.org/viewvc/logging/log4j/log4j2/trunk/src/changes/changes.xml?rev=1434494&r1=1434493&r2=1434494&view=diff
==============================================================================
--- logging/log4j/log4j2/trunk/src/changes/changes.xml (original)
+++ logging/log4j/log4j2/trunk/src/changes/changes.xml Wed Jan 16 23:36:57 2013
@@ -23,6 +23,9 @@
 
   <body>
     <release version="2.0-beta4" date="TBD" description="Bug fixes and enhancements">
+      <action dev="rgoers" type="fix">
+        Allow FlumeAvroManager to initialize even if it cannot connect to an agent.
+      </action>
       <action issue="LOG4J2-149" dev="rgoers" type="fix" due-to="Scott Severtson">
         SMTPAppender will only cache filtered events.
       </action>