You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by pr...@apache.org on 2011/11/30 09:02:00 UTC
svn commit: r1208316 - in /incubator/flume/trunk/flume-core/src:
main/java/com/cloudera/flume/conf/
main/java/com/cloudera/flume/handlers/rolling/
test/java/com/cloudera/flume/handlers/rolling/
Author: prasadm
Date: Wed Nov 30 08:01:58 2011
New Revision: 1208316
URL: http://svn.apache.org/viewvc?rev=1208316&view=rev
Log:
Flume-798: Blocked append interrupted by rotation event
Added:
incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestSlowSinkRoll.java
Modified:
incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java
incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/rolling/RollSink.java
incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollRollTags.java
incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollSink.java
Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java?rev=1208316&r1=1208315&r2=1208316&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java Wed Nov 30 08:01:58 2011
@@ -183,6 +183,7 @@ public class FlumeConfiguration extends
public final static String COLLECTOR_EVENT_PORT = "flume.collector.event.port";
public static final String COLLECTOR_DFS_DIR = "flume.collector.dfs.dir";
public static final String COLLECTOR_ROLL_MILLIS = "flume.collector.roll.millis";
+ public static final String COLLECTOR_ROLL_TIMEOUT = "flume.collector.roll.timeout";
public static final String COLLECTOR_OUTPUT_FORMAT = "flume.collector.output.format";
public static final String COLLECTOR_DFS_COMPRESS_CODEC = "flume.collector.dfs.compress.codec";
@@ -641,6 +642,10 @@ public class FlumeConfiguration extends
return getLong(COLLECTOR_ROLL_MILLIS, 30000);
}
+ public long getCollectorRollTimeout() {
+ return getLong(COLLECTOR_ROLL_TIMEOUT, 1000);
+ }
+
/**
* This is the list of masters that agent nodes will connect to
*/
Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/rolling/RollSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/rolling/RollSink.java?rev=1208316&r1=1208315&r2=1208316&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/rolling/RollSink.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/rolling/RollSink.java Wed Nov 30 08:01:58 2011
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeBuilder;
+import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.FlumeBuilder.FunctionSpec;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
@@ -68,11 +69,14 @@ public class RollSink extends EventSink.
private static int threadInitNumber = 0;
final long checkLatencyMs; // default 4x a second
private Context ctx; // roll context
+ private long timeOut; // lock wait timeout
+ private boolean forceInterrupt = true;
// reporting attributes and counters
public final static String A_ROLLS = "rolls";
public final static String A_ROLLFAILS = "rollfails";
public final static String A_ROLLSPEC = "rollspec";
+ public final static String A_ROLL_ABORTED_APPENDS = "rollCanceledAppends";
public final String A_ROLL_TAG; // TODO (jon) parameterize this.
public final static String DEFAULT_ROLL_TAG = "rolltag";
@@ -80,6 +84,7 @@ public class RollSink extends EventSink.
final AtomicLong rolls = new AtomicLong();
final AtomicLong rollfails = new AtomicLong();
+ final AtomicLong rollCaneledAppends = new AtomicLong();
public RollSink(Context ctx, String spec, long maxAge, long checkMs) {
this.ctx = ctx;
@@ -87,6 +92,7 @@ public class RollSink extends EventSink.
this.fspec = spec;
this.trigger = new TimeTrigger(new ProcessTagger(), maxAge);
this.checkLatencyMs = checkMs;
+ setTimeOut(FlumeConfiguration.get().getCollectorRollTimeout());
LOG.info("Created RollSink: maxAge=" + maxAge + "ms trigger=[" + trigger
+ "] checkPeriodMs = " + checkLatencyMs + " spec='" + fspec + "'");
}
@@ -97,6 +103,7 @@ public class RollSink extends EventSink.
this.fspec = spec;
this.trigger = trigger;
this.checkLatencyMs = checkMs;
+ setTimeOut(FlumeConfiguration.get().getCollectorRollTimeout());
LOG.info("Created RollSink: trigger=[" + trigger + "] checkPeriodMs = "
+ checkLatencyMs + " spec='" + fspec + "'");
}
@@ -205,12 +212,10 @@ public class RollSink extends EventSink.
throw new RuntimeException(e1.getCause());
}
} catch (CancellationException ce) {
- Thread.currentThread().interrupt();
- throw new InterruptedException(
+ throw new RuntimeException(
"Blocked append interrupted by rotation event");
} catch (InterruptedException ex) {
LOG.warn("Unexpected Exception " + ex.getMessage(), ex);
- Thread.currentThread().interrupt();
throw (InterruptedException) ex;
}
}
@@ -228,7 +233,18 @@ public class RollSink extends EventSink.
}
String tag = trigger.getTagger().getTag();
- e.set(A_ROLL_TAG, tag.getBytes());
+ /* Note that if the directdriver is re-trying this event due to error in
+ * last append, then the event will already have the roll tag
+ * In that case, we want to continue using such event
+ */
+ try {
+ e.set(A_ROLL_TAG, tag.getBytes());
+ } catch (IllegalArgumentException eI) {
+ // if there's a previous rolltag then use it, else rethrow the exception
+ if (e.get(A_ROLL_TAG) == null)
+ throw eI;
+ }
+
lock.readLock().lock();
try {
curSink.append(e);
@@ -264,10 +280,18 @@ public class RollSink extends EventSink.
}
public boolean rotate() throws InterruptedException {
- while (!lock.writeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {
+ while (!lock.writeLock().tryLock(timeOut, TimeUnit.MILLISECONDS)) {
// interrupt the future on the other.
if (future != null) {
+ if (forceInterrupt == false) {
+ /* If the node is configured not to interrupt an append,
+ * then bail out. The next append or roll will take care
+ * rotating the file.
+ */
+ return false;
+ }
future.cancel(true);
+ rollCaneledAppends.incrementAndGet();
}
// NOTE: there is no guarantee that this cancel actually succeeds.
@@ -293,7 +317,7 @@ public class RollSink extends EventSink.
LOG.info("closing RollSink '" + fspec + "'");
// attempt to get the lock, and if we cannot, issue a cancel
- while (!lock.writeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {
+ while (!lock.writeLock().tryLock(timeOut, TimeUnit.MILLISECONDS)) {
// interrupt the future on the other.
if (future != null) {
future.cancel(true);
@@ -380,6 +404,7 @@ public class RollSink extends EventSink.
rpt.setLongMetric(A_ROLLS, rolls.get());
rpt.setLongMetric(A_ROLLFAILS, rollfails.get());
rpt.setStringMetric(A_ROLLSPEC, fspec);
+ rpt.setLongMetric(A_ROLL_ABORTED_APPENDS, rollCaneledAppends.get());
return rpt;
}
@@ -443,6 +468,13 @@ public class RollSink extends EventSink.
return rt;
}
+ public void setTimeOut (long timeout) {
+ this.timeOut = timeout;
+ if (timeout == 0) {
+ forceInterrupt = false;
+ }
+ }
+
/**
* Builder for a spec based rolling sink. (most general version, does not
* necessarily output to files!).
Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollRollTags.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollRollTags.java?rev=1208316&r1=1208315&r2=1208316&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollRollTags.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollRollTags.java Wed Nov 30 08:01:58 2011
@@ -67,23 +67,23 @@ public class TestRollRollTags {
assertEquals("second", Attributes.readString(e2, "duped"));
}
- @Test(expected = IllegalArgumentException.class)
+ @Test
public void testRollRollConflict() throws IOException, FlumeSpecException,
InterruptedException {
EventSink snk = new CompositeSink(new Context(),
"{value(\"rolltag\",\"foofoo\") => roll(10000) {null} } ");
Event e = new EventImpl("foo".getBytes());
snk.open();
- snk.append(e); // should bork.
+ snk.append(e); // should not bork.
}
- @Test(expected = IllegalArgumentException.class)
+ @Test
public void testRollRollBork() throws IOException, FlumeSpecException, InterruptedException {
EventSink snk = new CompositeSink(new Context(),
"roll(10000) { roll(10000) { null } } ");
Event e = new EventImpl("foo".getBytes());
snk.open();
- snk.append(e); // should bork.
+ snk.append(e); // should not bork.
}
@Test
Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollSink.java?rev=1208316&r1=1208315&r2=1208316&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollSink.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollSink.java Wed Nov 30 08:01:58 2011
@@ -307,10 +307,13 @@ public class TestRollSink {
try {
roll.open();
roll.append(e1); // append blocks.
- } catch (InterruptedException e) {
+ } catch (RuntimeException eR) {
latch.countDown();
LOG.error("Exited with expected Exception");
return;
+ } catch (InterruptedException e) {
+ latch.countDown();
+ LOG.error("Exited with expected Exception");
} catch (IOException e) {
LOG.info("Got the unexpected IOException exit", e);
e.printStackTrace();
Added: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestSlowSinkRoll.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestSlowSinkRoll.java?rev=1208316&view=auto
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestSlowSinkRoll.java (added)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestSlowSinkRoll.java Wed Nov 30 08:01:58 2011
@@ -0,0 +1,205 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudera.flume.handlers.rolling;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Level;
+import org.codehaus.jettison.json.JSONException;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.cloudera.flume.conf.Context;
+import com.cloudera.flume.conf.FlumeBuilder;
+import com.cloudera.flume.conf.FlumeSpecException;
+import com.cloudera.flume.conf.LogicalNodeContext;
+import com.cloudera.flume.conf.ReportTestingContext;
+import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
+import com.cloudera.flume.conf.SinkFactoryImpl;
+import com.cloudera.flume.core.Attributes;
+import com.cloudera.flume.core.Event;
+import com.cloudera.flume.core.EventImpl;
+import com.cloudera.flume.core.EventSink;
+import com.cloudera.flume.core.EventSource;
+import com.cloudera.flume.core.connector.DirectDriver;
+import com.cloudera.flume.handlers.hdfs.EscapedCustomDfsSink;
+import com.cloudera.flume.reporter.ReportEvent;
+import com.cloudera.flume.reporter.ReportManager;
+import com.cloudera.flume.reporter.ReportTestUtils;
+import com.cloudera.flume.reporter.ReportUtil;
+import com.cloudera.flume.reporter.aggregator.CounterSink;
+import com.cloudera.util.Clock;
+import com.cloudera.util.FileUtil;
+
+public class TestSlowSinkRoll {
+ public static final Logger LOG = LoggerFactory.getLogger(TestSlowSinkRoll.class);
+ public static final String NUM_EVENTS = "num_events";
+ public class DummySource extends EventSource.Base {
+ private static final int DEF_MAX_EVENTS = 10;
+ private final int maxEvents;
+
+ long counter;
+ public DummySource() {
+ maxEvents = DEF_MAX_EVENTS;
+ }
+
+ public DummySource(int maxEv) {
+ maxEvents = maxEv;
+ }
+
+ @Override
+ public Event next() throws InterruptedException {
+ if (counter == maxEvents) {
+ throw new InterruptedException("Max events exceeded");
+ }
+ counter++;
+ LOG.info("Generated event <junk" + counter + ">");
+ return new EventImpl(("junk" + counter + " ").getBytes());
+ }
+
+ @Override
+ public void close() throws InterruptedException {
+ LOG.info("close");
+ }
+
+ @Override
+ public void open() throws RuntimeException {
+ LOG.info("open");
+ }
+
+ public Long getCount() {
+ return counter;
+ }
+ };
+
+ @Before
+ public void setDebug() {
+ // log4j specific debugging level
+ org.apache.log4j.Logger.getRootLogger().setLevel(Level.DEBUG);
+ }
+
+ // the sink has a long delay, increase the roller's timeout and make sure that there
+ // are no events lost
+ @Test
+ public void testLongTimeout() throws IOException, InterruptedException {
+ final File f = FileUtil.mktempdir();
+ Logger rollLog = LoggerFactory.getLogger(RollSink.class);
+
+ RollSink snk = new RollSink(new Context(), "test", 2000, 250) {
+ @Override
+ protected EventSink newSink(Context ctx) throws IOException {
+ return new EscapedCustomDfsSink(ctx, "file:///" + f.getPath(),
+ "sub-%{service}%{rolltag}") {
+ @Override
+ public void append(final Event e) throws IOException, InterruptedException {
+ super.append(e);
+ Clock.sleep(1500);
+ }
+ };
+ }
+
+ @Override
+ synchronized public ReportEvent getMetrics() {
+ // the EvenSink getMetrics doesn't report num events, so use getReport() for now
+ ReportEvent rpt = super.getReport();
+ long cnt = rpt.getLongMetric(EventSink.Base.R_NUM_EVENTS);
+ rpt.setLongMetric(NUM_EVENTS, cnt);
+ return rpt;
+ }
+ };
+ snk.setTimeOut(2000);
+
+ DummySource source = new DummySource(7);
+ DirectDriver driver = new DirectDriver(source, snk);
+ driver.start();
+ Clock.sleep(12000);
+ driver.stop();
+
+ assertEquals(snk.getMetrics().getLongMetric(NUM_EVENTS), source.getCount());
+ }
+
+ // the sink has a long delay, make sure that slow append gets aborted by roller
+ @Test
+ public void testSlowSinkRoll() throws IOException, InterruptedException {
+ final File f = FileUtil.mktempdir();
+
+ RollSink snk = new RollSink(new Context(), "test", 2000, 250) {
+ @Override
+ protected EventSink newSink(Context ctx) throws IOException {
+ return new EscapedCustomDfsSink(ctx, "file:///" + f.getPath(),
+ "sub-%{service}%{rolltag}") {
+ @Override
+ public void append(final Event e) throws IOException, InterruptedException {
+ super.append(e);
+ Clock.sleep(1500);
+ }
+ };
+ }
+ };
+
+ DummySource source = new DummySource(4);
+ DirectDriver driver = new DirectDriver(source, snk);
+ driver.start();
+ Clock.sleep(12200);
+ driver.stop();
+ assertTrue(snk.getMetrics().getLongMetric(RollSink.A_ROLL_ABORTED_APPENDS) > Long.valueOf(0));
+ }
+
+ // the sink has a long delay and roll is configured to wait (timeout is 0 )
+ // make sure that roller waited for appends and there are no aborts
+ @Test
+ public void testWaitingSlowSinkRoll() throws IOException, InterruptedException {
+ final File f = FileUtil.mktempdir();
+
+ RollSink snk = new RollSink(new Context(), "test", 2000, 250) {
+ @Override
+ protected EventSink newSink(Context ctx) throws IOException {
+ return new EscapedCustomDfsSink(ctx, "file:///" + f.getPath(),
+ "sub-%{service}%{rolltag}") {
+ @Override
+ public void append(final Event e) throws IOException, InterruptedException {
+ super.append(e);
+ Clock.sleep(1500);
+ }
+ };
+ }
+ };
+ snk.setTimeOut(0);
+ DummySource source = new DummySource(4);
+ DirectDriver driver = new DirectDriver(source, snk);
+ driver.start();
+ Clock.sleep(12200);
+ driver.stop();
+ assertEquals(snk.getMetrics().getLongMetric(RollSink.A_ROLL_ABORTED_APPENDS), Long.valueOf(0));
+ }
+
+}