You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by pr...@apache.org on 2011/11/28 23:19:59 UTC
svn commit: r1207655 - in /incubator/flume/trunk/flume-core/src:
main/java/com/cloudera/flume/core/
main/java/com/cloudera/flume/handlers/debug/
main/java/com/cloudera/flume/handlers/rolling/
test/java/com/cloudera/flume/collector/
Author: prasadm
Date: Mon Nov 28 22:19:58 2011
New Revision: 1207655
URL: http://svn.apache.org/viewvc?rev=1207655&view=rev
Log:
FLUME-857: Handle IOException in close() for retry sinks
Modified:
incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/core/EventSinkDecorator.java
incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/debug/StubbornAppendSink.java
incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/rolling/RollSink.java
incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java
Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/core/EventSinkDecorator.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/core/EventSinkDecorator.java?rev=1207655&r1=1207654&r2=1207655&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/core/EventSinkDecorator.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/core/EventSinkDecorator.java Mon Nov 28 22:19:58 2011
@@ -64,8 +64,12 @@ public class EventSinkDecorator<S extend
@Override
public void close() throws IOException, InterruptedException {
Preconditions.checkNotNull(sink);
- sink.close();
- isOpen.set(false);
+ try {
+ sink.close();
+ } finally {
+ // mark it closed regardless of any exception thrown by the downstream close
+ isOpen.set(false);
+ }
}
@Override
Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/debug/StubbornAppendSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/debug/StubbornAppendSink.java?rev=1207655&r1=1207654&r2=1207655&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/debug/StubbornAppendSink.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/debug/StubbornAppendSink.java Mon Nov 28 22:19:58 2011
@@ -82,8 +82,12 @@ public class StubbornAppendSink<S extend
LOG.info("append failed on event '{}' with error: {}", e, ex.getMessage());
appendFails.incrementAndGet();
- super.close(); // close
-
+ try {
+ super.close(); // close
+ } catch (IOException eI) {
+ // Ignore the IOException in close and continue to reopen
+ LOG.error("Error in closing Stubborn Append Sink" , eI);
+ }
open(); // attempt to reopen
super.append(e); // resend
appendSuccesses.incrementAndGet();
Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/rolling/RollSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/rolling/RollSink.java?rev=1207655&r1=1207654&r2=1207655&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/rolling/RollSink.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/rolling/RollSink.java Mon Nov 28 22:19:58 2011
@@ -329,8 +329,12 @@ public class RollSink extends EventSink.
return;
}
curSink.close();
- curSink = null;
} finally {
+ /* If the downstream sink throws an exception while closing, then its state
+ * is unknown. We want to play it safe and remove its reference from the roller
+ * so that the flow can continue.
+ */
+ curSink = null;
lock.writeLock().unlock();
}
}
Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java?rev=1207655&r1=1207654&r2=1207655&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java Mon Nov 28 22:19:58 2011
@@ -22,13 +22,18 @@ import static org.junit.Assert.assertNot
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -917,4 +922,98 @@ public class TestCollectorSink {
FileUtil.rmr(f);
}
}
+
+ /**
+ * This test verifies that when both append and close throw an exception,
+ * the retry mechanism (stubborn append) reopens the sink and continues
+ * to deliver events to the sink correctly.
+ */
+ @SuppressWarnings("rawtypes")
+ @Test(timeout=5000)
+ public void testAppendCloseIOExceptionSink() throws IOException,
+ InterruptedException, FlumeSpecException {
+ final EventSink snk = mock(EventSink.class);
+ final List<String> result = new LinkedList<String> ();
+
+ // mock append that throws IOException on the first invocation and
+ // then records "success" in the result list on the second
+ doThrow(new IOException("Force unexpected append error")).
+ doAnswer(new Answer () {
+ @Override
+ public Object answer(InvocationOnMock aInvocation)
+ throws Throwable {
+ result.add("success"); // append executed by retry sinks
+ return null;
+ }}).
+ when(snk).append((Event) anyObject());
+
+ doThrow(new IOException("Force unexpected close error")).doNothing().
+ when(snk).close();
+
+ doNothing().when(snk).open();
+ SinkBuilder sb = new SinkBuilder() {
+ @Override
+ public EventSink build(Context context, String... argv) {
+ return snk;
+ }
+ };
+ SinkFactoryImpl sf = new SinkFactoryImpl();
+ sf.setSink("mIOSink", sb);
+ FlumeBuilder.setSinkFactory(sf);
+
+ final EventSink coll = FlumeBuilder.buildSink(
+ LogicalNodeContext.testingContext(), "collector(5000) { mIOSink }");
+ coll.open();
+ coll.append(new EventImpl("foo".getBytes()));
+ coll.close();
+ assertEquals(result.get(0), "success");
+ }
+
+ /**
+ *
+ * This test verifies that when both append and close throw multiple
+ * consecutive exceptions, the retry mechanism (insistent append)
+ * reopens the sink and continues to deliver events to the sink correctly.
+ */
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testMultiAppendCloseIOExceptionSink() throws IOException,
+ InterruptedException, FlumeSpecException {
+ final EventSink snk = mock(EventSink.class);
+ final List<String> result = new LinkedList<String> ();
+
+ // mock append that throws IOException on the first two invocations and
+ // then records "success" in the result list on the third
+ doThrow(new IOException("Force unexpected append error")).
+ doThrow(new IOException("Force unexpected append error")).
+ doAnswer(new Answer () {
+ @Override
+ public Object answer(InvocationOnMock aInvocation)
+ throws Throwable {
+ result.add("success"); // append executed by retry sinks
+ return null;
+ }}).
+ when(snk).append((Event) anyObject());
+
+ doThrow(new IOException("Force unexpected close error")).
+ doNothing().when(snk).close();
+
+ doNothing().when(snk).open();
+ SinkBuilder sb = new SinkBuilder() {
+ @Override
+ public EventSink build(Context context, String... argv) {
+ return snk;
+ }
+ };
+ SinkFactoryImpl sf = new SinkFactoryImpl();
+ sf.setSink("mIOSink", sb);
+ FlumeBuilder.setSinkFactory(sf);
+
+ final EventSink coll = FlumeBuilder.buildSink(
+ LogicalNodeContext.testingContext(), "collector(5000) { mIOSink }");
+ coll.open();
+ coll.append(new EventImpl("foo".getBytes()));
+ coll.close();
+ assertEquals(result.get(0), "success");
+ }
}