Posted to commits@flume.apache.org by mp...@apache.org on 2016/07/19 21:57:16 UTC
[1/3] flume git commit: FLUME-2810. Add static Schema URL to AvroEventSerializer configuration
Repository: flume
Updated Branches:
refs/heads/trunk c7de4ba5c -> 9965dae7b
FLUME-2810. Add static Schema URL to AvroEventSerializer configuration
Currently the only way to pass a schema to the avro event serializer is
via header. This introduces an option to specify the location directly
in the Flume configuration.
(Jeff Holoman via Mike Percy)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/dbf2e989
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/dbf2e989
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/dbf2e989
Branch: refs/heads/trunk
Commit: dbf2e989744a6b2921076be017102f75323a69f4
Parents: c7de4ba
Author: Jeff Holoman <je...@gmail.com>
Authored: Tue Jul 19 12:29:08 2016 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Jul 19 14:43:33 2016 -0700
----------------------------------------------------------------------
...roEventSerializerConfigurationConstants.java | 5 ++
flume-ng-doc/sphinx/FlumeUserGuide.rst | 51 ++++++++++++++++++--
.../flume/sink/hdfs/AvroEventSerializer.java | 45 +++++++++++------
.../sink/hdfs/TestAvroEventSerializer.java | 47 ++++++++++++++----
4 files changed, 117 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/dbf2e989/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventSerializerConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventSerializerConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventSerializerConfigurationConstants.java
index cce6716..470fcea 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventSerializerConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventSerializerConfigurationConstants.java
@@ -35,4 +35,9 @@ public class AvroEventSerializerConfigurationConstants {
public static final String COMPRESSION_CODEC = "compressionCodec";
public static final String DEFAULT_COMPRESSION_CODEC = "null"; // no codec
+ /**
+ * Avro static Schema URL
+ */
+ public static final String STATIC_SCHEMA_URL = "schemaURL";
+ public static final String DEFAULT_STATIC_SCHEMA_URL = null;
}
http://git-wip-us.apache.org/repos/asf/flume/blob/dbf2e989/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index f9ca1b2..3937514 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -3235,19 +3235,59 @@ Example for agent named a1:
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false
+"Flume Event" Avro Event Serializer
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Alias: ``avro_event``.
+
+This serializer writes Flume events into an Avro container file. The schema used is the same schema used for
+Flume events in the Avro RPC mechanism.
+
+This serializer inherits from the ``AbstractAvroEventSerializer`` class.
+
+Configuration options are as follows:
+
+========================== ================ ===========================================================================
+Property Name Default Description
+========================== ================ ===========================================================================
+syncIntervalBytes 2048000 Avro sync interval, in approximate bytes.
+compressionCodec null Avro compression codec. For supported codecs, see Avro's CodecFactory docs.
+========================== ================ ===========================================================================
+
+Example for agent named a1:
+
+.. code-block:: properties
+
+ a1.sinks.k1.type = hdfs
+ a1.sinks.k1.channel = c1
+ a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
+ a1.sinks.k1.serializer = avro_event
+ a1.sinks.k1.serializer.compressionCodec = snappy
+
Avro Event Serializer
~~~~~~~~~~~~~~~~~~~~~
-Alias: ``avro_event``. This interceptor serializes Flume events into an Avro
-container file. The schema used is the same schema used for Flume events
-in the Avro RPC mechanism. This serializers inherits from the
-``AbstractAvroEventSerializer`` class. Configuration options are as follows:
+Alias: This serializer does not have an alias, and must be specified using the fully-qualified class name.
+
+This serializes Flume events into an Avro container file like the "Flume Event" Avro Event Serializer, however the
+record schema is configurable. The record schema may be specified either as a Flume configuration property or passed in an event header.
+
+To pass the record schema as part of the Flume configuration, use the property ``schemaURL`` as listed below.
+
+To pass the record schema in an event header, specify either the event header ``flume.avro.schema.literal``
+containing a JSON-format representation of the schema or ``flume.avro.schema.url`` with a URL where
+the schema may be found (``hdfs:/...`` URIs are supported).
+
+This serializer inherits from the ``AbstractAvroEventSerializer`` class.
+
+Configuration options are as follows:
========================== ================ ===========================================================================
Property Name Default Description
========================== ================ ===========================================================================
syncIntervalBytes 2048000 Avro sync interval, in approximate bytes.
compressionCodec null Avro compression codec. For supported codecs, see Avro's CodecFactory docs.
+schemaURL null Avro schema URL. Schemas specified in the header override this option.
========================== ================ ===========================================================================
Example for agent named a1:
@@ -3257,8 +3297,9 @@ Example for agent named a1:
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
- a1.sinks.k1.serializer = avro_event
+ a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.compressionCodec = snappy
+ a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc
Flume Interceptors
http://git-wip-us.apache.org/repos/asf/flume/blob/dbf2e989/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java
index fea6218..3231742 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java
@@ -18,14 +18,6 @@
*/
package org.apache.flume.sink.hdfs;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
@@ -44,7 +36,21 @@ import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.*;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.COMPRESSION_CODEC;
+import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_COMPRESSION_CODEC;
+import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_STATIC_SCHEMA_URL;
+import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_SYNC_INTERVAL_BYTES;
+import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.STATIC_SCHEMA_URL;
+import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.SYNC_INTERVAL_BYTES;
/**
* <p>
@@ -76,6 +82,7 @@ public class AvroEventSerializer implements EventSerializer, Configurable {
private int syncIntervalBytes;
private String compressionCodec;
private Map<String, Schema> schemaCache = new HashMap<String, Schema>();
+ private String staticSchemaURL;
private AvroEventSerializer(OutputStream out) {
this.out = out;
@@ -87,6 +94,7 @@ public class AvroEventSerializer implements EventSerializer, Configurable {
context.getInteger(SYNC_INTERVAL_BYTES, DEFAULT_SYNC_INTERVAL_BYTES);
compressionCodec =
context.getString(COMPRESSION_CODEC, DEFAULT_COMPRESSION_CODEC);
+ staticSchemaURL = context.getString(STATIC_SCHEMA_URL, DEFAULT_STATIC_SCHEMA_URL);
}
@Override
@@ -111,19 +119,24 @@ public class AvroEventSerializer implements EventSerializer, Configurable {
private void initialize(Event event) throws IOException {
Schema schema = null;
String schemaUrl = event.getHeaders().get(AVRO_SCHEMA_URL_HEADER);
- if (schemaUrl != null) {
+ String schemaString = event.getHeaders().get(AVRO_SCHEMA_LITERAL_HEADER);
+
+ if (schemaUrl != null) { // if URL_HEADER is there then use it
schema = schemaCache.get(schemaUrl);
if (schema == null) {
schema = loadFromUrl(schemaUrl);
schemaCache.put(schemaUrl, schema);
}
- }
- if (schema == null) {
- String schemaString = event.getHeaders().get(AVRO_SCHEMA_LITERAL_HEADER);
- if (schemaString == null) {
- throw new FlumeException("Could not find schema for event " + event);
- }
+ } else if (schemaString != null) { // fallback to LITERAL_HEADER if it was there
schema = new Schema.Parser().parse(schemaString);
+ } else if (staticSchemaURL != null) { // fallback to static url if it was there
+ schema = schemaCache.get(staticSchemaURL);
+ if (schema == null) {
+ schema = loadFromUrl(staticSchemaURL);
+ schemaCache.put(staticSchemaURL, schema);
+ }
+ } else { // no other options so giving up
+ throw new FlumeException("Could not find schema for event " + event);
}
writer = new GenericDatumWriter<Object>(schema);
http://git-wip-us.apache.org/repos/asf/flume/blob/dbf2e989/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java
index 38af74d..6b38da2 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java
@@ -39,10 +39,12 @@ import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
+import org.apache.flume.serialization.AvroEventSerializerConfigurationConstants;
import org.apache.flume.serialization.EventSerializer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.After;
public class TestAvroEventSerializer {
@@ -53,39 +55,55 @@ public class TestAvroEventSerializer {
file = File.createTempFile(getClass().getSimpleName(), "");
}
+ @After
+ public void tearDown() throws Exception {
+ file.delete();
+ }
+
@Test
public void testNoCompression() throws IOException {
- createAvroFile(file, null, false);
+ createAvroFile(file, null, false, false);
validateAvroFile(file);
}
@Test
public void testNullCompression() throws IOException {
- createAvroFile(file, "null", false);
+ createAvroFile(file, "null", false, false);
validateAvroFile(file);
}
@Test
public void testDeflateCompression() throws IOException {
- createAvroFile(file, "deflate", false);
+ createAvroFile(file, "deflate", false, false);
validateAvroFile(file);
}
@Test
public void testSnappyCompression() throws IOException {
- createAvroFile(file, "snappy", false);
+ createAvroFile(file, "snappy", false, false);
validateAvroFile(file);
}
@Test
public void testSchemaUrl() throws IOException {
- createAvroFile(file, null, true);
+ createAvroFile(file, null, true, false);
validateAvroFile(file);
}
- public void createAvroFile(File file, String codec, boolean useSchemaUrl) throws
- IOException {
+ @Test
+ public void testStaticSchemaUrl() throws IOException {
+ createAvroFile(file,null,false, true);
+ validateAvroFile(file);
+ }
+ @Test
+ public void testBothUrls() throws IOException {
+ createAvroFile(file,null,true,true);
+ validateAvroFile(file);
+ }
+
+ public void createAvroFile(File file, String codec, boolean useSchemaUrl,
+ boolean useStaticSchemaUrl) throws IOException {
// serialize a few events using the reflection-based avro serializer
OutputStream out = new FileOutputStream(file);
@@ -100,11 +118,16 @@ public class TestAvroEventSerializer {
}));
GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
File schemaFile = null;
- if (useSchemaUrl) {
+ if (useSchemaUrl || useStaticSchemaUrl) {
schemaFile = File.createTempFile(getClass().getSimpleName(), ".avsc");
Files.write(schema.toString(), schemaFile, Charsets.UTF_8);
}
+ if (useStaticSchemaUrl) {
+ ctx.put(AvroEventSerializerConfigurationConstants.STATIC_SCHEMA_URL,
+ schemaFile.toURI().toURL().toExternalForm());
+ }
+
EventSerializer.Builder builder = new AvroEventSerializer.Builder();
EventSerializer serializer = builder.build(ctx, out);
@@ -112,10 +135,10 @@ public class TestAvroEventSerializer {
for (int i = 0; i < 3; i++) {
GenericRecord record = recordBuilder.set("message", "Hello " + i).build();
Event event = EventBuilder.withBody(serializeAvro(record, schema));
- if (schemaFile == null) {
+ if (schemaFile == null && !useSchemaUrl) {
event.getHeaders().put(AvroEventSerializer.AVRO_SCHEMA_LITERAL_HEADER,
schema.toString());
- } else {
+ } else if (useSchemaUrl) {
event.getHeaders().put(AvroEventSerializer.AVRO_SCHEMA_URL_HEADER,
schemaFile.toURI().toURL().toExternalForm());
}
@@ -125,6 +148,10 @@ public class TestAvroEventSerializer {
serializer.beforeClose();
out.flush();
out.close();
+ if (schemaFile != null ) {
+ schemaFile.delete();
+ }
+
}
private byte[] serializeAvro(Object datum, Schema schema) throws IOException {
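The schema lookup order that the FLUME-2810 change to initialize() sets up (URL header first, then literal header, then the static schemaURL property, otherwise an error) can be illustrated with a small stand-alone sketch. This is not the Flume code: the class SchemaSourceResolver, the Source enum and the use of IllegalStateException are hypothetical names chosen for illustration, and no schema is actually loaded or parsed. Only the two header names and the precedence are taken from the commit above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Flume: it only reports WHICH schema source
// would win for a given event, following the if/else chain in the diff above.
final class SchemaSourceResolver {

  // Header names as documented in the FlumeUserGuide.rst hunk of this commit.
  static final String URL_HEADER_KEY = "flume.avro.schema.url";
  static final String LITERAL_HEADER_KEY = "flume.avro.schema.literal";

  // Illustrative enum (an assumption of this sketch).
  enum Source { HEADER_URL, HEADER_LITERAL, STATIC_URL }

  static Source resolve(Map<String, String> headers, String staticSchemaUrl) {
    if (headers.get(URL_HEADER_KEY) != null) {
      return Source.HEADER_URL;        // a URL in the event header wins
    } else if (headers.get(LITERAL_HEADER_KEY) != null) {
      return Source.HEADER_LITERAL;    // then a literal schema in the header
    } else if (staticSchemaUrl != null) {
      return Source.STATIC_URL;        // then the configured schemaURL
    }
    // The real serializer throws FlumeException here; a JDK exception keeps
    // this sketch free of external dependencies.
    throw new IllegalStateException("Could not find schema for event");
  }

  public static void main(String[] args) {
    Map<String, String> headers = new HashMap<>();
    String staticUrl = "hdfs://namenode/path/to/schema.avsc";

    System.out.println(resolve(headers, staticUrl));  // prints STATIC_URL

    headers.put(URL_HEADER_KEY, "file:/tmp/other.avsc");
    System.out.println(resolve(headers, staticUrl));  // prints HEADER_URL
  }
}
```

The second call corresponds to the testBothUrls case in the test diff: when both a header URL and a static URL are present, the header takes precedence.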
[2/3] flume git commit: Fix broken link in README
Posted by mp...@apache.org.
Fix broken link in README
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/0259d302
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/0259d302
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/0259d302
Branch: refs/heads/trunk
Commit: 0259d302730f94b99c1a84f8311d1abc0f635eb3
Parents: dbf2e98
Author: Mike Percy <mp...@apache.org>
Authored: Tue Jul 19 12:42:10 2016 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Jul 19 14:43:40 2016 -0700
----------------------------------------------------------------------
README.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/0259d302/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 7198a65..9ebb2a3 100644
--- a/README.md
+++ b/README.md
@@ -39,7 +39,7 @@ In source form, it can be found in the flume-ng-doc directory.
The Flume 1.x guide and FAQ are available here:
-* https://cwiki.apache.org/FLUME/flume-ng.html
+* https://cwiki.apache.org/FLUME
* https://cwiki.apache.org/confluence/display/FLUME/Getting+Started
## Contact us!
[3/3] flume git commit: Fix sphinx layout errors
Posted by mp...@apache.org.
Fix sphinx layout errors
Minor syntax error fixes
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/9965dae7
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/9965dae7
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/9965dae7
Branch: refs/heads/trunk
Commit: 9965dae7bd384abd2e25fd1756dbe516e8acdb9b
Parents: 0259d30
Author: Mike Percy <mp...@apache.org>
Authored: Tue Jul 19 13:33:58 2016 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Jul 19 14:43:40 2016 -0700
----------------------------------------------------------------------
flume-ng-doc/sphinx/FlumeUserGuide.rst | 92 ++++++++++++++---------------
1 file changed, 46 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/9965dae7/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 3937514..d8bfebf 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -976,53 +976,53 @@ Despite the reliability guarantees of this source, there are still
cases in which events may be duplicated if certain downstream failures occur.
This is consistent with the guarantees offered by other Flume components.
-====================== ============== ==========================================================
-Property Name Default Description
-====================== ============== ==========================================================
-**channels** --
-**type** -- The component type name, needs to be ``spooldir``.
-**spoolDir** -- The directory from which to read files from.
-fileSuffix .COMPLETED Suffix to append to completely ingested files
-deletePolicy never When to delete completed files: ``never`` or ``immediate``
-fileHeader false Whether to add a header storing the absolute path filename.
-fileHeaderKey file Header key to use when appending absolute path filename to event header.
-basenameHeader false Whether to add a header storing the basename of the file.
-basenameHeaderKey basename Header Key to use when appending basename of file to event header.
-ignorePattern ^$ Regular expression specifying which files to ignore (skip)
-trackerDir .flumespool Directory to store metadata related to processing of files.
- If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
-consumeOrder oldest In which order files in the spooling directory will be consumed ``oldest``,
- ``youngest`` and ``random``. In case of ``oldest`` and ``youngest``, the last modified
- time of the files will be used to compare the files. In case of a tie, the file
- with smallest lexicographical order will be consumed first. In case of ``random`` any
- file will be picked randomly. When using ``oldest`` and ``youngest`` the whole
- directory will be scanned to pick the oldest/youngest file, which might be slow if there
- are a large number of files, while using ``random`` may cause old files to be consumed
- very late if new files keep coming in the spooling directory.
-pollDelay 500 Delay (in milliseconds) used when polling for new files.
-recursiveDirectorySearch false Whether to monitor sub directories for new files to read.
-maxBackoff 4000 The maximum time (in millis) to wait between consecutive attempts to
- write to the channel(s) if the channel is full. The source will start at
- a low backoff and increase it exponentially each time the channel throws a
- ChannelException, upto the value specified by this parameter.
-batchSize 100 Granularity at which to batch transfer to the channel
-inputCharset UTF-8 Character set used by deserializers that treat the input file as text.
-decodeErrorPolicy ``FAIL`` What to do when we see a non-decodable character in the input file.
- ``FAIL``: Throw an exception and fail to parse the file.
- ``REPLACE``: Replace the unparseable character with the "replacement character" char,
- typically Unicode U+FFFD.
- ``IGNORE``: Drop the unparseable character sequence.
-deserializer ``LINE`` Specify the deserializer used to parse the file into events.
- Defaults to parsing each line as an event. The class specified must implement
- ``EventDeserializer.Builder``.
-deserializer.* Varies per event deserializer.
-bufferMaxLines -- (Obselete) This option is now ignored.
-bufferMaxLineLength 5000 (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.
-selector.type replicating replicating or multiplexing
-selector.* Depends on the selector.type value
-interceptors -- Space-separated list of interceptors
+======================== ============== ==========================================================
+Property Name Default Description
+======================== ============== ==========================================================
+**channels** --
+**type** -- The component type name, needs to be ``spooldir``.
+**spoolDir** -- The directory from which to read files from.
+fileSuffix .COMPLETED Suffix to append to completely ingested files
+deletePolicy never When to delete completed files: ``never`` or ``immediate``
+fileHeader false Whether to add a header storing the absolute path filename.
+fileHeaderKey file Header key to use when appending absolute path filename to event header.
+basenameHeader false Whether to add a header storing the basename of the file.
+basenameHeaderKey basename Header Key to use when appending basename of file to event header.
+ignorePattern ^$ Regular expression specifying which files to ignore (skip)
+trackerDir .flumespool Directory to store metadata related to processing of files.
+ If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
+consumeOrder oldest In which order files in the spooling directory will be consumed ``oldest``,
+ ``youngest`` and ``random``. In case of ``oldest`` and ``youngest``, the last modified
+ time of the files will be used to compare the files. In case of a tie, the file
+ with smallest lexicographical order will be consumed first. In case of ``random`` any
+ file will be picked randomly. When using ``oldest`` and ``youngest`` the whole
+ directory will be scanned to pick the oldest/youngest file, which might be slow if there
+ are a large number of files, while using ``random`` may cause old files to be consumed
+ very late if new files keep coming in the spooling directory.
+pollDelay 500 Delay (in milliseconds) used when polling for new files.
+recursiveDirectorySearch false Whether to monitor sub directories for new files to read.
+maxBackoff 4000 The maximum time (in millis) to wait between consecutive attempts to
+ write to the channel(s) if the channel is full. The source will start at
+ a low backoff and increase it exponentially each time the channel throws a
+ ChannelException, upto the value specified by this parameter.
+batchSize 100 Granularity at which to batch transfer to the channel
+inputCharset UTF-8 Character set used by deserializers that treat the input file as text.
+decodeErrorPolicy ``FAIL`` What to do when we see a non-decodable character in the input file.
+ ``FAIL``: Throw an exception and fail to parse the file.
+ ``REPLACE``: Replace the unparseable character with the "replacement character" char,
+ typically Unicode U+FFFD.
+ ``IGNORE``: Drop the unparseable character sequence.
+deserializer ``LINE`` Specify the deserializer used to parse the file into events.
+ Defaults to parsing each line as an event. The class specified must implement
+ ``EventDeserializer.Builder``.
+deserializer.* Varies per event deserializer.
+bufferMaxLines -- (Obselete) This option is now ignored.
+bufferMaxLineLength 5000 (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.
+selector.type replicating replicating or multiplexing
+selector.* Depends on the selector.type value
+interceptors -- Space-separated list of interceptors
interceptors.*
-======================= ============== ==========================================================
+======================== ============== ==========================================================
Example for an agent named agent-1: