You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/09/28 18:36:19 UTC

git commit: FLUME-2052. Spooling directory source should be able to replace or ignore malformed characters

Updated Branches:
  refs/heads/trunk 49933493f -> b84d01615


FLUME-2052. Spooling directory source should be able to replace or ignore malformed characters

(Mike Percy via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b84d0161
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b84d0161
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b84d0161

Branch: refs/heads/trunk
Commit: b84d01615a47c8152cfa1119a52a1a1f1b445843
Parents: 4993349
Author: Hari Shreedharan <hs...@apache.org>
Authored: Sat Sep 28 09:35:03 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Sat Sep 28 09:35:03 2013 -0700

----------------------------------------------------------------------
 .../avro/ReliableSpoolingFileEventReader.java   |  32 ++++-
 .../flume/serialization/DecodeErrorPolicy.java  |  31 +++++
 .../ResettableFileInputStream.java              |  23 +++-
 .../flume/source/SpoolDirectorySource.java      |   6 +
 ...olDirectorySourceConfigurationConstants.java |   8 ++
 .../TestResettableFileInputStream.java          | 118 ++++++++++++++++++-
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |   5 +
 7 files changed, 215 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/b84d0161/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index 724ab38..bd684ed 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -24,7 +24,10 @@ import java.io.FileFilter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.charset.Charset;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import java.util.regex.Pattern;
 
 import org.apache.flume.Context;
@@ -32,7 +35,13 @@ import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
-import org.apache.flume.serialization.*;
+import org.apache.flume.serialization.DecodeErrorPolicy;
+import org.apache.flume.serialization.DurablePositionTracker;
+import org.apache.flume.serialization.EventDeserializer;
+import org.apache.flume.serialization.EventDeserializerFactory;
+import org.apache.flume.serialization.PositionTracker;
+import org.apache.flume.serialization.ResettableFileInputStream;
+import org.apache.flume.serialization.ResettableInputStream;
 import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
 import org.apache.flume.tools.PlatformDetect;
 import org.slf4j.Logger;
@@ -86,6 +95,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
   private final String fileNameHeader;
   private final String deletePolicy;
   private final Charset inputCharset;
+  private final DecodeErrorPolicy decodeErrorPolicy;
 
   private Optional<FileInfo> currentFile = Optional.absent();
   /** Always contains the last file from which lines have been read. **/
@@ -99,7 +109,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
       String completedSuffix, String ignorePattern, String trackerDirPath,
       boolean annotateFileName, String fileNameHeader,
       String deserializerType, Context deserializerContext,
-      String deletePolicy, String inputCharset) throws IOException {
+      String deletePolicy, String inputCharset,
+      DecodeErrorPolicy decodeErrorPolicy) throws IOException {
 
     // Sanity checks
     Preconditions.checkNotNull(spoolDirectory);
@@ -156,6 +167,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     this.ignorePattern = Pattern.compile(ignorePattern);
     this.deletePolicy = deletePolicy;
     this.inputCharset = Charset.forName(inputCharset);
+    this.decodeErrorPolicy = Preconditions.checkNotNull(decodeErrorPolicy);
 
     File trackerDirectory = new File(trackerDirPath);
 
@@ -432,7 +444,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
 
         ResettableInputStream in =
             new ResettableFileInputStream(nextFile, tracker,
-                ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset);
+                ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset,
+                decodeErrorPolicy);
         EventDeserializer deserializer = EventDeserializerFactory.getInstance
             (deserializerType, deserializerContext, in);
 
@@ -504,6 +517,9 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
         SpoolDirectorySourceConfigurationConstants.DEFAULT_DELETE_POLICY;
     private String inputCharset =
         SpoolDirectorySourceConfigurationConstants.DEFAULT_INPUT_CHARSET;
+    private DecodeErrorPolicy decodeErrorPolicy = DecodeErrorPolicy.valueOf(
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_DECODE_ERROR_POLICY
+            .toUpperCase());
 
     public Builder spoolDirectory(File directory) {
       this.spoolDirectory = directory;
@@ -555,10 +571,16 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
       return this;
     }
 
+    public Builder decodeErrorPolicy(DecodeErrorPolicy decodeErrorPolicy) {
+      this.decodeErrorPolicy = decodeErrorPolicy;
+      return this;
+    }
+
     public ReliableSpoolingFileEventReader build() throws IOException {
       return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix,
           ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
-          deserializerType, deserializerContext, deletePolicy, inputCharset);
+          deserializerType, deserializerContext, deletePolicy, inputCharset,
+          decodeErrorPolicy);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/b84d0161/flume-ng-core/src/main/java/org/apache/flume/serialization/DecodeErrorPolicy.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/DecodeErrorPolicy.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/DecodeErrorPolicy.java
new file mode 100644
index 0000000..a55b6b6
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/DecodeErrorPolicy.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.serialization;
+
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public enum DecodeErrorPolicy {
+  FAIL,
+  REPLACE,
+  IGNORE
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/b84d0161/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
index ecea5e2..587ab29 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
@@ -34,6 +34,7 @@ import java.nio.channels.FileChannel;
 import java.nio.charset.Charset;
 import java.nio.charset.CharsetDecoder;
 import java.nio.charset.CoderResult;
+import java.nio.charset.CodingErrorAction;
 
 /**
  * <p/>This class makes the following assumptions:
@@ -78,7 +79,7 @@ public class ResettableFileInputStream extends ResettableInputStream
    */
   public ResettableFileInputStream(File file, PositionTracker tracker)
       throws IOException {
-    this(file, tracker, DEFAULT_BUF_SIZE, Charsets.UTF_8);
+    this(file, tracker, DEFAULT_BUF_SIZE, Charsets.UTF_8, DecodeErrorPolicy.FAIL);
   }
 
   /**
@@ -98,7 +99,7 @@ public class ResettableFileInputStream extends ResettableInputStream
    * @throws FileNotFoundException
    */
   public ResettableFileInputStream(File file, PositionTracker tracker,
-                                   int bufSize, Charset charset)
+      int bufSize, Charset charset, DecodeErrorPolicy decodeErrorPolicy)
       throws IOException {
     this.file = file;
     this.tracker = tracker;
@@ -115,6 +116,24 @@ public class ResettableFileInputStream extends ResettableInputStream
     this.syncPosition = 0;
     this.maxCharWidth = (int)Math.ceil(charset.newEncoder().maxBytesPerChar());
 
+    CodingErrorAction errorAction;
+    switch (decodeErrorPolicy) {
+      case FAIL:
+        errorAction = CodingErrorAction.REPORT;
+        break;
+      case REPLACE:
+        errorAction = CodingErrorAction.REPLACE;
+        break;
+      case IGNORE:
+        errorAction = CodingErrorAction.IGNORE;
+        break;
+      default:
+        throw new IllegalArgumentException(
+            "Unexpected value for decode error policy: " + decodeErrorPolicy);
+    }
+    decoder.onMalformedInput(errorAction);
+    decoder.onUnmappableCharacter(errorAction);
+
     seek(tracker.getPosition());
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/b84d0161/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 957eb8b..72c4059 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -30,6 +30,7 @@ import org.apache.flume.*;
 import org.apache.flume.client.avro.ReliableSpoolingFileEventReader;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.serialization.DecodeErrorPolicy;
 import org.apache.flume.serialization.LineDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,6 +61,7 @@ Configurable, EventDrivenSource {
   private Context deserializerContext;
   private String deletePolicy;
   private String inputCharset;
+  private DecodeErrorPolicy decodeErrorPolicy;
   private volatile boolean hasFatalError = false;
 
   private SourceCounter sourceCounter;
@@ -86,6 +88,7 @@ Configurable, EventDrivenSource {
           .deserializerContext(deserializerContext)
           .deletePolicy(deletePolicy)
           .inputCharset(inputCharset)
+          .decodeErrorPolicy(decodeErrorPolicy)
           .build();
     } catch (IOException ioe) {
       throw new FlumeException("Error instantiating spooling event parser",
@@ -139,6 +142,9 @@ Configurable, EventDrivenSource {
     batchSize = context.getInteger(BATCH_SIZE,
         DEFAULT_BATCH_SIZE);
     inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET);
+    decodeErrorPolicy = DecodeErrorPolicy.valueOf(
+        context.getString(DECODE_ERROR_POLICY, DEFAULT_DECODE_ERROR_POLICY)
+        .toUpperCase());
 
     ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT);
     trackerDirPath = context.getString(TRACKER_DIR, DEFAULT_TRACKER_DIR);

http://git-wip-us.apache.org/repos/asf/flume/blob/b84d0161/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
index f3cc703..7bfb0ee 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
@@ -17,6 +17,8 @@
 
 package org.apache.flume.source;
 
+import org.apache.flume.serialization.DecodeErrorPolicy;
+
 public class SpoolDirectorySourceConfigurationConstants {
   /** Directory where files are deposited. */
   public static final String SPOOL_DIRECTORY = "spoolDir";
@@ -64,6 +66,12 @@ public class SpoolDirectorySourceConfigurationConstants {
   public static final String DELETE_POLICY = "deletePolicy";
   public static final String DEFAULT_DELETE_POLICY = "never";
 
+  /** Character set used when reading the input. */
   public static final String INPUT_CHARSET = "inputCharset";
   public static final String DEFAULT_INPUT_CHARSET = "UTF-8";
+
+  /** What to do when there is a character set decoding error. */
+  public static final String DECODE_ERROR_POLICY = "decodeErrorPolicy";
+  public static final String DEFAULT_DECODE_ERROR_POLICY =
+      DecodeErrorPolicy.FAIL.name();
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/b84d0161/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java
index 066765c..73df9c9 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java
@@ -32,11 +32,13 @@ import static org.junit.Assert.*;
 import static org.junit.Assert.assertEquals;
 
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.Charset;
+import java.nio.charset.MalformedInputException;
 import java.util.List;
 
 public class TestResettableFileInputStream {
@@ -108,6 +110,120 @@ public class TestResettableFileInputStream {
     in.close();
   }
 
+  @Test(expected = MalformedInputException.class)
+  public void testUtf8DecodeErrorHandlingFailMalformed() throws IOException {
+    ResettableInputStream in = initUtf8DecodeTest(DecodeErrorPolicy.FAIL);
+    while (in.readChar() != -1) {
+      // Do nothing... read the whole file and throw away the characters.
+    }
+    fail("Expected MalformedInputException!");
+  }
+
+
+  @Test
+  public void testUtf8DecodeErrorHandlingIgnore() throws IOException {
+    ResettableInputStream in = initUtf8DecodeTest(DecodeErrorPolicy.IGNORE);
+    int c;
+    StringBuilder sb = new StringBuilder();
+    while ((c = in.readChar()) != -1) {
+      sb.append((char)c);
+    }
+    assertEquals("Latin1: ()\nLong: ()\nNonUnicode: ()\n", sb.toString());
+  }
+
+  @Test
+  public void testUtf8DecodeErrorHandlingReplace() throws IOException {
+    ResettableInputStream in = initUtf8DecodeTest(DecodeErrorPolicy.REPLACE);
+    int c;
+    StringBuilder sb = new StringBuilder();
+    while ((c = in.readChar()) != -1) {
+      sb.append((char)c);
+    }
+    assertEquals("Latin1: (X)\nLong: (XXX)\nNonUnicode: (X)\n"
+        .replaceAll("X", "\ufffd"), sb.toString());
+  }
+
+  @Test(expected = MalformedInputException.class)
+  public void testLatin1DecodeErrorHandlingFailMalformed() throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    generateLatin1InvalidSequence(out);
+    Files.write(out.toByteArray(), file);
+    ResettableInputStream in = initInputStream(DecodeErrorPolicy.FAIL);
+    while (in.readChar() != -1) {
+      // Do nothing... read the whole file and throw away the characters.
+    }
+    fail("Expected MalformedInputException!");
+  }
+
+  @Test
+  public void testLatin1DecodeErrorHandlingReplace() throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    generateLatin1InvalidSequence(out);
+    Files.write(out.toByteArray(), file);
+    ResettableInputStream in = initInputStream(DecodeErrorPolicy.REPLACE);
+
+    int c;
+    StringBuilder sb = new StringBuilder();
+    while ((c = in.readChar()) != -1) {
+      sb.append((char)c);
+    }
+    assertEquals("Invalid: (X)\n".replaceAll("X", "\ufffd"), sb.toString());
+  }
+
+  private ResettableInputStream initUtf8DecodeTest(DecodeErrorPolicy policy)
+      throws IOException {
+    writeBigBadUtf8Sequence(file);
+    return initInputStream(policy);
+  }
+
+  private ResettableInputStream initInputStream(DecodeErrorPolicy policy)
+      throws IOException {
+    PositionTracker tracker = new DurablePositionTracker(meta, file.getPath());
+    ResettableInputStream in = new ResettableFileInputStream(file, tracker,
+        2048, Charsets.UTF_8, policy);
+    return in;
+  }
+
+  private void writeBigBadUtf8Sequence(File file) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    generateUtf8Latin1Sequence(out);
+    generateUtf8OverlyLongSequence(out);
+    generateUtf8NonUnicodeSequence(out);
+    Files.write(out.toByteArray(), file);
+  }
+
+  private void generateUtf8OverlyLongSequence(OutputStream out)
+      throws IOException {
+    out.write("Long: (".getBytes(Charsets.UTF_8));
+    // Overly-long slash character should not be accepted.
+    out.write(new byte[] { (byte)0xe0, (byte)0x80, (byte)0xaf });
+    out.write(")\n".getBytes(Charsets.UTF_8));
+  }
+
+  private void generateUtf8NonUnicodeSequence(OutputStream out)
+      throws IOException {
+    out.write("NonUnicode: (".getBytes(Charsets.UTF_8));
+    // This is a valid 5-octet sequence but is not Unicode
+    out.write(new byte[] { (byte)0xf8, (byte)0xa1, (byte)0xa1, (byte)0xa1,
+        (byte)0xa1 } );
+    out.write(")\n".getBytes(Charsets.UTF_8));
+  }
+
+  private void generateUtf8Latin1Sequence(OutputStream out) throws IOException {
+    out.write("Latin1: (".getBytes(Charsets.UTF_8));
+    // This is "e" with an accent in Latin-1
+    out.write(new byte[] { (byte)0xe9 } );
+    out.write(")\n".getBytes(Charsets.UTF_8));
+  }
+
+  private void generateLatin1InvalidSequence(OutputStream out)
+      throws IOException {
+    out.write("Invalid: (".getBytes(Charsets.UTF_8));
+    // 0x81 is not printable in Latin-1 and is malformed as a lone UTF-8 byte.
+    out.write(new byte[] { (byte)0x81 } );
+    out.write(")\n".getBytes(Charsets.UTF_8));
+  }
+
   /**
    * Ensure a reset() brings us back to the default mark (beginning of file)
    * @throws IOException
@@ -206,7 +322,7 @@ public class TestResettableFileInputStream {
 
     PositionTracker tracker = new DurablePositionTracker(meta, file.getPath());
     ResettableInputStream in = new ResettableFileInputStream(file, tracker,
-        10 * LINE_LEN, Charsets.UTF_8);
+        10 * LINE_LEN, Charsets.UTF_8, DecodeErrorPolicy.FAIL);
 
     String line = "";
     for (int i = 0; i < 9; i++) {

http://git-wip-us.apache.org/repos/asf/flume/blob/b84d0161/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 007436b..5a59b56 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -953,6 +953,11 @@ trackerDir            .flumespool     Directory to store metadata related to pro
                                       If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
 batchSize             100             Granularity at which to batch transfer to the channel
 inputCharset          UTF-8           Character set used by deserializers that treat the input file as text.
+decodeErrorPolicy     ``FAIL``        What to do when we see a non-decodable character in the input file.
+                                      ``FAIL``: Throw an exception and fail to parse the file.
+                                      ``REPLACE``: Replace the unparseable character with the "replacement character",
+                                      typically Unicode U+FFFD.
+                                      ``IGNORE``: Drop the unparseable character sequence.
 deserializer          ``LINE``        Specify the deserializer used to parse the file into events.
                                       Defaults to parsing each line as an event. The class specified must implement
                                       ``EventDeserializer.Builder``.