Posted to commits@flume.apache.org by mp...@apache.org on 2016/07/19 21:57:16 UTC

[1/3] flume git commit: FLUME-2810. Add static Schema URL to AvroEventSerializer configuration

Repository: flume
Updated Branches:
  refs/heads/trunk c7de4ba5c -> 9965dae7b


FLUME-2810. Add static Schema URL to AvroEventSerializer configuration

Currently the only way to pass a schema to the Avro event serializer is
via an event header. This introduces an option to specify the schema
location directly in the Flume configuration.

(Jeff Holoman via Mike Percy)
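As the updated user guide section below shows, wiring the new option into a
sink configuration is a one-line addition (a minimal sketch drawn from the
docs example in this commit; the agent, sink, and namenode names are
illustrative):

  a1.sinks.k1.type = hdfs
  a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
  a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc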


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/dbf2e989
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/dbf2e989
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/dbf2e989

Branch: refs/heads/trunk
Commit: dbf2e989744a6b2921076be017102f75323a69f4
Parents: c7de4ba
Author: Jeff Holoman <je...@gmail.com>
Authored: Tue Jul 19 12:29:08 2016 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Jul 19 14:43:33 2016 -0700

----------------------------------------------------------------------
 ...roEventSerializerConfigurationConstants.java |  5 ++
 flume-ng-doc/sphinx/FlumeUserGuide.rst          | 51 ++++++++++++++++++--
 .../flume/sink/hdfs/AvroEventSerializer.java    | 45 +++++++++++------
 .../sink/hdfs/TestAvroEventSerializer.java      | 47 ++++++++++++++----
 4 files changed, 117 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/dbf2e989/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventSerializerConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventSerializerConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventSerializerConfigurationConstants.java
index cce6716..470fcea 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventSerializerConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventSerializerConfigurationConstants.java
@@ -35,4 +35,9 @@ public class AvroEventSerializerConfigurationConstants {
   public static final String COMPRESSION_CODEC = "compressionCodec";
   public static final String DEFAULT_COMPRESSION_CODEC = "null"; // no codec
 
+  /**
+   * Avro static Schema URL
+   */
+  public static final String STATIC_SCHEMA_URL = "schemaURL";
+  public static final String DEFAULT_STATIC_SCHEMA_URL = null;
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/dbf2e989/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index f9ca1b2..3937514 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -3235,19 +3235,59 @@ Example for agent named a1:
   a1.sinks.k1.sink.serializer = text
   a1.sinks.k1.sink.serializer.appendNewline = false
 
+"Flume Event" Avro Event Serializer
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Alias: ``avro_event``.
+
+This serializer writes Flume events into an Avro container file. The schema used is the same schema used for
+Flume events in the Avro RPC mechanism.
+
+This serializer inherits from the ``AbstractAvroEventSerializer`` class.
+
+Configuration options are as follows:
+
+==========================  ================  ===========================================================================
+Property Name               Default           Description
+==========================  ================  ===========================================================================
+syncIntervalBytes           2048000           Avro sync interval, in approximate bytes.
+compressionCodec            null              Avro compression codec. For supported codecs, see Avro's CodecFactory docs.
+==========================  ================  ===========================================================================
+
+Example for agent named a1:
+
+.. code-block:: properties
+
+  a1.sinks.k1.type = hdfs
+  a1.sinks.k1.channel = c1
+  a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
+  a1.sinks.k1.serializer = avro_event
+  a1.sinks.k1.serializer.compressionCodec = snappy
+
 Avro Event Serializer
 ~~~~~~~~~~~~~~~~~~~~~
 
-Alias: ``avro_event``. This interceptor serializes Flume events into an Avro
-container file. The schema used is the same schema used for Flume events
-in the Avro RPC mechanism. This serializers inherits from the
-``AbstractAvroEventSerializer`` class. Configuration options are as follows:
+Alias: This serializer does not have an alias, and must be specified using its fully-qualified class name.
+
+This serializer writes Flume events into an Avro container file like the "Flume Event" Avro Event Serializer, but the
+record schema is configurable. The record schema may be specified either as a Flume configuration property or passed in an event header.
+
+To pass the record schema as part of the Flume configuration, use the property ``schemaURL`` as listed below.
+
+To pass the record schema in an event header, specify either the event header ``flume.avro.schema.literal``
+containing a JSON-format representation of the schema or ``flume.avro.schema.url`` with a URL where
+the schema may be found (``hdfs:/...`` URIs are supported).
+
+This serializer implements the ``EventSerializer`` interface.
+
+Configuration options are as follows:
 
 ==========================  ================  ===========================================================================
 Property Name               Default           Description
 ==========================  ================  ===========================================================================
 syncIntervalBytes           2048000           Avro sync interval, in approximate bytes.
 compressionCodec            null              Avro compression codec. For supported codecs, see Avro's CodecFactory docs.
+schemaURL                   null              Avro schema URL. Schemas specified in the header override this option.
 ==========================  ================  ===========================================================================
 
 Example for agent named a1:
@@ -3257,8 +3297,9 @@ Example for agent named a1:
   a1.sinks.k1.type = hdfs
   a1.sinks.k1.channel = c1
   a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
-  a1.sinks.k1.serializer = avro_event
+  a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
   a1.sinks.k1.serializer.compressionCodec = snappy
+  a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc
 
 
 Flume Interceptors
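For the header-based route described in the docs above, an upstream source can
stamp the schema location onto every event using Flume's static interceptor. A
minimal sketch, assuming all events from the source share one schema (the
source name r1 and the namenode host are illustrative):

  a1.sources.r1.interceptors = i1
  a1.sources.r1.interceptors.i1.type = static
  a1.sources.r1.interceptors.i1.key = flume.avro.schema.url
  a1.sources.r1.interceptors.i1.value = hdfs://namenode/path/to/schema.avsc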

http://git-wip-us.apache.org/repos/asf/flume/blob/dbf2e989/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java
index fea6218..3231742 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java
@@ -18,14 +18,6 @@
  */
 package org.apache.flume.sink.hdfs;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
@@ -44,7 +36,21 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.*;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.COMPRESSION_CODEC;
+import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_COMPRESSION_CODEC;
+import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_STATIC_SCHEMA_URL;
+import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_SYNC_INTERVAL_BYTES;
+import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.STATIC_SCHEMA_URL;
+import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.SYNC_INTERVAL_BYTES;
 
 /**
  * <p>
@@ -76,6 +82,7 @@ public class AvroEventSerializer implements EventSerializer, Configurable {
   private int syncIntervalBytes;
   private String compressionCodec;
   private Map<String, Schema> schemaCache = new HashMap<String, Schema>();
+  private String staticSchemaURL;
 
   private AvroEventSerializer(OutputStream out) {
     this.out = out;
@@ -87,6 +94,7 @@ public class AvroEventSerializer implements EventSerializer, Configurable {
         context.getInteger(SYNC_INTERVAL_BYTES, DEFAULT_SYNC_INTERVAL_BYTES);
     compressionCodec =
         context.getString(COMPRESSION_CODEC, DEFAULT_COMPRESSION_CODEC);
+    staticSchemaURL = context.getString(STATIC_SCHEMA_URL, DEFAULT_STATIC_SCHEMA_URL);
   }
 
   @Override
@@ -111,19 +119,24 @@ public class AvroEventSerializer implements EventSerializer, Configurable {
   private void initialize(Event event) throws IOException {
     Schema schema = null;
     String schemaUrl = event.getHeaders().get(AVRO_SCHEMA_URL_HEADER);
-    if (schemaUrl != null) {
+    String schemaString = event.getHeaders().get(AVRO_SCHEMA_LITERAL_HEADER);
+
+    if (schemaUrl != null) { // prefer the URL header if present
       schema = schemaCache.get(schemaUrl);
       if (schema == null) {
         schema = loadFromUrl(schemaUrl);
         schemaCache.put(schemaUrl, schema);
       }
-    }
-    if (schema == null) {
-      String schemaString = event.getHeaders().get(AVRO_SCHEMA_LITERAL_HEADER);
-      if (schemaString == null) {
-        throw new FlumeException("Could not find schema for event " + event);
-      }
+    } else if (schemaString != null) { // otherwise fall back to the literal header
       schema = new Schema.Parser().parse(schemaString);
+    } else if (staticSchemaURL != null) { // otherwise fall back to the configured static URL
+      schema = schemaCache.get(staticSchemaURL);
+      if (schema == null) {
+        schema = loadFromUrl(staticSchemaURL);
+        schemaCache.put(staticSchemaURL, schema);
+      }
+    } else { // no schema available from any source, so give up
+      throw new FlumeException("Could not find schema for event " + event);
     }
 
     writer = new GenericDatumWriter<Object>(schema);
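The lookup order implemented above is: URL header first, then literal header,
then the configured static URL, so a client can still override the static
schema per event. A minimal client-side sketch using EventBuilder, as the test
below does (the empty body and the schema JSON are illustrative placeholders):

  import org.apache.flume.Event;
  import org.apache.flume.event.EventBuilder;

  public class SchemaHeaderExample {
    public static void main(String[] args) {
      // The body would normally be an Avro-serialized datum matching the schema.
      Event event = EventBuilder.withBody(new byte[0]);
      // A literal schema in the header overrides the configured schemaURL...
      event.getHeaders().put("flume.avro.schema.literal",
          "{\"type\":\"record\",\"name\":\"Msg\",\"fields\":"
          + "[{\"name\":\"message\",\"type\":\"string\"}]}");
      // ...and a URL header takes precedence over even the literal header:
      // event.getHeaders().put("flume.avro.schema.url", "hdfs://namenode/path/to/schema.avsc");
    }
  }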

http://git-wip-us.apache.org/repos/asf/flume/blob/dbf2e989/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java
index 38af74d..6b38da2 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java
@@ -39,10 +39,12 @@ import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.serialization.AvroEventSerializerConfigurationConstants;
 import org.apache.flume.serialization.EventSerializer;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.After;
 
 public class TestAvroEventSerializer {
 
@@ -53,39 +55,55 @@ public class TestAvroEventSerializer {
     file = File.createTempFile(getClass().getSimpleName(), "");
   }
 
+  @After
+  public void tearDown() throws Exception {
+    file.delete();
+  }
+
   @Test
   public void testNoCompression() throws IOException {
-    createAvroFile(file, null, false);
+    createAvroFile(file, null, false, false);
     validateAvroFile(file);
   }
 
   @Test
   public void testNullCompression() throws IOException {
-    createAvroFile(file, "null", false);
+    createAvroFile(file, "null", false, false);
     validateAvroFile(file);
   }
 
   @Test
   public void testDeflateCompression() throws IOException {
-    createAvroFile(file, "deflate", false);
+    createAvroFile(file, "deflate", false, false);
     validateAvroFile(file);
   }
 
   @Test
   public void testSnappyCompression() throws IOException {
-    createAvroFile(file, "snappy", false);
+    createAvroFile(file, "snappy", false, false);
     validateAvroFile(file);
   }
 
   @Test
   public void testSchemaUrl() throws IOException {
-    createAvroFile(file, null, true);
+    createAvroFile(file, null, true, false);
     validateAvroFile(file);
   }
 
-  public void createAvroFile(File file, String codec, boolean useSchemaUrl) throws
-      IOException {
+  @Test
+  public void testStaticSchemaUrl() throws IOException {
+    createAvroFile(file, null, false, true);
+    validateAvroFile(file);
+  }
 
+  @Test
+  public void testBothUrls() throws IOException {
+    createAvroFile(file, null, true, true);
+    validateAvroFile(file);
+  }
+
+  public void createAvroFile(File file, String codec, boolean useSchemaUrl,
+                             boolean useStaticSchemaUrl) throws IOException {
     // serialize a few events using the reflection-based avro serializer
     OutputStream out = new FileOutputStream(file);
 
@@ -100,11 +118,16 @@ public class TestAvroEventSerializer {
     }));
     GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
     File schemaFile = null;
-    if (useSchemaUrl) {
+    if (useSchemaUrl || useStaticSchemaUrl) {
       schemaFile = File.createTempFile(getClass().getSimpleName(), ".avsc");
       Files.write(schema.toString(), schemaFile, Charsets.UTF_8);
     }
 
+    if (useStaticSchemaUrl) {
+      ctx.put(AvroEventSerializerConfigurationConstants.STATIC_SCHEMA_URL,
+              schemaFile.toURI().toURL().toExternalForm());
+    }
+
     EventSerializer.Builder builder = new AvroEventSerializer.Builder();
     EventSerializer serializer = builder.build(ctx, out);
 
@@ -112,10 +135,10 @@ public class TestAvroEventSerializer {
     for (int i = 0; i < 3; i++) {
       GenericRecord record = recordBuilder.set("message", "Hello " + i).build();
       Event event = EventBuilder.withBody(serializeAvro(record, schema));
-      if (schemaFile == null) {
+      if (schemaFile == null && !useSchemaUrl) {
         event.getHeaders().put(AvroEventSerializer.AVRO_SCHEMA_LITERAL_HEADER,
             schema.toString());
-      } else {
+      } else if (useSchemaUrl) {
         event.getHeaders().put(AvroEventSerializer.AVRO_SCHEMA_URL_HEADER,
             schemaFile.toURI().toURL().toExternalForm());
       }
@@ -125,6 +148,10 @@ public class TestAvroEventSerializer {
     serializer.beforeClose();
     out.flush();
     out.close();
+    if (schemaFile != null) {
+      schemaFile.delete();
+    }
+
   }
 
   private byte[] serializeAvro(Object datum, Schema schema) throws IOException {


[2/3] flume git commit: Fix broken link in README

Posted by mp...@apache.org.
Fix broken link in README


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/0259d302
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/0259d302
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/0259d302

Branch: refs/heads/trunk
Commit: 0259d302730f94b99c1a84f8311d1abc0f635eb3
Parents: dbf2e98
Author: Mike Percy <mp...@apache.org>
Authored: Tue Jul 19 12:42:10 2016 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Jul 19 14:43:40 2016 -0700

----------------------------------------------------------------------
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/0259d302/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 7198a65..9ebb2a3 100644
--- a/README.md
+++ b/README.md
@@ -39,7 +39,7 @@ In source form, it can be found in the flume-ng-doc directory.
 
 The Flume 1.x guide and FAQ are available here:
 
-* https://cwiki.apache.org/FLUME/flume-ng.html
+* https://cwiki.apache.org/FLUME
 * https://cwiki.apache.org/confluence/display/FLUME/Getting+Started
 
 ## Contact us!


[3/3] flume git commit: Fix sphinx layout errors

Posted by mp...@apache.org.
Fix sphinx layout errors

Minor syntax error fixes


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/9965dae7
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/9965dae7
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/9965dae7

Branch: refs/heads/trunk
Commit: 9965dae7bd384abd2e25fd1756dbe516e8acdb9b
Parents: 0259d30
Author: Mike Percy <mp...@apache.org>
Authored: Tue Jul 19 13:33:58 2016 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Jul 19 14:43:40 2016 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst | 92 ++++++++++++++---------------
 1 file changed, 46 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/9965dae7/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 3937514..d8bfebf 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -976,53 +976,53 @@ Despite the reliability guarantees of this source, there are still
 cases in which events may be duplicated if certain downstream failures occur.
 This is consistent with the guarantees offered by other Flume components.
 
-======================  ==============  ==========================================================
-Property Name            Default         Description
-======================  ==============  ==========================================================
-**channels**             --
-**type**                 --              The component type name, needs to be ``spooldir``.
-**spoolDir**             --              The directory from which to read files from.
-fileSuffix               .COMPLETED      Suffix to append to completely ingested files
-deletePolicy             never           When to delete completed files: ``never`` or ``immediate``
-fileHeader               false           Whether to add a header storing the absolute path filename.
-fileHeaderKey            file            Header key to use when appending absolute path filename to event header.
-basenameHeader           false           Whether to add a header storing the basename of the file.
-basenameHeaderKey        basename        Header Key to use when appending  basename of file to event header.
-ignorePattern            ^$              Regular expression specifying which files to ignore (skip)
-trackerDir               .flumespool     Directory to store metadata related to processing of files.
-                                         If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
-consumeOrder             oldest          In which order files in the spooling directory will be consumed ``oldest``,
-                                         ``youngest`` and ``random``. In case of ``oldest`` and ``youngest``, the last modified
-                                         time of the files will be used to compare the files. In case of a tie, the file
-                                         with smallest lexicographical order will be consumed first. In case of ``random`` any
-                                         file will be picked randomly. When using ``oldest`` and ``youngest`` the whole
-                                         directory will be scanned to pick the oldest/youngest file, which might be slow if there
-                                         are a large number of files, while using ``random`` may cause old files to be consumed
-                                         very late if new files keep coming in the spooling directory.
-pollDelay                500             Delay (in milliseconds) used when polling for new files.
-recursiveDirectorySearch false           Whether to monitor sub directories for new files to read.
-maxBackoff               4000            The maximum time (in millis) to wait between consecutive attempts to
-                                         write to the channel(s) if the channel is full. The source will start at
-                                         a low backoff and increase it exponentially each time the channel throws a
-                                         ChannelException, upto the value specified by this parameter.
-batchSize                100             Granularity at which to batch transfer to the channel
-inputCharset             UTF-8           Character set used by deserializers that treat the input file as text.
-decodeErrorPolicy        ``FAIL``        What to do when we see a non-decodable character in the input file.
-                                         ``FAIL``: Throw an exception and fail to parse the file.
-                                         ``REPLACE``: Replace the unparseable character with the "replacement character" char,
-                                         typically Unicode U+FFFD.
-                                         ``IGNORE``: Drop the unparseable character sequence.
-deserializer             ``LINE``        Specify the deserializer used to parse the file into events.
-                                         Defaults to parsing each line as an event. The class specified must implement
-                                         ``EventDeserializer.Builder``.
-deserializer.*                           Varies per event deserializer.
-bufferMaxLines           --              (Obselete) This option is now ignored.
-bufferMaxLineLength      5000            (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.
-selector.type            replicating     replicating or multiplexing
-selector.*                               Depends on the selector.type value
-interceptors             --              Space-separated list of interceptors
+========================  ==============  ==========================================================
+Property Name             Default         Description
+========================  ==============  ==========================================================
+**channels**              --
+**type**                  --              The component type name, needs to be ``spooldir``.
+**spoolDir**              --              The directory from which to read files.
+fileSuffix                .COMPLETED      Suffix to append to completely ingested files
+deletePolicy              never           When to delete completed files: ``never`` or ``immediate``
+fileHeader                false           Whether to add a header storing the absolute path filename.
+fileHeaderKey             file            Header key to use when appending absolute path filename to event header.
+basenameHeader            false           Whether to add a header storing the basename of the file.
+basenameHeaderKey         basename        Header key to use when appending the basename of the file to the event header.
+ignorePattern             ^$              Regular expression specifying which files to ignore (skip)
+trackerDir                .flumespool     Directory to store metadata related to processing of files.
+                                          If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
+consumeOrder              oldest          In which order files in the spooling directory will be consumed: ``oldest``,
+                                          ``youngest`` and ``random``. In case of ``oldest`` and ``youngest``, the last modified
+                                          time of the files will be used to compare the files. In case of a tie, the file
+                                          with smallest lexicographical order will be consumed first. In case of ``random`` any
+                                          file will be picked randomly. When using ``oldest`` and ``youngest`` the whole
+                                          directory will be scanned to pick the oldest/youngest file, which might be slow if there
+                                          are a large number of files, while using ``random`` may cause old files to be consumed
+                                          very late if new files keep coming in the spooling directory.
+pollDelay                 500             Delay (in milliseconds) used when polling for new files.
+recursiveDirectorySearch  false           Whether to monitor subdirectories for new files to read.
+maxBackoff                4000            The maximum time (in millis) to wait between consecutive attempts to
+                                          write to the channel(s) if the channel is full. The source will start at
+                                          a low backoff and increase it exponentially each time the channel throws a
+                                          ChannelException, up to the value specified by this parameter.
+batchSize                 100             Granularity at which to batch transfer to the channel
+inputCharset              UTF-8           Character set used by deserializers that treat the input file as text.
+decodeErrorPolicy         ``FAIL``        What to do when we see a non-decodable character in the input file.
+                                          ``FAIL``: Throw an exception and fail to parse the file.
+                                          ``REPLACE``: Replace the unparseable character with the "replacement character" char,
+                                          typically Unicode U+FFFD.
+                                          ``IGNORE``: Drop the unparseable character sequence.
+deserializer              ``LINE``        Specify the deserializer used to parse the file into events.
+                                          Defaults to parsing each line as an event. The class specified must implement
+                                          ``EventDeserializer.Builder``.
+deserializer.*                            Varies per event deserializer.
+bufferMaxLines            --              (Obsolete) This option is now ignored.
+bufferMaxLineLength       5000            (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.
+selector.type             replicating     replicating or multiplexing
+selector.*                                Depends on the selector.type value
+interceptors              --              Space-separated list of interceptors
 interceptors.*
-=======================  ==============  ==========================================================
+========================  ==============  ==========================================================
 
 Example for an agent named agent-1: