You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by pr...@apache.org on 2011/11/28 23:19:59 UTC

svn commit: r1207655 - in /incubator/flume/trunk/flume-core/src: main/java/com/cloudera/flume/core/ main/java/com/cloudera/flume/handlers/debug/ main/java/com/cloudera/flume/handlers/rolling/ test/java/com/cloudera/flume/collector/

Author: prasadm
Date: Mon Nov 28 22:19:58 2011
New Revision: 1207655

URL: http://svn.apache.org/viewvc?rev=1207655&view=rev
Log:
FLUME-857: Handle IOException in close() for retry sinks

Modified:
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/core/EventSinkDecorator.java
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/debug/StubbornAppendSink.java
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/rolling/RollSink.java
    incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/core/EventSinkDecorator.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/core/EventSinkDecorator.java?rev=1207655&r1=1207654&r2=1207655&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/core/EventSinkDecorator.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/core/EventSinkDecorator.java Mon Nov 28 22:19:58 2011
@@ -64,8 +64,12 @@ public class EventSinkDecorator<S extend
   @Override
   public void close() throws IOException, InterruptedException {
     Preconditions.checkNotNull(sink);
-    sink.close();
-    isOpen.set(false);
+    try {
+      sink.close();
+    } finally {
+      // mark it closed regardless of the downstream close exception
+      isOpen.set(false);
+    }
   }
 
   @Override

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/debug/StubbornAppendSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/debug/StubbornAppendSink.java?rev=1207655&r1=1207654&r2=1207655&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/debug/StubbornAppendSink.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/debug/StubbornAppendSink.java Mon Nov 28 22:19:58 2011
@@ -82,8 +82,12 @@ public class StubbornAppendSink<S extend
       LOG.info("append failed on event '{}' with error: {}", e, ex.getMessage());
 
       appendFails.incrementAndGet();
-      super.close(); // close
-
+      try {
+        super.close(); // close
+      } catch (IOException eI) {
+        // Ignore the IOException in close and continue to reopen
+        LOG.error("Error in closing Stubborn Append Sink" , eI);
+      }
       open(); // attempt to reopen
       super.append(e); // resend
       appendSuccesses.incrementAndGet();

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/rolling/RollSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/rolling/RollSink.java?rev=1207655&r1=1207654&r2=1207655&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/rolling/RollSink.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/rolling/RollSink.java Mon Nov 28 22:19:58 2011
@@ -329,8 +329,12 @@ public class RollSink extends EventSink.
         return;
       }
       curSink.close();
-      curSink = null;
     } finally {
+      /* If the downstream sink throws an exception while closing, then its state
+       * is unknown. We want to play it safe and remove its reference from the
+       * roller so that the flow can continue.
+       */
+      curSink = null;
       lock.writeLock().unlock();
     }
   }

Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java?rev=1207655&r1=1207654&r2=1207655&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java Mon Nov 28 22:19:58 2011
@@ -22,13 +22,18 @@ import static org.junit.Assert.assertNot
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -917,4 +922,98 @@ public class TestCollectorSink {
       FileUtil.rmr(f);
     }
   }
+
+  /**
+   * This test verifies that when both append and close throw an exception,
+   * the retry mechanism (stubborn append) reopens the sink and continues
+   * to flow events to the sink correctly.
+   */
+  @SuppressWarnings("rawtypes")
+  @Test(timeout=5000)
+  public void testAppendCloseIOExceptionSink() throws IOException,
+      InterruptedException, FlumeSpecException {
+    final EventSink snk = mock(EventSink.class);
+    final List<String> result = new LinkedList<String> ();
+
+    // mock append that throws IOException on the first invocation and
+    // then records "success" in the result list on the second
+    doThrow(new IOException("Force unexpected append error")).
+      doAnswer(new Answer () {
+        @Override
+        public Object answer(InvocationOnMock aInvocation)
+            throws Throwable {
+          result.add("success"); // append executed by retry sinks
+          return null;
+        }}).
+        when(snk).append((Event) anyObject());
+
+    doThrow(new IOException("Force unexpected close error")).doNothing().
+        when(snk).close();
+
+    doNothing().when(snk).open();
+    SinkBuilder sb = new SinkBuilder() {
+      @Override
+      public EventSink build(Context context, String... argv) {
+        return snk;
+      }
+    };
+    SinkFactoryImpl sf = new SinkFactoryImpl();
+    sf.setSink("mIOSink", sb);
+    FlumeBuilder.setSinkFactory(sf);
+
+    final EventSink coll = FlumeBuilder.buildSink(
+        LogicalNodeContext.testingContext(), "collector(5000) { mIOSink }");
+    coll.open();
+    coll.append(new EventImpl("foo".getBytes()));
+    coll.close();
+    assertEquals(result.get(0), "success");
+  }
+
+  /**
+  *
+  * This test verifies that when both append and close throw multiple
+  * consecutive exceptions, the retry mechanism (insistent append)
+  * reopens the sink and continues to flow events to the sink correctly.
+  */
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testMultiAppendCloseIOExceptionSink() throws IOException,
+     InterruptedException, FlumeSpecException {
+   final EventSink snk = mock(EventSink.class);
+   final List<String> result = new LinkedList<String> ();
+
+   // mock append that throws IOException on the first two invocations and
+   // then records "success" in the result list on the third
+   doThrow(new IOException("Force unexpected append error")).
+     doThrow(new IOException("Force unexpected append error")).
+     doAnswer(new Answer () {
+       @Override
+       public Object answer(InvocationOnMock aInvocation)
+           throws Throwable {
+         result.add("success"); // append executed by retry sinks
+         return null;
+       }}).
+       when(snk).append((Event) anyObject());
+
+   doThrow(new IOException("Force unexpected close error")).
+     doNothing().when(snk).close();
+
+   doNothing().when(snk).open();
+   SinkBuilder sb = new SinkBuilder() {
+     @Override
+     public EventSink build(Context context, String... argv) {
+       return snk;
+     }
+   };
+   SinkFactoryImpl sf = new SinkFactoryImpl();
+   sf.setSink("mIOSink", sb);
+   FlumeBuilder.setSinkFactory(sf);
+
+   final EventSink coll = FlumeBuilder.buildSink(
+       LogicalNodeContext.testingContext(), "collector(5000) { mIOSink }");
+   coll.open();
+   coll.append(new EventImpl("foo".getBytes()));
+   coll.close();
+   assertEquals(result.get(0), "success");
+ }
 }