You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2013/01/25 22:09:40 UTC
git commit: FLUME-1800: Docs for spooling source durability changes
Updated Branches:
refs/heads/trunk c1e3d5041 -> 001d4860e
FLUME-1800: Docs for spooling source durability changes
(Mike Percy via Brock Noland)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/001d4860
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/001d4860
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/001d4860
Branch: refs/heads/trunk
Commit: 001d4860e16e690d8135f63bf864200c24b04ee8
Parents: c1e3d50
Author: Brock Noland <br...@apache.org>
Authored: Fri Jan 25 15:09:12 2013 -0600
Committer: Brock Noland <br...@apache.org>
Committed: Fri Jan 25 15:09:12 2013 -0600
----------------------------------------------------------------------
.../avro/ReliableSpoolingFileEventReader.java | 20 +++-
.../apache/flume/source/SpoolDirectorySource.java | 5 +-
...SpoolDirectorySourceConfigurationConstants.java | 13 ++-
.../avro/TestReliableSpoolingFileEventReader.java | 2 +-
flume-ng-doc/sphinx/FlumeUserGuide.rst | 98 ++++++++++-----
5 files changed, 98 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/001d4860/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index b19d0ea..28df24c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -23,6 +23,7 @@ import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.util.*;
import java.util.regex.Pattern;
@@ -84,6 +85,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
private final boolean annotateFileName;
private final String fileNameHeader;
private final String deletePolicy;
+ private final Charset inputCharset;
private Optional<FileInfo> currentFile = Optional.absent();
/** Always contains the last file from which lines have been read. **/
@@ -97,7 +99,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
String completedSuffix, String ignorePattern, String trackerDirPath,
boolean annotateFileName, String fileNameHeader,
String deserializerType, Context deserializerContext,
- String deletePolicy) throws IOException {
+ String deletePolicy, String inputCharset) throws IOException {
// Sanity checks
Preconditions.checkNotNull(spoolDirectory);
@@ -107,6 +109,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
Preconditions.checkNotNull(deserializerType);
Preconditions.checkNotNull(deserializerContext);
Preconditions.checkNotNull(deletePolicy);
+ Preconditions.checkNotNull(inputCharset);
// validate delete policy
if (!deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name()) &&
@@ -149,6 +152,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
this.fileNameHeader = fileNameHeader;
this.ignorePattern = Pattern.compile(ignorePattern);
this.deletePolicy = deletePolicy;
+ this.inputCharset = Charset.forName(inputCharset);
File trackerDirectory = new File(trackerDirPath);
@@ -422,7 +426,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
tracker.getTarget(), nextPath);
ResettableInputStream in =
- new ResettableFileInputStream(nextFile, tracker);
+ new ResettableFileInputStream(nextFile, tracker,
+ ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset);
EventDeserializer deserializer = EventDeserializerFactory.getInstance
(deserializerType, deserializerContext, in);
@@ -482,7 +487,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
private String ignorePattern =
SpoolDirectorySourceConfigurationConstants.DEFAULT_IGNORE_PAT;
private String trackerDirPath =
- SpoolDirectorySourceConfigurationConstants.DEFAULT_META_DIR;
+ SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR;
private Boolean annotateFileName =
SpoolDirectorySourceConfigurationConstants.DEFAULT_FILE_HEADER;
private String fileNameHeader =
@@ -492,6 +497,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
private Context deserializerContext = new Context();
private String deletePolicy =
SpoolDirectorySourceConfigurationConstants.DEFAULT_DELETE_POLICY;
+ private String inputCharset =
+ SpoolDirectorySourceConfigurationConstants.DEFAULT_INPUT_CHARSET;
public Builder spoolDirectory(File directory) {
this.spoolDirectory = directory;
@@ -538,10 +545,15 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
return this;
}
+ public Builder inputCharset(String inputCharset) {
+ this.inputCharset = inputCharset;
+ return this;
+ }
+
public ReliableSpoolingFileEventReader build() throws IOException {
return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix,
ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
- deserializerType, deserializerContext, deletePolicy);
+ deserializerType, deserializerContext, deletePolicy, inputCharset);
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/001d4860/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 552bd48..698b906 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -56,6 +56,7 @@ Configurable, EventDrivenSource {
private String deserializerType;
private Context deserializerContext;
private String deletePolicy;
+ private String inputCharset;
private CounterGroup counterGroup;
ReliableSpoolingFileEventReader reader;
@@ -81,6 +82,7 @@ Configurable, EventDrivenSource {
.deserializerType(deserializerType)
.deserializerContext(deserializerContext)
.deletePolicy(deletePolicy)
+ .inputCharset(inputCharset)
.build();
} catch (IOException ioe) {
throw new FlumeException("Error instantiating spooling event parser",
@@ -115,9 +117,10 @@ Configurable, EventDrivenSource {
DEFAULT_FILENAME_HEADER_KEY);
batchSize = context.getInteger(BATCH_SIZE,
DEFAULT_BATCH_SIZE);
+ inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET);
ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT);
- trackerDirPath = context.getString(META_DIR, DEFAULT_META_DIR);
+ trackerDirPath = context.getString(TRACKER_DIR, DEFAULT_TRACKER_DIR);
deserializerType = context.getString(DESERIALIZER, DEFAULT_DESERIALIZER);
deserializerContext = new Context(context.getSubProperties(DESERIALIZER +
http://git-wip-us.apache.org/repos/asf/flume/blob/001d4860/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
index afc7288..f3cc703 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
@@ -35,14 +35,18 @@ public class SpoolDirectorySourceConfigurationConstants {
/** What size to batch with before sending to ChannelProcessor. */
public static final String BATCH_SIZE = "batchSize";
- public static final int DEFAULT_BATCH_SIZE = 10;
+ public static final int DEFAULT_BATCH_SIZE = 100;
/** Maximum number of lines to buffer between commits. */
+ @Deprecated
public static final String BUFFER_MAX_LINES = "bufferMaxLines";
+ @Deprecated
public static final int DEFAULT_BUFFER_MAX_LINES = 100;
/** Maximum length of line (in characters) in buffer between commits. */
+ @Deprecated
public static final String BUFFER_MAX_LINE_LENGTH = "bufferMaxLineLength";
+ @Deprecated
public static final int DEFAULT_BUFFER_MAX_LINE_LENGTH = 5000;
/** Pattern of files to ignore */
@@ -50,8 +54,8 @@ public class SpoolDirectorySourceConfigurationConstants {
public static final String DEFAULT_IGNORE_PAT = "^$"; // no effect
/** Directory to store metadata about files being processed */
- public static final String META_DIR = "metaDir";
- public static final String DEFAULT_META_DIR = ".flumespool";
+ public static final String TRACKER_DIR = "trackerDir";
+ public static final String DEFAULT_TRACKER_DIR = ".flumespool";
/** Deserializer to use to parse the file data into Flume Events */
public static final String DESERIALIZER = "deserializer";
@@ -59,4 +63,7 @@ public class SpoolDirectorySourceConfigurationConstants {
public static final String DELETE_POLICY = "deletePolicy";
public static final String DEFAULT_DELETE_POLICY = "never";
+
+ public static final String INPUT_CHARSET = "inputCharset";
+ public static final String DEFAULT_INPUT_CHARSET = "UTF-8";
}
http://git-wip-us.apache.org/repos/asf/flume/blob/001d4860/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
index a29606e..31ecf8e 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
@@ -119,7 +119,7 @@ public class TestReliableSpoolingFileEventReader {
@Test
public void testRepeatedCallsWithCommitOnSuccess() throws IOException {
String trackerDirPath =
- SpoolDirectorySourceConfigurationConstants.DEFAULT_META_DIR;
+ SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR;
File trackerDir = new File(WORK_DIR, trackerDirPath);
ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
http://git-wip-us.apache.org/repos/asf/flume/blob/001d4860/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index b2c58de..452c634 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -773,54 +773,90 @@ Example for agent named a1:
Spooling Directory Source
~~~~~~~~~~~~~~~~~~~~~~~~~
-This source lets you ingest data by dropping files in a spooling directory on
-disk. **Unlike other asynchronous sources, this source
-avoids data loss even if Flume is restarted or fails.**
-Flume will watch the directory for new files and read then ingest them
-as they appear. After a given file has been fully read into the channel,
-it is renamed to indicate completion. This allows a cleaner process to remove
-completed files periodically. Note, however,
-that events may be duplicated if failures occur, consistent with the semantics
-offered by other Flume components. The channel optionally inserts the full path of
-the origin file into a header field of each event. This source buffers file data
-in memory during reads; be sure to set the `bufferMaxLineLength` option to a number
-greater than the longest line you expect to see in your input data.
-
-.. warning:: This channel expects that only immutable, uniquely named files
- are dropped in the spooling directory. If duplicate names are
- used, or files are modified while being read, the source will
- fail with an error message. For some use cases this may require
- adding unique identifiers (such as a timestamp) to log file names
- when they are copied into the spooling directory.
+This source lets you ingest data by placing files to be ingested into a
+"spooling" directory on disk.
+This source will watch the specified directory for new files, and will parse
+events out of new files as they appear.
+The event parsing logic is pluggable.
+After a given file has been fully read
+into the channel, it is renamed to indicate completion (or optionally deleted).
+
+Unlike the Exec source, this source is reliable and will not miss data, even if
+Flume is restarted or killed. In exchange for this reliability, files dropped
+into the spooling directory must be immutable and uniquely named. Flume tries
+to detect violations of these requirements and will fail loudly if one occurs:
+
+#. If a file is written to after being placed into the spooling directory,
+ Flume will print an error to its log file and stop processing.
+#. If a file name is reused at a later time, Flume will print an error to its
+ log file and stop processing.
+
+To avoid the above issues, it may be useful to add a unique identifier
+(such as a timestamp) to log file names when they are moved into the spooling
+directory.
+
+Despite the reliability guarantees of this source, there are still
+cases in which events may be duplicated if certain downstream failures occur.
+This is consistent with the guarantees offered by other Flume components.
==================== ============== ==========================================================
Property Name Default Description
==================== ============== ==========================================================
**channels** --
-**type** -- The component type name, needs to be ``spooldir``
-**spoolDir** -- The directory where log files will be spooled
+**type** -- The component type name, needs to be ``spooldir``.
+**spoolDir** -- The directory from which to read files.
fileSuffix .COMPLETED Suffix to append to completely ingested files
+deletePolicy never When to delete completed files: ``never`` or ``immediate``
fileHeader false Whether to add a header storing the filename
fileHeaderKey file Header key to use when appending filename to header
-batchSize 10 Granularity at which to batch transfer to the channel
-bufferMaxLines 100 Maximum number of lines the commit buffer can hold
-bufferMaxLineLength 5000 Maximum length of a line in the commit buffer
+ignorePattern ^$ Regular expression specifying which files to ignore (skip)
+trackerDir .flumespool Directory to store metadata related to processing of files.
+ If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
+batchSize 100 Granularity at which to batch transfer to the channel
+inputCharset UTF-8 Character set used by deserializers that treat the input file as text.
+deserializer ``LINE`` Specify the deserializer used to parse the file into events.
+ Defaults to parsing each line as an event. The class specified must implement
+ ``EventDeserializer.Builder``.
+deserializer.* Varies per event deserializer.
+bufferMaxLines -- (Obsolete) This option is now ignored.
+bufferMaxLineLength 5000 (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
interceptors -- Space separated list of interceptors
interceptors.*
==================== ============== ==========================================================
-Example for agent named a1:
+Example for an agent named agent-1:
.. code-block:: properties
- a1.sources = r1
- a1.channels = c1
- a1.sources.r1.type = spooldir
- a1.sources.r1.spoolDir = /var/log/apache/flumeSpool
- a1.sources.r1.fileHeader = true
- a1.sources.r1.channels = c1
+ agent-1.channels = ch-1
+ agent-1.sources = src-1
+
+ agent-1.sources.src-1.type = spooldir
+ agent-1.sources.src-1.channels = ch-1
+ agent-1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
+ agent-1.sources.src-1.fileHeader = true
+
+Event Deserializers
+'''''''''''''''''''
+
+The following event deserializers ship with Flume.
+
+LINE
+^^^^
+
+This deserializer generates one event per line of text input.
+
+============================== ============== ==========================================================
+Property Name Default Description
+============================== ============== ==========================================================
+deserializer.maxLineLength 2048 Maximum number of characters to include in a single event.
+ If a line exceeds this length, it is truncated, and the
+ remaining characters on the line will appear in a
+ subsequent event.
+deserializer.outputCharset UTF-8 Charset to use for encoding events put into the channel.
+============================== ============== ==========================================================
NetCat Source
~~~~~~~~~~~~~