You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by pr...@apache.org on 2011/11/30 09:02:00 UTC

svn commit: r1208316 - in /incubator/flume/trunk/flume-core/src: main/java/com/cloudera/flume/conf/ main/java/com/cloudera/flume/handlers/rolling/ test/java/com/cloudera/flume/handlers/rolling/

Author: prasadm
Date: Wed Nov 30 08:01:58 2011
New Revision: 1208316

URL: http://svn.apache.org/viewvc?rev=1208316&view=rev
Log:
Flume-798: Blocked append interrupted by rotation event

Added:
    incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestSlowSinkRoll.java
Modified:
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/rolling/RollSink.java
    incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollRollTags.java
    incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollSink.java

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java?rev=1208316&r1=1208315&r2=1208316&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java Wed Nov 30 08:01:58 2011
@@ -183,6 +183,7 @@ public class FlumeConfiguration extends 
   public final static String COLLECTOR_EVENT_PORT = "flume.collector.event.port";
   public static final String COLLECTOR_DFS_DIR = "flume.collector.dfs.dir";
   public static final String COLLECTOR_ROLL_MILLIS = "flume.collector.roll.millis";
+  public static final String COLLECTOR_ROLL_TIMEOUT = "flume.collector.roll.timeout";
   public static final String COLLECTOR_OUTPUT_FORMAT = "flume.collector.output.format";
   public static final String COLLECTOR_DFS_COMPRESS_CODEC = "flume.collector.dfs.compress.codec";
 
@@ -641,6 +642,10 @@ public class FlumeConfiguration extends 
     return getLong(COLLECTOR_ROLL_MILLIS, 30000);
   }
 
+  public long getCollectorRollTimeout() {
+    return getLong(COLLECTOR_ROLL_TIMEOUT, 1000);
+  }
+
   /**
    * This is the list of masters that agent nodes will connect to
    */

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/rolling/RollSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/rolling/RollSink.java?rev=1208316&r1=1208315&r2=1208316&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/rolling/RollSink.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/rolling/RollSink.java Wed Nov 30 08:01:58 2011
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import com.cloudera.flume.conf.Context;
 import com.cloudera.flume.conf.FlumeBuilder;
+import com.cloudera.flume.conf.FlumeConfiguration;
 import com.cloudera.flume.conf.FlumeSpecException;
 import com.cloudera.flume.conf.FlumeBuilder.FunctionSpec;
 import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
@@ -68,11 +69,14 @@ public class RollSink extends EventSink.
   private static int threadInitNumber = 0;
   final long checkLatencyMs; // default 4x a second
   private Context ctx; // roll context
+  private long timeOut; // lock wait timeout
+  private boolean forceInterrupt = true;
 
   // reporting attributes and counters
   public final static String A_ROLLS = "rolls";
   public final static String A_ROLLFAILS = "rollfails";
   public final static String A_ROLLSPEC = "rollspec";
+  public final static String A_ROLL_ABORTED_APPENDS = "rollCanceledAppends";
   public final String A_ROLL_TAG; // TODO (jon) parameterize this.
   public final static String DEFAULT_ROLL_TAG = "rolltag";
 
@@ -80,6 +84,7 @@ public class RollSink extends EventSink.
 
   final AtomicLong rolls = new AtomicLong();
   final AtomicLong rollfails = new AtomicLong();
+  final AtomicLong rollCaneledAppends = new AtomicLong();
 
   public RollSink(Context ctx, String spec, long maxAge, long checkMs) {
     this.ctx = ctx;
@@ -87,6 +92,7 @@ public class RollSink extends EventSink.
     this.fspec = spec;
     this.trigger = new TimeTrigger(new ProcessTagger(), maxAge);
     this.checkLatencyMs = checkMs;
+    setTimeOut(FlumeConfiguration.get().getCollectorRollTimeout());
     LOG.info("Created RollSink: maxAge=" + maxAge + "ms trigger=[" + trigger
         + "] checkPeriodMs = " + checkLatencyMs + " spec='" + fspec + "'");
   }
@@ -97,6 +103,7 @@ public class RollSink extends EventSink.
     this.fspec = spec;
     this.trigger = trigger;
     this.checkLatencyMs = checkMs;
+    setTimeOut(FlumeConfiguration.get().getCollectorRollTimeout());
     LOG.info("Created RollSink: trigger=[" + trigger + "] checkPeriodMs = "
         + checkLatencyMs + " spec='" + fspec + "'");
   }
@@ -205,12 +212,10 @@ public class RollSink extends EventSink.
         throw new RuntimeException(e1.getCause());
       }
     } catch (CancellationException ce) {
-      Thread.currentThread().interrupt();
-      throw new InterruptedException(
+      throw new RuntimeException(
           "Blocked append interrupted by rotation event");
     } catch (InterruptedException ex) {
       LOG.warn("Unexpected Exception " + ex.getMessage(), ex);
-      Thread.currentThread().interrupt();
       throw (InterruptedException) ex;
     }
   }
@@ -228,7 +233,18 @@ public class RollSink extends EventSink.
     }
     String tag = trigger.getTagger().getTag();
 
-    e.set(A_ROLL_TAG, tag.getBytes());
+    /* Note that if the directdriver is re-trying this event due to error in
+     * last append, then the event will already have the roll tag
+     * In that case, we want to continue using such event
+     */
+    try {
+      e.set(A_ROLL_TAG, tag.getBytes());
+    } catch (IllegalArgumentException eI) {
+      // if there's a previous rolltag then use it, else rethrow the exception
+      if (e.get(A_ROLL_TAG) == null)
+        throw eI;
+    }
+
     lock.readLock().lock();
     try {
       curSink.append(e);
@@ -264,10 +280,18 @@ public class RollSink extends EventSink.
   }
 
   public boolean rotate() throws InterruptedException {
-    while (!lock.writeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {
+    while (!lock.writeLock().tryLock(timeOut, TimeUnit.MILLISECONDS)) {
       // interrupt the future on the other.
       if (future != null) {
+        if (forceInterrupt == false) {
+          /* If the node is configured not to interrupt an append,
+           * then bail out. The next append or roll will take care
+           * rotating the file.
+           */
+          return false;
+        }
         future.cancel(true);
+        rollCaneledAppends.incrementAndGet();
       }
 
       // NOTE: there is no guarantee that this cancel actually succeeds.
@@ -293,7 +317,7 @@ public class RollSink extends EventSink.
     LOG.info("closing RollSink '" + fspec + "'");
 
     // attempt to get the lock, and if we cannot, issue a cancel
-    while (!lock.writeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {
+    while (!lock.writeLock().tryLock(timeOut, TimeUnit.MILLISECONDS)) {
       // interrupt the future on the other.
       if (future != null) {
         future.cancel(true);
@@ -380,6 +404,7 @@ public class RollSink extends EventSink.
     rpt.setLongMetric(A_ROLLS, rolls.get());
     rpt.setLongMetric(A_ROLLFAILS, rollfails.get());
     rpt.setStringMetric(A_ROLLSPEC, fspec);
+    rpt.setLongMetric(A_ROLL_ABORTED_APPENDS, rollCaneledAppends.get());
     return rpt;
   }
 
@@ -443,6 +468,13 @@ public class RollSink extends EventSink.
     return rt;
   }
 
+  public void setTimeOut (long timeout) {
+    this.timeOut = timeout;
+    if (timeout == 0) {
+      forceInterrupt = false;
+    }
+  }
+
   /**
    * Builder for a spec based rolling sink. (most general version, does not
    * necessarily output to files!).

Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollRollTags.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollRollTags.java?rev=1208316&r1=1208315&r2=1208316&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollRollTags.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollRollTags.java Wed Nov 30 08:01:58 2011
@@ -67,23 +67,23 @@ public class TestRollRollTags {
     assertEquals("second", Attributes.readString(e2, "duped"));
   }
 
-  @Test(expected = IllegalArgumentException.class)
+  @Test
   public void testRollRollConflict() throws IOException, FlumeSpecException,
       InterruptedException {
     EventSink snk = new CompositeSink(new Context(),
         "{value(\"rolltag\",\"foofoo\") =>   roll(10000) {null} } ");
     Event e = new EventImpl("foo".getBytes());
     snk.open();
-    snk.append(e); // should bork.
+    snk.append(e); // should not bork.
   }
 
-  @Test(expected = IllegalArgumentException.class)
+  @Test
   public void testRollRollBork() throws IOException, FlumeSpecException, InterruptedException {
     EventSink snk = new CompositeSink(new Context(),
         "roll(10000) { roll(10000) { null } } ");
     Event e = new EventImpl("foo".getBytes());
     snk.open();
-    snk.append(e); // should bork.
+    snk.append(e); // should not bork.
   }
 
   @Test

Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollSink.java?rev=1208316&r1=1208315&r2=1208316&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollSink.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestRollSink.java Wed Nov 30 08:01:58 2011
@@ -307,10 +307,13 @@ public class TestRollSink {
         try {
           roll.open();
           roll.append(e1); // append blocks.
-        } catch (InterruptedException e) {
+        } catch (RuntimeException eR) {
           latch.countDown();
           LOG.error("Exited with expected Exception");
           return;
+        } catch (InterruptedException e) {
+          latch.countDown();
+          LOG.error("Exited with expected Exception");
         } catch (IOException e) {
           LOG.info("Got the unexpected IOException exit", e);
           e.printStackTrace();

Added: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestSlowSinkRoll.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestSlowSinkRoll.java?rev=1208316&view=auto
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestSlowSinkRoll.java (added)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/rolling/TestSlowSinkRoll.java Wed Nov 30 08:01:58 2011
@@ -0,0 +1,205 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudera.flume.handlers.rolling;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Level;
+import org.codehaus.jettison.json.JSONException;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.cloudera.flume.conf.Context;
+import com.cloudera.flume.conf.FlumeBuilder;
+import com.cloudera.flume.conf.FlumeSpecException;
+import com.cloudera.flume.conf.LogicalNodeContext;
+import com.cloudera.flume.conf.ReportTestingContext;
+import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
+import com.cloudera.flume.conf.SinkFactoryImpl;
+import com.cloudera.flume.core.Attributes;
+import com.cloudera.flume.core.Event;
+import com.cloudera.flume.core.EventImpl;
+import com.cloudera.flume.core.EventSink;
+import com.cloudera.flume.core.EventSource;
+import com.cloudera.flume.core.connector.DirectDriver;
+import com.cloudera.flume.handlers.hdfs.EscapedCustomDfsSink;
+import com.cloudera.flume.reporter.ReportEvent;
+import com.cloudera.flume.reporter.ReportManager;
+import com.cloudera.flume.reporter.ReportTestUtils;
+import com.cloudera.flume.reporter.ReportUtil;
+import com.cloudera.flume.reporter.aggregator.CounterSink;
+import com.cloudera.util.Clock;
+import com.cloudera.util.FileUtil;
+
+public class TestSlowSinkRoll {
+  public static final Logger LOG = LoggerFactory.getLogger(TestSlowSinkRoll.class);
+  public static final String NUM_EVENTS = "num_events";
+  public class DummySource extends EventSource.Base {
+    private static final int DEF_MAX_EVENTS = 10;
+    private final int maxEvents;
+
+    long counter;
+    public DummySource() {
+      maxEvents = DEF_MAX_EVENTS;
+    }
+
+    public DummySource(int maxEv) {
+      maxEvents = maxEv;
+    }
+
+    @Override
+    public Event next() throws InterruptedException {
+      if (counter == maxEvents) {
+        throw new InterruptedException("Max events exceeded");
+      }
+      counter++;
+      LOG.info("Generated event <junk" + counter + ">");
+      return new EventImpl(("junk" + counter + " ").getBytes());
+    }
+
+    @Override
+    public void close() throws InterruptedException {
+      LOG.info("close");
+    }
+
+    @Override
+    public void open() throws RuntimeException {
+      LOG.info("open");
+    }
+
+    public Long getCount() {
+      return counter;
+    }
+  };
+
+  @Before
+  public void setDebug() {
+    // log4j specific debugging level
+    org.apache.log4j.Logger.getRootLogger().setLevel(Level.DEBUG);
+  }
+
+  // the sink has a long delay, increase the roller's timeout and make sure that there
+  // are no events lost
+  @Test
+  public void testLongTimeout() throws IOException, InterruptedException {
+    final File f = FileUtil.mktempdir();
+    Logger rollLog = LoggerFactory.getLogger(RollSink.class);
+
+    RollSink snk = new RollSink(new Context(), "test", 2000, 250) {
+      @Override
+      protected EventSink newSink(Context ctx) throws IOException {
+        return new EscapedCustomDfsSink(ctx, "file:///" + f.getPath(),
+            "sub-%{service}%{rolltag}") {
+          @Override
+          public void append(final Event e) throws IOException, InterruptedException {
+            super.append(e);
+            Clock.sleep(1500);
+          }
+        };
+      }
+
+      @Override
+      synchronized public ReportEvent getMetrics() {
+        // the EvenSink getMetrics doesn't report num events, so use getReport() for now
+        ReportEvent rpt = super.getReport();
+        long cnt = rpt.getLongMetric(EventSink.Base.R_NUM_EVENTS);
+        rpt.setLongMetric(NUM_EVENTS, cnt);
+        return rpt;
+      }
+    };
+    snk.setTimeOut(2000);
+
+    DummySource source = new DummySource(7);
+    DirectDriver driver = new DirectDriver(source, snk);
+    driver.start();
+    Clock.sleep(12000);
+    driver.stop();
+
+    assertEquals(snk.getMetrics().getLongMetric(NUM_EVENTS), source.getCount());
+  }
+
+  // the sink has a long delay, make sure that slow append gets aborted by roller
+  @Test
+  public void testSlowSinkRoll() throws IOException, InterruptedException {
+    final File f = FileUtil.mktempdir();
+
+    RollSink snk = new RollSink(new Context(), "test", 2000, 250) {
+      @Override
+      protected EventSink newSink(Context ctx) throws IOException {
+        return new EscapedCustomDfsSink(ctx, "file:///" + f.getPath(),
+            "sub-%{service}%{rolltag}") {
+          @Override
+          public void append(final Event e) throws IOException, InterruptedException {
+            super.append(e);
+            Clock.sleep(1500);
+          }
+        };
+      }
+    };
+
+    DummySource source = new DummySource(4);
+    DirectDriver driver = new DirectDriver(source, snk);
+    driver.start();
+    Clock.sleep(12200);
+    driver.stop();
+    assertTrue(snk.getMetrics().getLongMetric(RollSink.A_ROLL_ABORTED_APPENDS) > Long.valueOf(0));
+  }
+
+  // the sink has a long delay and roll is configured to wait (timeout is 0 )
+  // make sure that roller waited for appends and there are no aborts
+  @Test
+  public void testWaitingSlowSinkRoll() throws IOException, InterruptedException {
+    final File f = FileUtil.mktempdir();
+
+    RollSink snk = new RollSink(new Context(), "test", 2000, 250) {
+      @Override
+      protected EventSink newSink(Context ctx) throws IOException {
+        return new EscapedCustomDfsSink(ctx, "file:///" + f.getPath(),
+            "sub-%{service}%{rolltag}") {
+          @Override
+          public void append(final Event e) throws IOException, InterruptedException {
+            super.append(e);
+            Clock.sleep(1500);
+          }
+        };
+      }
+    };
+    snk.setTimeOut(0);
+    DummySource source = new DummySource(4);
+    DirectDriver driver = new DirectDriver(source, snk);
+    driver.start();
+    Clock.sleep(12200);
+    driver.stop();
+    assertEquals(snk.getMetrics().getLongMetric(RollSink.A_ROLL_ABORTED_APPENDS), Long.valueOf(0));
+  }
+
+}