You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/08/10 06:50:25 UTC
git commit: FLUME-2157. Spool directory source does not shut down correctly when Flume is reconfigured.
Updated Branches:
refs/heads/trunk 99db32ccd -> f9da62be2
FLUME-2157. Spool directory source does not shut down correctly when Flume is reconfigured.
(Mike Percy via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f9da62be
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f9da62be
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f9da62be
Branch: refs/heads/trunk
Commit: f9da62be22101b00b907be3a66a6ce8d823c2f8f
Parents: 99db32c
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Aug 9 21:49:19 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Aug 9 21:49:19 2013 -0700
----------------------------------------------------------------------
.../avro/ReliableSpoolingFileEventReader.java | 13 +-
.../flume/source/SpoolDirectorySource.java | 41 ++++-
.../flume/source/TestSpoolDirectorySource.java | 31 ++++
.../flume/test/agent/TestSpooldirSource.java | 174 +++++++++++++++++++
.../apache/flume/test/util/StagedInstall.java | 30 +++-
5 files changed, 271 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/f9da62be/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index f82fe1f..724ab38 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -133,12 +133,15 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
// Do a canary test to make sure we have access to spooling directory
try {
- File f1 = File.createTempFile("flume", "test", spoolDirectory);
- Files.write("testing flume file permissions\n", f1, Charsets.UTF_8);
- Files.readLines(f1, Charsets.UTF_8);
- if (!f1.delete()) {
- throw new IOException("Unable to delete canary file " + f1);
+ File canary = File.createTempFile("flume-spooldir-perm-check-", ".canary",
+ spoolDirectory);
+ Files.write("testing flume file permissions\n", canary, Charsets.UTF_8);
+ List<String> lines = Files.readLines(canary, Charsets.UTF_8);
+ Preconditions.checkState(!lines.isEmpty(), "Empty canary file %s", canary);
+ if (!canary.delete()) {
+ throw new IOException("Unable to delete canary file " + canary);
}
+ logger.debug("Successfully created and deleted canary file: {}", canary);
} catch (IOException e) {
throw new FlumeException("Unable to read and modify files" +
" in the spooling directory: " + spoolDirectory, e);
http://git-wip-us.apache.org/repos/asf/flume/blob/f9da62be/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 7145580..957eb8b 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -24,6 +24,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
import org.apache.flume.*;
import org.apache.flume.client.avro.ReliableSpoolingFileEventReader;
import org.apache.flume.conf.Configurable;
@@ -58,17 +60,18 @@ Configurable, EventDrivenSource {
private Context deserializerContext;
private String deletePolicy;
private String inputCharset;
+ private volatile boolean hasFatalError = false;
private SourceCounter sourceCounter;
ReliableSpoolingFileEventReader reader;
+ private ScheduledExecutorService executor;
@Override
- public void start() {
+ public synchronized void start() {
logger.info("SpoolDirectorySource source starting with directory: {}",
spoolDirectory);
- ScheduledExecutorService executor =
- Executors.newSingleThreadScheduledExecutor();
+ executor = Executors.newSingleThreadScheduledExecutor();
File directory = new File(spoolDirectory);
try {
@@ -99,7 +102,15 @@ Configurable, EventDrivenSource {
}
@Override
- public void stop() {
+ public synchronized void stop() {
+ executor.shutdown();
+ try {
+ executor.awaitTermination(10L, TimeUnit.SECONDS);
+ } catch (InterruptedException ex) {
+ logger.info("Interrupted while awaiting termination", ex);
+ }
+ executor.shutdownNow();
+
super.stop();
sourceCounter.stop();
logger.info("SpoolDir source {} stopped. Metrics: {}", getName(),
@@ -107,7 +118,13 @@ Configurable, EventDrivenSource {
}
@Override
- public void configure(Context context) {
+ public String toString() {
+ return "Spool Directory source " + getName() +
+ ": { spoolDir: " + spoolDirectory + " }";
+ }
+
+ @Override
+ public synchronized void configure(Context context) {
spoolDirectory = context.getString(SPOOL_DIRECTORY);
Preconditions.checkState(spoolDirectory != null,
"Configuration must specify a spooling directory");
@@ -143,6 +160,11 @@ Configurable, EventDrivenSource {
}
}
+ @VisibleForTesting
+ protected boolean hasFatalError() {
+ return hasFatalError;
+ }
+
private class SpoolDirectoryRunnable implements Runnable {
private ReliableSpoolingFileEventReader reader;
private SourceCounter sourceCounter;
@@ -170,10 +192,11 @@ Configurable, EventDrivenSource {
sourceCounter.incrementAppendBatchAcceptedCount();
}
} catch (Throwable t) {
- logger.error("Uncaught exception in Runnable", t);
- if (t instanceof Error) {
- throw (Error) t;
- }
+ logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
+ "Uncaught exception in SpoolDirectorySource thread. " +
+ "Restart or reconfigure Flume to continue processing.", t);
+ hasFatalError = true;
+ Throwables.propagate(t);
}
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/f9da62be/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
index 652d2a2..837cf15 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
@@ -21,8 +21,10 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
import org.apache.flume.ChannelSelector;
import org.apache.flume.Context;
import org.apache.flume.Event;
@@ -132,4 +134,33 @@ public class TestSpoolDirectorySource {
source.getLifecycleState());
}
}
+
+ @Test
+ public void testReconfigure() throws InterruptedException, IOException {
+ final int NUM_RECONFIGS = 20;
+ for (int i = 0; i < NUM_RECONFIGS; i++) {
+ Context context = new Context();
+ File file = new File(tmpDir.getAbsolutePath() + "/file-" + i);
+ Files.write("File " + i, file, Charsets.UTF_8);
+ context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
+ tmpDir.getAbsolutePath());
+ Configurables.configure(source, context);
+ source.start();
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1));
+ Transaction txn = channel.getTransaction();
+ txn.begin();
+ try {
+ Event event = channel.take();
+ String content = new String(event.getBody(), Charsets.UTF_8);
+ Assert.assertEquals("File " + i, content);
+ txn.commit();
+ } catch (Throwable t) {
+ txn.rollback();
+ } finally {
+ txn.close();
+ }
+ source.stop();
+ Assert.assertFalse("Fatal error on iteration " + i, source.hasFatalError());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flume/blob/f9da62be/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestSpooldirSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestSpooldirSource.java b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestSpooldirSource.java
new file mode 100644
index 0000000..6018380
--- /dev/null
+++ b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestSpooldirSource.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.test.agent;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.test.util.StagedInstall;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestSpooldirSource {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(TestSpooldirSource.class);
+
+ private Properties agentProps;
+ private File sinkOutputDir;
+ private List<File> spoolDirs = Lists.newArrayList();
+
+ @Before
+ public void setup() throws Exception {
+
+ File agentDir = StagedInstall.getInstance().getStageDir();
+ LOGGER.debug("Using agent stage dir: {}", agentDir);
+
+ File testDir = new File(agentDir, TestSpooldirSource.class.getName());
+ assertTrue(testDir.mkdirs());
+
+ File spoolParentDir = new File(testDir, "spools");
+ assertTrue("Unable to create sink output dir: " + spoolParentDir.getPath(),
+ spoolParentDir.mkdir());
+
+ final int NUM_SOURCES = 100;
+
+ agentProps = new Properties();
+ List<String> spooldirSrcNames = Lists.newArrayList();
+ String channelName = "mem-01";
+
+ // Create source dirs and property file chunks
+ for (int i = 0; i < NUM_SOURCES; i++) {
+ String srcName = String.format("spooldir-%03d", i);
+ File spoolDir = new File(spoolParentDir, srcName);
+ assertTrue(spoolDir.mkdir());
+ spooldirSrcNames.add(srcName);
+ spoolDirs.add(spoolDir);
+
+ agentProps.put(String.format("agent.sources.%s.type", srcName),
+ "SPOOLDIR");
+ agentProps.put(String.format("agent.sources.%s.spoolDir", srcName),
+ spoolDir.getPath());
+ agentProps.put(String.format("agent.sources.%s.channels", srcName),
+ channelName);
+ }
+
+ // Create the rest of the properties file
+ agentProps.put("agent.channels.mem-01.type", "MEMORY");
+ agentProps.put("agent.channels.mem-01.capacity", String.valueOf(100000));
+
+ sinkOutputDir = new File(testDir, "out");
+ assertTrue("Unable to create sink output dir: " + sinkOutputDir.getPath(),
+ sinkOutputDir.mkdir());
+
+ agentProps.put("agent.sinks.roll-01.channel", channelName);
+ agentProps.put("agent.sinks.roll-01.type", "FILE_ROLL");
+ agentProps.put("agent.sinks.roll-01.sink.directory", sinkOutputDir.getPath());
+ agentProps.put("agent.sinks.roll-01.sink.rollInterval", "0");
+
+ agentProps.put("agent.sources", Joiner.on(" ").join(spooldirSrcNames));
+ agentProps.put("agent.channels", channelName);
+ agentProps.put("agent.sinks", "roll-01");
+ }
+
+ @After
+ public void teardown() throws Exception {
+ StagedInstall.getInstance().stopAgent();
+ }
+
+ private String getTestString(int dirNum, int fileNum) {
+ return String.format("Test dir %03d, test file %03d.\n", dirNum, fileNum);
+ }
+
+ /** Create a bunch of test files. */
+ private void createInputTestFiles(List<File> spoolDirs, int numFiles, int startNum)
+ throws IOException {
+ int numSpoolDirs = spoolDirs.size();
+ for (int dirNum = 0; dirNum < numSpoolDirs; dirNum++) {
+ File spoolDir = spoolDirs.get(dirNum);
+ for (int fileNum = startNum; fileNum < numFiles; fileNum++) {
+ // Stage the files on what is almost certainly the same FS partition.
+ File tmp = new File(spoolDir.getParent(), UUID.randomUUID().toString());
+ Files.append(getTestString(dirNum, fileNum), tmp, Charsets.UTF_8);
+ File dst = new File(spoolDir, String.format("test-file-%03d", fileNum));
+ // Ensure we move them into the spool directory atomically, if possible.
+ assertTrue(String.format("Failed to rename %s to %s", tmp, dst),
+ tmp.renameTo(dst));
+ }
+ }
+ }
+
+ private void validateSeenEvents(File outDir, int outFiles, int dirs, int events)
+ throws IOException {
+ File[] sinkOutputDirChildren = outDir.listFiles();
+ assertEquals("Unexpected number of files in output dir",
+ outFiles, sinkOutputDirChildren.length);
+ Set<String> seenEvents = Sets.newHashSet();
+ for (File outFile : sinkOutputDirChildren) {
+ List<String> lines = Files.readLines(outFile, Charsets.UTF_8);
+ for (String line : lines) {
+ seenEvents.add(line);
+ }
+ }
+ for (int dirNum = 0; dirNum < dirs; dirNum++) {
+ for (int fileNum = 0; fileNum < events; fileNum++) {
+ String event = getTestString(dirNum, fileNum).trim();
+ assertTrue("Missing event: {" + event + "}", seenEvents.contains(event));
+ }
+ }
+ }
+
+ @Test
+ public void testManySpooldirs() throws Exception {
+ LOGGER.debug("testManySpooldirs() started.");
+
+ StagedInstall.getInstance().startAgent("agent", agentProps);
+
+ final int NUM_FILES_PER_DIR = 10;
+ createInputTestFiles(spoolDirs, NUM_FILES_PER_DIR, 0);
+
+ TimeUnit.SECONDS.sleep(10); // Wait for sources and sink to process files
+
+ // Ensure we received all events.
+ validateSeenEvents(sinkOutputDir,1, spoolDirs.size(), NUM_FILES_PER_DIR);
+ LOGGER.debug("Processed all the events!");
+
+ LOGGER.debug("testManySpooldirs() ended.");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/f9da62be/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
----------------------------------------------------------------------
diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
index bc58340..336ffc4 100644
--- a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
+++ b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
@@ -29,8 +29,11 @@ import java.net.Socket;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.log4j.Logger;
@@ -62,6 +65,7 @@ public class StagedInstall {
private final String logDirPath;
// State per invocation - config file, process, shutdown hook
+ private String agentName;
private String configFilePath;
private Process process;
private ProcessShutdownHook shutdownHook;
@@ -113,17 +117,22 @@ public class StagedInstall {
public synchronized void startAgent(String name, Properties properties)
throws Exception {
+ Preconditions.checkArgument(!name.isEmpty(), "agent name must not be empty");
+ Preconditions.checkNotNull(properties, "properties object must not be null");
+
+ agentName = name;
+
if (process != null) {
throw new Exception("A process is already running");
}
- LOGGER.info("Starting process for agent: " + name + " using config: "
+ LOGGER.info("Starting process for agent: " + agentName + " using config: "
+ properties);
- File configFile = createConfigurationFile(name, properties);
+ File configFile = createConfigurationFile(agentName, properties);
configFilePath = configFile.getCanonicalPath();
String configFileName = configFile.getName();
- String logFileName = "flume-" + name + "-"
+ String logFileName = "flume-" + agentName + "-"
+ configFileName.substring(0, configFileName.indexOf('.')) + ".log";
LOGGER.info("Created configuration file: " + configFilePath);
@@ -136,7 +145,7 @@ public class StagedInstall {
builder.add("--classpath", agentClasspath);
}
builder.add("--conf-file", configFilePath);
- builder.add("--name", name);
+ builder.add("--name", agentName);
builder.add("-D" + ENV_FLUME_LOG_DIR + "=" + logDirPath);
builder.add("-D" + ENV_FLUME_ROOT_LOGGER + "="
+ ENV_FLUME_ROOT_LOGGER_VALUE);
@@ -168,8 +177,21 @@ public class StagedInstall {
this.agentClasspath = agentClasspath;
}
+ public synchronized void reconfigure(Properties properties) throws Exception {
+ File configFile = createConfigurationFile(agentName, properties);
+ Files.copy(configFile, new File(configFilePath));
+ configFile.delete();
+ LOGGER.info("Updated agent config file: " + configFilePath);
+ }
+
+ public synchronized File getStageDir() {
+ return stageDir;
+ }
+
private File createConfigurationFile(String agentName, Properties properties)
throws Exception {
+ Preconditions.checkNotNull(properties, "properties object must not be null");
+
File file = File.createTempFile("agent", "config.properties", stageDir);
OutputStream os = null;