You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2013/01/25 22:09:40 UTC

git commit: FLUME-1800: Docs for spooling source durability changes

Updated Branches:
  refs/heads/trunk c1e3d5041 -> 001d4860e


FLUME-1800: Docs for spooling source durability changes

(Mike Percy via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/001d4860
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/001d4860
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/001d4860

Branch: refs/heads/trunk
Commit: 001d4860e16e690d8135f63bf864200c24b04ee8
Parents: c1e3d50
Author: Brock Noland <br...@apache.org>
Authored: Fri Jan 25 15:09:12 2013 -0600
Committer: Brock Noland <br...@apache.org>
Committed: Fri Jan 25 15:09:12 2013 -0600

----------------------------------------------------------------------
 .../avro/ReliableSpoolingFileEventReader.java      |   20 +++-
 .../apache/flume/source/SpoolDirectorySource.java  |    5 +-
 ...SpoolDirectorySourceConfigurationConstants.java |   13 ++-
 .../avro/TestReliableSpoolingFileEventReader.java  |    2 +-
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   98 ++++++++++-----
 5 files changed, 98 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/001d4860/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index b19d0ea..28df24c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.FileFilter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.*;
 import java.util.regex.Pattern;
 
@@ -84,6 +85,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
   private final boolean annotateFileName;
   private final String fileNameHeader;
   private final String deletePolicy;
+  private final Charset inputCharset;
 
   private Optional<FileInfo> currentFile = Optional.absent();
   /** Always contains the last file from which lines have been read. **/
@@ -97,7 +99,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
       String completedSuffix, String ignorePattern, String trackerDirPath,
       boolean annotateFileName, String fileNameHeader,
       String deserializerType, Context deserializerContext,
-      String deletePolicy) throws IOException {
+      String deletePolicy, String inputCharset) throws IOException {
 
     // Sanity checks
     Preconditions.checkNotNull(spoolDirectory);
@@ -107,6 +109,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     Preconditions.checkNotNull(deserializerType);
     Preconditions.checkNotNull(deserializerContext);
     Preconditions.checkNotNull(deletePolicy);
+    Preconditions.checkNotNull(inputCharset);
 
     // validate delete policy
     if (!deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name()) &&
@@ -149,6 +152,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     this.fileNameHeader = fileNameHeader;
     this.ignorePattern = Pattern.compile(ignorePattern);
     this.deletePolicy = deletePolicy;
+    this.inputCharset = Charset.forName(inputCharset);
 
     File trackerDirectory = new File(trackerDirPath);
 
@@ -422,7 +426,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
             tracker.getTarget(), nextPath);
 
         ResettableInputStream in =
-            new ResettableFileInputStream(nextFile, tracker);
+            new ResettableFileInputStream(nextFile, tracker,
+                ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset);
         EventDeserializer deserializer = EventDeserializerFactory.getInstance
             (deserializerType, deserializerContext, in);
 
@@ -482,7 +487,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     private String ignorePattern =
         SpoolDirectorySourceConfigurationConstants.DEFAULT_IGNORE_PAT;
     private String trackerDirPath =
-        SpoolDirectorySourceConfigurationConstants.DEFAULT_META_DIR;
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR;
     private Boolean annotateFileName =
         SpoolDirectorySourceConfigurationConstants.DEFAULT_FILE_HEADER;
     private String fileNameHeader =
@@ -492,6 +497,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     private Context deserializerContext = new Context();
     private String deletePolicy =
         SpoolDirectorySourceConfigurationConstants.DEFAULT_DELETE_POLICY;
+    private String inputCharset =
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_INPUT_CHARSET;
 
     public Builder spoolDirectory(File directory) {
       this.spoolDirectory = directory;
@@ -538,10 +545,15 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
       return this;
     }
 
+    public Builder inputCharset(String inputCharset) {
+      this.inputCharset = inputCharset;
+      return this;
+    }
+
     public ReliableSpoolingFileEventReader build() throws IOException {
       return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix,
           ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
-          deserializerType, deserializerContext, deletePolicy);
+          deserializerType, deserializerContext, deletePolicy, inputCharset);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/001d4860/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 552bd48..698b906 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -56,6 +56,7 @@ Configurable, EventDrivenSource {
   private String deserializerType;
   private Context deserializerContext;
   private String deletePolicy;
+  private String inputCharset;
 
   private CounterGroup counterGroup;
   ReliableSpoolingFileEventReader reader;
@@ -81,6 +82,7 @@ Configurable, EventDrivenSource {
           .deserializerType(deserializerType)
           .deserializerContext(deserializerContext)
           .deletePolicy(deletePolicy)
+          .inputCharset(inputCharset)
           .build();
     } catch (IOException ioe) {
       throw new FlumeException("Error instantiating spooling event parser",
@@ -115,9 +117,10 @@ Configurable, EventDrivenSource {
         DEFAULT_FILENAME_HEADER_KEY);
     batchSize = context.getInteger(BATCH_SIZE,
         DEFAULT_BATCH_SIZE);
+    inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET);
 
     ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT);
-    trackerDirPath = context.getString(META_DIR, DEFAULT_META_DIR);
+    trackerDirPath = context.getString(TRACKER_DIR, DEFAULT_TRACKER_DIR);
 
     deserializerType = context.getString(DESERIALIZER, DEFAULT_DESERIALIZER);
     deserializerContext = new Context(context.getSubProperties(DESERIALIZER +

http://git-wip-us.apache.org/repos/asf/flume/blob/001d4860/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
index afc7288..f3cc703 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
@@ -35,14 +35,18 @@ public class SpoolDirectorySourceConfigurationConstants {
 
   /** What size to batch with before sending to ChannelProcessor. */
   public static final String BATCH_SIZE = "batchSize";
-  public static final int DEFAULT_BATCH_SIZE = 10;
+  public static final int DEFAULT_BATCH_SIZE = 100;
 
   /** Maximum number of lines to buffer between commits. */
+  @Deprecated
   public static final String BUFFER_MAX_LINES = "bufferMaxLines";
+  @Deprecated
   public static final int DEFAULT_BUFFER_MAX_LINES = 100;
 
   /** Maximum length of line (in characters) in buffer between commits. */
+  @Deprecated
   public static final String BUFFER_MAX_LINE_LENGTH = "bufferMaxLineLength";
+  @Deprecated
   public static final int DEFAULT_BUFFER_MAX_LINE_LENGTH = 5000;
 
   /** Pattern of files to ignore */
@@ -50,8 +54,8 @@ public class SpoolDirectorySourceConfigurationConstants {
   public static final String DEFAULT_IGNORE_PAT = "^$"; // no effect
 
   /** Directory to store metadata about files being processed */
-  public static final String META_DIR = "metaDir";
-  public static final String DEFAULT_META_DIR = ".flumespool";
+  public static final String TRACKER_DIR = "trackerDir";
+  public static final String DEFAULT_TRACKER_DIR = ".flumespool";
 
   /** Deserializer to use to parse the file data into Flume Events */
   public static final String DESERIALIZER = "deserializer";
@@ -59,4 +63,7 @@ public class SpoolDirectorySourceConfigurationConstants {
 
   public static final String DELETE_POLICY = "deletePolicy";
   public static final String DEFAULT_DELETE_POLICY = "never";
+
+  public static final String INPUT_CHARSET = "inputCharset";
+  public static final String DEFAULT_INPUT_CHARSET = "UTF-8";
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/001d4860/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
index a29606e..31ecf8e 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
@@ -119,7 +119,7 @@ public class TestReliableSpoolingFileEventReader {
   @Test
   public void testRepeatedCallsWithCommitOnSuccess() throws IOException {
     String trackerDirPath =
-        SpoolDirectorySourceConfigurationConstants.DEFAULT_META_DIR;
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR;
     File trackerDir = new File(WORK_DIR, trackerDirPath);
 
     ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()

http://git-wip-us.apache.org/repos/asf/flume/blob/001d4860/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index b2c58de..452c634 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -773,54 +773,90 @@ Example for agent named a1:
 
 Spooling Directory Source
 ~~~~~~~~~~~~~~~~~~~~~~~~~
-This source lets you ingest data by dropping files in a spooling directory on
-disk. **Unlike other asynchronous sources, this source
-avoids data loss even if Flume is restarted or fails.**
-Flume will watch the directory for new files and read then ingest them
-as they appear. After a given file has been fully read into the channel,
-it is renamed to indicate completion. This allows a cleaner process to remove
-completed files periodically. Note, however,
-that events may be duplicated if failures occur, consistent with the semantics
-offered by other Flume components. The channel optionally inserts the full path of
-the origin file into a header field of each event. This source buffers file data
-in memory during reads; be sure to set the `bufferMaxLineLength` option to a number
-greater than the longest line you expect to see in your input data.
-
-.. warning:: This channel expects that only immutable, uniquely named files
-             are dropped in the spooling directory. If duplicate names are
-             used, or files are modified while being read, the source will
-             fail with an error message. For some use cases this may require
-             adding unique identifiers (such as a timestamp) to log file names
-             when they are copied into the spooling directory.
+This source lets you ingest data by placing files to be ingested into a
+"spooling" directory on disk.
+This source will watch the specified directory for new files, and will parse
+events out of new files as they appear.
+The event parsing logic is pluggable.
+After a given file has been fully read
+into the channel, it is renamed to indicate completion (or optionally deleted).
+
+Unlike the Exec source, this source is reliable and will not miss data, even if
+Flume is restarted or killed. In exchange for this reliability, files dropped
+into the spooling directory must be immutable and uniquely named. Flume tries
+to detect violations of these requirements and will fail loudly if one occurs:
+
+#. If a file is written to after being placed into the spooling directory,
+   Flume will print an error to its log file and stop processing.
+#. If a file name is reused at a later time, Flume will print an error to its
+   log file and stop processing.
+
+To avoid the above issues, it may be useful to add a unique identifier
+(such as a timestamp) to log file names when they are moved into the spooling
+directory.
+
+Despite the reliability guarantees of this source, there are still
+cases in which events may be duplicated if certain downstream failures occur.
+This is consistent with the guarantees offered by other Flume components.
 
 ====================  ==============  ==========================================================
 Property Name         Default         Description
 ====================  ==============  ==========================================================
 **channels**          --
-**type**              --              The component type name, needs to be ``spooldir``
-**spoolDir**          --              The directory where log files will be spooled
+**type**              --              The component type name, needs to be ``spooldir``.
+**spoolDir**          --              The directory from which to read files.
 fileSuffix            .COMPLETED      Suffix to append to completely ingested files
+deletePolicy          never           When to delete completed files: ``never`` or ``immediate``
 fileHeader            false           Whether to add a header storing the filename
 fileHeaderKey         file            Header key to use when appending filename to header
-batchSize             10              Granularity at which to batch transfer to the channel
-bufferMaxLines        100             Maximum number of lines the commit buffer can hold
-bufferMaxLineLength   5000            Maximum length of a line in the commit buffer
+ignorePattern         ^$              Regular expression specifying which files to ignore (skip)
+trackerDir            .flumespool     Directory to store metadata related to processing of files.
+                                      If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
+batchSize             100             Granularity at which to batch transfer to the channel
+inputCharset          UTF-8           Character set used by deserializers that treat the input file as text.
+deserializer          ``LINE``        Specify the deserializer used to parse the file into events.
+                                      Defaults to parsing each line as an event. The class specified must implement
+                                      ``EventDeserializer.Builder``.
+deserializer.*                        Varies per event deserializer.
+bufferMaxLines        --              (Obsolete) This option is now ignored.
+bufferMaxLineLength   5000            (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.
 selector.type         replicating     replicating or multiplexing
 selector.*                            Depends on the selector.type value
 interceptors          --              Space separated list of interceptors
 interceptors.*
 ====================  ==============  ==========================================================
 
-Example for agent named a1:
+Example for an agent named agent-1:
 
 .. code-block:: properties
 
-  a1.sources = r1
-  a1.channels = c1
-  a1.sources.r1.type = spooldir
-  a1.sources.r1.spoolDir = /var/log/apache/flumeSpool
-  a1.sources.r1.fileHeader = true
-  a1.sources.r1.channels = c1
+  agent-1.channels = ch-1
+  agent-1.sources = src-1
+
+  agent-1.sources.src-1.type = spooldir
+  agent-1.sources.src-1.channels = ch-1
+  agent-1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
+  agent-1.sources.src-1.fileHeader = true
+
+Event Deserializers
+'''''''''''''''''''
+
+The following event deserializers ship with Flume.
+
+LINE
+^^^^
+
+This deserializer generates one event per line of text input.
+
+==============================  ==============  ==========================================================
+Property Name                   Default         Description
+==============================  ==============  ==========================================================
+deserializer.maxLineLength      2048            Maximum number of characters to include in a single event.
+                                                If a line exceeds this length, it is truncated, and the
+                                                remaining characters on the line will appear in a
+                                                subsequent event.
+deserializer.outputCharset      UTF-8           Charset to use for encoding events put into the channel.
+==============================  ==============  ==========================================================
 
 NetCat Source
 ~~~~~~~~~~~~~