You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/08/10 06:50:25 UTC

git commit: FLUME-2157. Spool directory source does not shut down correctly when Flume is reconfigured.

Updated Branches:
  refs/heads/trunk 99db32ccd -> f9da62be2


FLUME-2157. Spool directory source does not shut down correctly when Flume is reconfigured.

(Mike Percy via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f9da62be
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f9da62be
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f9da62be

Branch: refs/heads/trunk
Commit: f9da62be22101b00b907be3a66a6ce8d823c2f8f
Parents: 99db32c
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Aug 9 21:49:19 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Aug 9 21:49:19 2013 -0700

----------------------------------------------------------------------
 .../avro/ReliableSpoolingFileEventReader.java   |  13 +-
 .../flume/source/SpoolDirectorySource.java      |  41 ++++-
 .../flume/source/TestSpoolDirectorySource.java  |  31 ++++
 .../flume/test/agent/TestSpooldirSource.java    | 174 +++++++++++++++++++
 .../apache/flume/test/util/StagedInstall.java   |  30 +++-
 5 files changed, 271 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/f9da62be/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index f82fe1f..724ab38 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -133,12 +133,15 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
 
     // Do a canary test to make sure we have access to spooling directory
     try {
-      File f1 = File.createTempFile("flume", "test", spoolDirectory);
-      Files.write("testing flume file permissions\n", f1, Charsets.UTF_8);
-      Files.readLines(f1, Charsets.UTF_8);
-      if (!f1.delete()) {
-        throw new IOException("Unable to delete canary file " + f1);
+      File canary = File.createTempFile("flume-spooldir-perm-check-", ".canary",
+          spoolDirectory);
+      Files.write("testing flume file permissions\n", canary, Charsets.UTF_8);
+      List<String> lines = Files.readLines(canary, Charsets.UTF_8);
+      Preconditions.checkState(!lines.isEmpty(), "Empty canary file %s", canary);
+      if (!canary.delete()) {
+        throw new IOException("Unable to delete canary file " + canary);
       }
+      logger.debug("Successfully created and deleted canary file: {}", canary);
     } catch (IOException e) {
       throw new FlumeException("Unable to read and modify files" +
           " in the spooling directory: " + spoolDirectory, e);

http://git-wip-us.apache.org/repos/asf/flume/blob/f9da62be/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 7145580..957eb8b 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -24,6 +24,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
 import org.apache.flume.*;
 import org.apache.flume.client.avro.ReliableSpoolingFileEventReader;
 import org.apache.flume.conf.Configurable;
@@ -58,17 +60,18 @@ Configurable, EventDrivenSource {
   private Context deserializerContext;
   private String deletePolicy;
   private String inputCharset;
+  private volatile boolean hasFatalError = false;
 
   private SourceCounter sourceCounter;
   ReliableSpoolingFileEventReader reader;
+  private ScheduledExecutorService executor;
 
   @Override
-  public void start() {
+  public synchronized void start() {
     logger.info("SpoolDirectorySource source starting with directory: {}",
         spoolDirectory);
 
-    ScheduledExecutorService executor =
-        Executors.newSingleThreadScheduledExecutor();
+    executor = Executors.newSingleThreadScheduledExecutor();
 
     File directory = new File(spoolDirectory);
     try {
@@ -99,7 +102,15 @@ Configurable, EventDrivenSource {
   }
 
   @Override
-  public void stop() {
+  public synchronized void stop() {
+    executor.shutdown();
+    try {
+      executor.awaitTermination(10L, TimeUnit.SECONDS);
+    } catch (InterruptedException ex) {
+      logger.info("Interrupted while awaiting termination", ex);
+    }
+    executor.shutdownNow();
+
     super.stop();
     sourceCounter.stop();
     logger.info("SpoolDir source {} stopped. Metrics: {}", getName(),
@@ -107,7 +118,13 @@ Configurable, EventDrivenSource {
   }
 
   @Override
-  public void configure(Context context) {
+  public String toString() {
+    return "Spool Directory source " + getName() +
+        ": { spoolDir: " + spoolDirectory + " }";
+  }
+
+  @Override
+  public synchronized void configure(Context context) {
     spoolDirectory = context.getString(SPOOL_DIRECTORY);
     Preconditions.checkState(spoolDirectory != null,
         "Configuration must specify a spooling directory");
@@ -143,6 +160,11 @@ Configurable, EventDrivenSource {
     }
   }
 
+  /**
+   * Returns the current value of the {@code hasFatalError} flag.
+   *
+   * The flag starts out {@code false} and is set to {@code true} by the
+   * spool directory runnable when it catches an uncaught {@link Throwable}.
+   *
+   * @return {@code true} if the flag has been raised, {@code false} otherwise
+   */
+  @VisibleForTesting
+  protected boolean hasFatalError() {
+    return hasFatalError;
+  }
+
   private class SpoolDirectoryRunnable implements Runnable {
     private ReliableSpoolingFileEventReader reader;
     private SourceCounter sourceCounter;
@@ -170,10 +192,11 @@ Configurable, EventDrivenSource {
           sourceCounter.incrementAppendBatchAcceptedCount();
         }
       } catch (Throwable t) {
-        logger.error("Uncaught exception in Runnable", t);
-        if (t instanceof Error) {
-          throw (Error) t;
-        }
+        logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
+            "Uncaught exception in SpoolDirectorySource thread. " +
+            "Restart or reconfigure Flume to continue processing.", t);
+        hasFatalError = true;
+        Throwables.propagate(t);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/f9da62be/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
index 652d2a2..837cf15 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
@@ -21,8 +21,10 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -132,4 +134,33 @@ public class TestSpoolDirectorySource {
           source.getLifecycleState());
     }
   }
+
+  /**
+   * Repeatedly configures, starts and stops the source against the same spool
+   * directory. On every cycle a new file is written, and the test checks that
+   * its content arrives on the channel and that the source did not record a
+   * fatal error.
+   *
+   * @throws InterruptedException if the wait for the source is interrupted
+   * @throws IOException if the input file cannot be written
+   */
+  @Test
+  public void testReconfigure() throws InterruptedException, IOException {
+    final int NUM_RECONFIGS = 20;
+    for (int i = 0; i < NUM_RECONFIGS; i++) {
+      Context context = new Context();
+      File file = new File(tmpDir.getAbsolutePath() + "/file-" + i);
+      Files.write("File " + i, file, Charsets.UTF_8);
+      context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
+          tmpDir.getAbsolutePath());
+      Configurables.configure(source, context);
+      source.start();
+      try {
+        Thread.sleep(TimeUnit.SECONDS.toMillis(1));
+        Transaction txn = channel.getTransaction();
+        txn.begin();
+        boolean committed = false;
+        try {
+          Event event = channel.take();
+          Assert.assertNotNull("No event received on iteration " + i, event);
+          String content = new String(event.getBody(), Charsets.UTF_8);
+          Assert.assertEquals("File " + i, content);
+          txn.commit();
+          committed = true;
+        } finally {
+          // Roll back only when the take/verify step failed, and let the
+          // failure propagate: catching Throwable here would also swallow
+          // the AssertionError and the test could never fail on content.
+          if (!committed) {
+            txn.rollback();
+          }
+          txn.close();
+        }
+      } finally {
+        // Always stop the source so a failed iteration does not leave its
+        // executor thread running.
+        source.stop();
+      }
+      Assert.assertFalse("Fatal error on iteration " + i, source.hasFatalError());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/f9da62be/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestSpooldirSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestSpooldirSource.java b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestSpooldirSource.java
new file mode 100644
index 0000000..6018380
--- /dev/null
+++ b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestSpooldirSource.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.test.agent;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.test.util.StagedInstall;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestSpooldirSource {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(TestSpooldirSource.class);
+
+  private Properties agentProps;
+  private File sinkOutputDir;
+  private List<File> spoolDirs = Lists.newArrayList();
+
+  /**
+   * Creates the on-disk layout for the test under the staged install's stage
+   * directory (a parent directory holding one spool directory per source, plus
+   * a sink output directory) and builds the agent properties wiring every
+   * spooldir source through a single memory channel into one file-roll sink.
+   *
+   * @throws Exception if the staged install cannot be obtained
+   */
+  @Before
+  public void setup() throws Exception {
+
+    File agentDir = StagedInstall.getInstance().getStageDir();
+    LOGGER.debug("Using agent stage dir: {}", agentDir);
+
+    File testDir = new File(agentDir, TestSpooldirSource.class.getName());
+    assertTrue("Unable to create test dir: " + testDir.getPath(),
+        testDir.mkdirs());
+
+    File spoolParentDir = new File(testDir, "spools");
+    assertTrue("Unable to create spool parent dir: " + spoolParentDir.getPath(),
+        spoolParentDir.mkdir());
+
+    final int NUM_SOURCES = 100;
+
+    agentProps = new Properties();
+    List<String> spooldirSrcNames = Lists.newArrayList();
+    String channelName = "mem-01";
+
+    // Create source dirs and property file chunks
+    for (int i = 0; i < NUM_SOURCES; i++) {
+      String srcName = String.format("spooldir-%03d", i);
+      File spoolDir = new File(spoolParentDir, srcName);
+      assertTrue("Unable to create spool dir: " + spoolDir.getPath(),
+          spoolDir.mkdir());
+      spooldirSrcNames.add(srcName);
+      spoolDirs.add(spoolDir);
+
+      agentProps.put(String.format("agent.sources.%s.type", srcName),
+          "SPOOLDIR");
+      agentProps.put(String.format("agent.sources.%s.spoolDir", srcName),
+          spoolDir.getPath());
+      agentProps.put(String.format("agent.sources.%s.channels", srcName),
+          channelName);
+    }
+
+    // Create the rest of the properties file
+    agentProps.put("agent.channels.mem-01.type", "MEMORY");
+    agentProps.put("agent.channels.mem-01.capacity", String.valueOf(100000));
+
+    sinkOutputDir = new File(testDir, "out");
+    assertTrue("Unable to create sink output dir: " + sinkOutputDir.getPath(),
+        sinkOutputDir.mkdir());
+
+    agentProps.put("agent.sinks.roll-01.channel", channelName);
+    agentProps.put("agent.sinks.roll-01.type", "FILE_ROLL");
+    agentProps.put("agent.sinks.roll-01.sink.directory", sinkOutputDir.getPath());
+    agentProps.put("agent.sinks.roll-01.sink.rollInterval", "0");
+
+    agentProps.put("agent.sources", Joiner.on(" ").join(spooldirSrcNames));
+    agentProps.put("agent.channels", channelName);
+    agentProps.put("agent.sinks", "roll-01");
+  }
+
+  /**
+   * Calls {@code stopAgent()} on the shared {@link StagedInstall} instance
+   * after each test.
+   *
+   * @throws Exception if obtaining the instance or stopping the agent fails
+   */
+  @After
+  public void teardown() throws Exception {
+    StagedInstall.getInstance().stopAgent();
+  }
+
+  /**
+   * Builds the single line of content for a test file, embedding the zero-padded
+   * directory and file numbers and ending with a newline.
+   *
+   * @param dirNum index of the spool directory
+   * @param fileNum index of the file within that directory
+   * @return the formatted line, including its trailing newline
+   */
+  private String getTestString(int dirNum, int fileNum) {
+    return String.format("Test dir %03d, test file %03d.\n", dirNum, fileNum);
+  }
+
+  /**
+   * Creates {@code numFiles} test files in every spool directory, numbered
+   * consecutively from {@code startNum}.
+   *
+   * Each file is first written next to the spool directory and then renamed
+   * into it, so the spool directory only ever sees complete files.
+   *
+   * @param spoolDirs spool directories to populate
+   * @param numFiles number of files to create in each directory
+   * @param startNum number of the first file; the last one created is
+   *     {@code startNum + numFiles - 1}
+   * @throws IOException if a staging file cannot be written
+   */
+  private void createInputTestFiles(List<File> spoolDirs, int numFiles, int startNum)
+      throws IOException {
+    int numSpoolDirs = spoolDirs.size();
+    // Exclusive upper bound: numFiles is a count, not an end index, so it has
+    // to be offset by startNum (identical to the old bound when startNum is 0).
+    int endNum = startNum + numFiles;
+    for (int dirNum = 0; dirNum < numSpoolDirs; dirNum++) {
+      File spoolDir = spoolDirs.get(dirNum);
+      for (int fileNum = startNum; fileNum < endNum; fileNum++) {
+        // Stage the files on what is almost certainly the same FS partition.
+        File tmp = new File(spoolDir.getParent(), UUID.randomUUID().toString());
+        Files.append(getTestString(dirNum, fileNum), tmp, Charsets.UTF_8);
+        File dst = new File(spoolDir, String.format("test-file-%03d", fileNum));
+        // Ensure we move them into the spool directory atomically, if possible.
+        assertTrue(String.format("Failed to rename %s to %s", tmp, dst),
+            tmp.renameTo(dst));
+      }
+    }
+  }
+
+  /**
+   * Checks the sink output directory: it must contain exactly
+   * {@code outFiles} files, and together their lines must include the expected
+   * test string for every (directory, file) pair.
+   *
+   * @param outDir directory the sink writes to
+   * @param outFiles expected number of files in {@code outDir}
+   * @param dirs number of spool directories that produced events
+   * @param events number of events expected from each spool directory
+   * @throws IOException if an output file cannot be read
+   */
+  private void validateSeenEvents(File outDir, int outFiles, int dirs, int events)
+      throws IOException {
+    File[] sinkOutputDirChildren = outDir.listFiles();
+    // listFiles() returns null when the path is not a directory or cannot be
+    // read; fail with a clear message instead of a NullPointerException.
+    assertNotNull("Unable to list files in output dir: " + outDir,
+        sinkOutputDirChildren);
+    assertEquals("Unexpected number of files in output dir",
+        outFiles, sinkOutputDirChildren.length);
+    Set<String> seenEvents = Sets.newHashSet();
+    for (File outFile : sinkOutputDirChildren) {
+      seenEvents.addAll(Files.readLines(outFile, Charsets.UTF_8));
+    }
+    for (int dirNum = 0; dirNum < dirs; dirNum++) {
+      for (int fileNum = 0; fileNum < events; fileNum++) {
+        // readLines() strips line terminators, so compare without the newline.
+        String event = getTestString(dirNum, fileNum).trim();
+        assertTrue("Missing event: {" + event + "}", seenEvents.contains(event));
+      }
+    }
+  }
+
+  /**
+   * Starts an agent named "agent" with the {@code agentProps} configuration,
+   * drops {@code NUM_FILES_PER_DIR} files into every spool directory, waits a
+   * fixed ten seconds, and then checks that a single sink output file holds
+   * the expected line for every input file.
+   *
+   * @throws Exception if the agent cannot be started or validation fails
+   */
+  @Test
+  public void testManySpooldirs() throws Exception {
+    LOGGER.debug("testManySpooldirs() started.");
+
+    StagedInstall.getInstance().startAgent("agent", agentProps);
+
+    final int NUM_FILES_PER_DIR = 10;
+    createInputTestFiles(spoolDirs, NUM_FILES_PER_DIR, 0);
+
+    // NOTE(review): fixed-length wait; presumably long enough on the build
+    // machines, but a slow host could make the validation below flaky.
+    TimeUnit.SECONDS.sleep(10); // Wait for sources and sink to process files
+
+    // Ensure we received all events.
+    validateSeenEvents(sinkOutputDir,1, spoolDirs.size(), NUM_FILES_PER_DIR);
+    LOGGER.debug("Processed all the events!");
+
+    LOGGER.debug("testManySpooldirs() ended.");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/f9da62be/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
----------------------------------------------------------------------
diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
index bc58340..336ffc4 100644
--- a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
+++ b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
@@ -29,8 +29,11 @@ import java.net.Socket;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.GZIPInputStream;
 
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.log4j.Logger;
@@ -62,6 +65,7 @@ public class StagedInstall {
   private final String logDirPath;
 
   // State per invocation - config file, process, shutdown hook
+  private String agentName;
   private String configFilePath;
   private Process process;
   private ProcessShutdownHook shutdownHook;
@@ -113,17 +117,22 @@ public class StagedInstall {
 
   public synchronized void startAgent(String name, Properties properties)
       throws Exception {
+    Preconditions.checkArgument(!name.isEmpty(), "agent name must not be empty");
+    Preconditions.checkNotNull(properties, "properties object must not be null");
+
+    agentName = name;
+
     if (process != null) {
       throw new Exception("A process is already running");
     }
-    LOGGER.info("Starting process for agent: " + name + " using config: "
+    LOGGER.info("Starting process for agent: " + agentName + " using config: "
        + properties);
 
-    File configFile = createConfigurationFile(name, properties);
+    File configFile = createConfigurationFile(agentName, properties);
     configFilePath = configFile.getCanonicalPath();
 
     String configFileName = configFile.getName();
-    String logFileName = "flume-" + name + "-"
+    String logFileName = "flume-" + agentName + "-"
         + configFileName.substring(0, configFileName.indexOf('.')) + ".log";
 
     LOGGER.info("Created configuration file: " + configFilePath);
@@ -136,7 +145,7 @@ public class StagedInstall {
         builder.add("--classpath", agentClasspath);
     }
     builder.add("--conf-file", configFilePath);
-    builder.add("--name", name);
+    builder.add("--name", agentName);
     builder.add("-D" + ENV_FLUME_LOG_DIR + "=" + logDirPath);
     builder.add("-D" + ENV_FLUME_ROOT_LOGGER + "="
             + ENV_FLUME_ROOT_LOGGER_VALUE);
@@ -168,8 +177,21 @@ public class StagedInstall {
       this.agentClasspath = agentClasspath;
   }
 
+  /**
+   * Replaces the content of the current agent configuration file with the
+   * given properties.
+   *
+   * The properties are first written to a temporary configuration file, which
+   * is then copied over the path recorded by {@code startAgent()} and removed.
+   *
+   * @param properties new agent configuration; must not be null
+   * @throws IllegalStateException if no agent name or configuration file path
+   *     has been recorded yet, i.e. {@code startAgent()} has not been called
+   * @throws Exception if the temporary file cannot be created or copied
+   */
+  public synchronized void reconfigure(Properties properties) throws Exception {
+    Preconditions.checkState(agentName != null && configFilePath != null,
+        "startAgent() must be called before reconfigure()");
+
+    File configFile = createConfigurationFile(agentName, properties);
+    try {
+      Files.copy(configFile, new File(configFilePath));
+    } finally {
+      // Remove the temporary file even when the copy fails, and do not
+      // silently ignore a failed delete.
+      if (!configFile.delete()) {
+        LOGGER.warn("Unable to delete temporary config file: " + configFile);
+      }
+    }
+    LOGGER.info("Updated agent config file: " + configFilePath);
+  }
+
+  /**
+   * Returns the {@code stageDir} field of this staged install.
+   *
+   * @return the stage directory
+   */
+  public synchronized File getStageDir() {
+    return stageDir;
+  }
+
   private File createConfigurationFile(String agentName, Properties properties)
       throws Exception {
+    Preconditions.checkNotNull(properties, "properties object must not be null");
+
     File file = File.createTempFile("agent", "config.properties", stageDir);
 
     OutputStream os = null;