Posted to commits@flume.apache.org by hs...@apache.org on 2016/02/09 02:41:58 UTC

flume git commit: FLUME-2875. Allow RollingFileSink to specify a file prefix and a file extension.

Repository: flume
Updated Branches:
  refs/heads/trunk af63d38fa -> 7962ce63b


FLUME-2875. Allow RollingFileSink to specify a file prefix and a file extension.

(Ralph Goers via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/7962ce63
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/7962ce63
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/7962ce63

Branch: refs/heads/trunk
Commit: 7962ce63bc1cfdfed6b54b8b211e785ccab350f5
Parents: af63d38
Author: Hari Shreedharan <hs...@apache.org>
Authored: Mon Feb 8 17:40:35 2016 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Mon Feb 8 17:40:35 2016 -0800

----------------------------------------------------------------------
 .../formatter/output/DefaultPathManager.java    | 108 +++++++++++++++++++
 .../flume/formatter/output/PathManager.java     |  63 ++++-------
 .../formatter/output/PathManagerFactory.java    |  82 ++++++++++++++
 .../flume/formatter/output/PathManagerType.java |  43 ++++++++
 .../formatter/output/RollTimePathManager.java   |  66 ++++++++++++
 .../org/apache/flume/sink/RollingFileSink.java  |   8 +-
 .../apache/flume/sink/TestRollingFileSink.java  | 106 ++++++++++++++++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  53 ++++-----
 8 files changed, 460 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
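
At a glance, this change turns PathManager into an interface with two bundled
implementations (DefaultPathManager and RollTimePathManager), selected through
the new sink.pathManager property; sink.pathManager.prefix and
sink.pathManager.extension are passed through to the chosen implementation.
A minimal standalone sketch of the new API follows. The class and method names
come from the diff below; the sketch class itself, the directory and the
prefix/extension values are invented for illustration.

import java.io.File;

import org.apache.flume.Context;
import org.apache.flume.formatter.output.PathManager;
import org.apache.flume.formatter.output.PathManagerFactory;

public class PathManagerSketch {
  public static void main(String[] args) {
    // RollingFileSink hands the "sink.pathManager.*" sub-properties to the
    // PathManager as its own Context, so the keys here are simply "prefix"
    // and "extension" (see DefaultPathManager below).
    Context context = new Context();
    context.put("prefix", "events-");
    context.put("extension", "log");

    // "DEFAULT" and "ROLLTIME" resolve via the PathManagerType enum; any
    // other value is treated as the FQCN of a PathManager.Builder.
    PathManager manager = PathManagerFactory.getInstance("DEFAULT", context);
    manager.setBaseDirectory(new File("/tmp/flume-roll"));

    // DefaultPathManager builds names as prefix + seriesTimestamp + "-" +
    // counter, plus "." + extension, e.g. "events-1454981235000-1.log".
    System.out.println(manager.nextFile());
  }
}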


http://git-wip-us.apache.org/repos/asf/flume/blob/7962ce63/flume-ng-core/src/main/java/org/apache/flume/formatter/output/DefaultPathManager.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/DefaultPathManager.java b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/DefaultPathManager.java
new file mode 100644
index 0000000..176db7f
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/DefaultPathManager.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.formatter.output;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.flume.Context;
+
+public class DefaultPathManager implements PathManager {
+
+  private long seriesTimestamp;
+  private File baseDirectory;
+  private AtomicInteger fileIndex;
+  private String filePrefix;
+  private String extension;
+
+  private static final String DEFAULT_FILE_PREFIX = "";
+  private static final String DEFAULT_FILE_EXTENSION = "";
+  private static final String FILE_EXTENSION = "extension";
+  private static final String FILE_PREFIX = "prefix";
+
+  protected File currentFile;
+
+  public DefaultPathManager(Context context) {
+    filePrefix = context.getString(FILE_PREFIX, DEFAULT_FILE_PREFIX);
+    extension = context.getString(FILE_EXTENSION, DEFAULT_FILE_EXTENSION);
+    seriesTimestamp = System.currentTimeMillis();
+    fileIndex = new AtomicInteger();
+  }
+
+  @Override
+  public File nextFile() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(filePrefix).append(seriesTimestamp).append("-");
+    sb.append(fileIndex.incrementAndGet());
+    if (extension.length() > 0) {
+        sb.append(".").append(extension);
+    }
+    currentFile = new File(baseDirectory, sb.toString());
+
+    return currentFile;
+  }
+
+  @Override
+  public File getCurrentFile() {
+    if (currentFile == null) {
+      return nextFile();
+    }
+
+    return currentFile;
+  }
+
+  @Override
+  public void rotate() {
+    currentFile = null;
+  }
+
+  @Override
+  public File getBaseDirectory() {
+    return baseDirectory;
+  }
+
+  @Override
+  public void setBaseDirectory(File baseDirectory) {
+    this.baseDirectory = baseDirectory;
+  }
+
+  public long getSeriesTimestamp() {
+    return seriesTimestamp;
+  }
+
+  public String getPrefix() {
+    return filePrefix;
+  }
+
+  public String getExtension() {
+    return extension;
+  }
+
+  public AtomicInteger getFileIndex() {
+    return fileIndex;
+  }
+
+  public static class Builder implements PathManager.Builder {
+    @Override
+    public PathManager build(Context context) {
+      return new DefaultPathManager(context);
+    }
+  }
+
+}
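
The getCurrentFile()/rotate()/nextFile() contract above is what RollingFileSink
relies on when rolling: rotate() only clears the current file, and the next
getCurrentFile() call lazily creates a new name with the counter advanced. A
small illustrative sequence, using the class exactly as defined above (the
sketch class and directory are invented):

import java.io.File;

import org.apache.flume.Context;
import org.apache.flume.formatter.output.DefaultPathManager;

public class RotationSketch {
  public static void main(String[] args) {
    DefaultPathManager manager = new DefaultPathManager(new Context());
    manager.setBaseDirectory(new File("/tmp/flume-roll"));

    File first = manager.getCurrentFile();   // lazily calls nextFile(): "<seriesTimestamp>-1"
    File again = manager.getCurrentFile();   // same file until rotate() is called
    manager.rotate();                        // clears currentFile only
    File second = manager.getCurrentFile();  // "<seriesTimestamp>-2": the counter keeps growing

    System.out.println(first + " == " + again + ", then " + second);
  }
}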

http://git-wip-us.apache.org/repos/asf/flume/blob/7962ce63/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManager.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManager.java b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManager.java
index 933cc94..5a3066a 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManager.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManager.java
@@ -16,58 +16,35 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.flume.formatter.output;
 
 import java.io.File;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.flume.Context;
 
-public class PathManager {
+/**
+ * Creates the files used by the RollingFileSink.
+ */
+public interface PathManager {
+    /**
+     * {@link Context} prefix
+     */
+    public static String CTX_PREFIX = "pathManager.";
 
-  private long seriesTimestamp;
-  private File baseDirectory;
-  private AtomicInteger fileIndex;
+    File nextFile();
 
-  private File currentFile;
+    File getCurrentFile();
 
-  public PathManager() {
-    seriesTimestamp = System.currentTimeMillis();
-    fileIndex = new AtomicInteger();
-  }
+    void rotate();
 
-  public File nextFile() {
-    currentFile = new File(baseDirectory, seriesTimestamp + "-"
-        + fileIndex.incrementAndGet());
+    File getBaseDirectory();
 
-    return currentFile;
-  }
+    void setBaseDirectory(File baseDirectory);
 
-  public File getCurrentFile() {
-    if (currentFile == null) {
-      return nextFile();
+    /**
+     * Knows how to construct this path manager.<br/>
+     * <b>Note: Implementations MUST provide a public no-arg constructor.</b>
+     */
+    public interface Builder {
+        public PathManager build(Context context);
     }
-
-    return currentFile;
-  }
-
-  public void rotate() {
-    currentFile = null;
-  }
-
-  public File getBaseDirectory() {
-    return baseDirectory;
-  }
-
-  public void setBaseDirectory(File baseDirectory) {
-    this.baseDirectory = baseDirectory;
-  }
-
-  public long getSeriesTimestamp() {
-    return seriesTimestamp;
-  }
-
-  public AtomicInteger getFileIndex() {
-    return fileIndex;
-  }
-
 }
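
The Builder contract above is what lets third parties plug in their own path
manager by class name. A hypothetical example that reuses DefaultPathManager
for the prefix/extension handling (the com.example.flume package, the
UuidPathManager class and its naming policy are invented for illustration):

package com.example.flume;

import java.io.File;
import java.util.UUID;

import org.apache.flume.Context;
import org.apache.flume.formatter.output.DefaultPathManager;
import org.apache.flume.formatter.output.PathManager;

public class UuidPathManager extends DefaultPathManager {

  public UuidPathManager(Context context) {
    super(context);
  }

  @Override
  public File nextFile() {
    // Alternative naming policy: random names instead of timestamp-counter names.
    StringBuilder sb = new StringBuilder();
    sb.append(getPrefix()).append(UUID.randomUUID());
    if (getExtension().length() > 0) {
      sb.append(".").append(getExtension());
    }
    currentFile = new File(getBaseDirectory(), sb.toString());
    return currentFile;
  }

  // The required public no-arg Builder. Configuring
  // sink.pathManager = com.example.flume.UuidPathManager$Builder
  // would make PathManagerFactory load it (see the factory below).
  public static class Builder implements PathManager.Builder {
    @Override
    public PathManager build(Context context) {
      return new UuidPathManager(context);
    }
  }
}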

http://git-wip-us.apache.org/repos/asf/flume/blob/7962ce63/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerFactory.java b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerFactory.java
new file mode 100644
index 0000000..4dbe083
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.formatter.output;
+
+import com.google.common.base.Preconditions;
+import java.util.Locale;
+import org.apache.flume.Context;
+import org.apache.flume.FlumeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Create PathManager instances.
+ */
+public class PathManagerFactory {
+    private static final Logger logger = LoggerFactory.getLogger(PathManagerFactory.class);
+
+    public static PathManager getInstance(String managerType, Context context) {
+
+        Preconditions.checkNotNull(managerType, "path manager type must not be null");
+
+        // try to find builder class in enum of known output serializers
+        PathManagerType type;
+        try {
+            type = PathManagerType.valueOf(managerType.toUpperCase(Locale.ENGLISH));
+        } catch (IllegalArgumentException e) {
+            logger.debug("Not in enum, loading builder class: {}", managerType);
+            type = PathManagerType.OTHER;
+        }
+        Class<? extends PathManager.Builder> builderClass = type.getBuilderClass();
+
+        // handle the case where they have specified their own builder in the config
+        if (builderClass == null) {
+            try {
+                Class c = Class.forName(managerType);
+                if (c != null && PathManager.Builder.class.isAssignableFrom(c)) {
+                    builderClass = (Class<? extends PathManager.Builder>) c;
+                } else {
+                    String errMessage = "Unable to instantiate Builder from " +
+                            managerType + ": does not appear to implement " +
+                            PathManager.Builder.class.getName();
+                    throw new FlumeException(errMessage);
+                }
+            } catch (ClassNotFoundException ex) {
+                logger.error("Class not found: " + managerType, ex);
+                throw new FlumeException(ex);
+            }
+        }
+
+        // build the builder
+        PathManager.Builder builder;
+        try {
+            builder = builderClass.newInstance();
+        } catch (InstantiationException ex) {
+            String errMessage = "Cannot instantiate builder: " + managerType;
+            logger.error(errMessage, ex);
+            throw new FlumeException(errMessage, ex);
+        } catch (IllegalAccessException ex) {
+            String errMessage = "Cannot instantiate builder: " + managerType;
+            logger.error(errMessage, ex);
+            throw new FlumeException(errMessage, ex);
+        }
+
+        return builder.build(context);
+    }
+}
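
The lookup order above is: known PathManagerType names first (case-insensitive),
then the configured value is treated as the FQCN of a Builder. A condensed
sketch of both paths; the FactorySketch class is invented, and the custom
Builder name is the hypothetical one from the earlier example, so the second
call only works if such a class is actually on the classpath:

import org.apache.flume.Context;
import org.apache.flume.formatter.output.PathManager;
import org.apache.flume.formatter.output.PathManagerFactory;

public class FactorySketch {
  public static void main(String[] args) {
    Context context = new Context();

    // Matches PathManagerType.ROLLTIME after upper-casing.
    PathManager byEnum = PathManagerFactory.getInstance("rolltime", context);

    // Falls through to Class.forName(); throws FlumeException if the class
    // is missing or does not implement PathManager.Builder.
    PathManager byFqcn = PathManagerFactory.getInstance(
        "com.example.flume.UuidPathManager$Builder", context);

    System.out.println(byEnum.getClass() + " / " + byFqcn.getClass());
  }
}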

http://git-wip-us.apache.org/repos/asf/flume/blob/7962ce63/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerType.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerType.java b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerType.java
new file mode 100644
index 0000000..4f1fa93
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.formatter.output;
+
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
+
+/**
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public enum PathManagerType {
+    DEFAULT(DefaultPathManager.Builder.class),
+    ROLLTIME(RollTimePathManager.Builder.class),
+    OTHER(null);
+
+    private final Class<? extends PathManager.Builder> builderClass;
+
+    PathManagerType(Class<? extends PathManager.Builder> builderClass) {
+        this.builderClass = builderClass;
+    }
+
+    public Class<? extends PathManager.Builder> getBuilderClass() {
+        return builderClass;
+    }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/7962ce63/flume-ng-core/src/main/java/org/apache/flume/formatter/output/RollTimePathManager.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/RollTimePathManager.java b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/RollTimePathManager.java
new file mode 100644
index 0000000..6883a9c
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/RollTimePathManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.formatter.output;
+
+import java.io.File;
+
+import org.apache.flume.Context;
+import org.joda.time.LocalDateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/**
+ *
+ */
+public class RollTimePathManager extends DefaultPathManager {
+
+    private final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyyMMddHHmmss");
+    private String lastRoll;
+
+    public RollTimePathManager(Context context) {
+        super(context);
+    }
+
+    @Override
+    public File nextFile() {
+        StringBuilder sb = new StringBuilder();
+        String date = formatter.print(LocalDateTime.now());
+        if (!date.equals(lastRoll)) {
+            getFileIndex().set(0);
+            lastRoll = date;
+        }
+        sb.append(getPrefix()).append(date).append("-");
+        sb.append(getFileIndex().incrementAndGet());
+        if (getExtension().length() > 0) {
+            sb.append(".").append(getExtension());
+        }
+        currentFile = new File(getBaseDirectory(), sb.toString());
+
+        return currentFile;
+    }
+
+    public static class Builder implements PathManager.Builder {
+        @Override
+        public PathManager build(Context context) {
+            return new RollTimePathManager(context);
+        }
+    }
+
+}
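
RollTimePathManager swaps the fixed series timestamp for the current roll time
(yyyyMMddHHmmss) and resets the counter whenever that formatted time changes,
so file names line up with roll intervals. A short sketch mirroring the
settings of testRollTime below (the sketch class, directory and printed
timestamps are illustrative):

import java.io.File;

import org.apache.flume.Context;
import org.apache.flume.formatter.output.PathManager;
import org.apache.flume.formatter.output.RollTimePathManager;

public class RollTimeSketch {
  public static void main(String[] args) throws InterruptedException {
    Context context = new Context();
    context.put("prefix", "test4-");
    context.put("extension", "txt");

    PathManager manager = new RollTimePathManager(context);
    manager.setBaseDirectory(new File("/tmp/flume-roll"));

    System.out.println(manager.nextFile());  // e.g. test4-20160208174035-1.txt
    System.out.println(manager.nextFile());  // test4-20160208174035-2.txt (same second)
    Thread.sleep(1100);
    System.out.println(manager.nextFile());  // test4-20160208174036-1.txt (counter reset)
  }
}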

http://git-wip-us.apache.org/repos/asf/flume/blob/7962ce63/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
index 9cb3370..b97d404 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
@@ -33,6 +33,7 @@ import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.formatter.output.PathManager;
+import org.apache.flume.formatter.output.PathManagerFactory;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,13 +67,13 @@ public class RollingFileSink extends AbstractSink implements Configurable {
   private volatile boolean shouldRotate;
 
   public RollingFileSink() {
-    pathController = new PathManager();
     shouldRotate = false;
   }
 
   @Override
   public void configure(Context context) {
 
+    String pathManagerType = context.getString("sink.pathManager", "DEFAULT");
     String directory = context.getString("sink.directory");
     String rollInterval = context.getString("sink.rollInterval");
 
@@ -81,6 +82,11 @@ public class RollingFileSink extends AbstractSink implements Configurable {
         new Context(context.getSubProperties("sink." +
             EventSerializer.CTX_PREFIX));
 
+    Context pathManagerContext =
+              new Context(context.getSubProperties("sink." +
+                      PathManager.CTX_PREFIX));
+    pathController = PathManagerFactory.getInstance(pathManagerType, pathManagerContext);
+
     Preconditions.checkArgument(directory != null, "Directory may not be null");
     Preconditions.checkNotNull(serializerType, "Serializer type is undefined");
 

http://git-wip-us.apache.org/repos/asf/flume/blob/7962ce63/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
index 07fa644..bf4ed1f 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
@@ -172,4 +172,110 @@ public class TestRollingFileSink {
       reader.close();
     }
   }
+
+  @Test
+  public void testAppend3() throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
+    File tmpDir = new File("target/tmpLog");
+    tmpDir.mkdirs();
+    cleanDirectory(tmpDir);
+    Context context = new Context();
+
+    context.put("sink.directory", "target/tmpLog");
+    context.put("sink.rollInterval", "0");
+    context.put("sink.batchSize", "1");
+    context.put("sink.pathManager.prefix", "test3-");
+    context.put("sink.pathManager.extension", "txt");
+
+    Configurables.configure(sink, context);
+
+    Channel channel = new PseudoTxnMemoryChannel();
+    Configurables.configure(channel, context);
+
+    sink.setChannel(channel);
+    sink.start();
+
+    for (int i = 0; i < 10; i++) {
+      Event event = new SimpleEvent();
+
+      event.setBody(("Test event " + i).getBytes());
+
+      channel.put(event);
+      sink.process();
+
+      Thread.sleep(500);
+    }
+
+    sink.stop();
+
+    for (String file : sink.getDirectory().list()) {
+      BufferedReader reader = new BufferedReader(new FileReader(new File(sink.getDirectory(), file)));
+
+      String lastLine = null;
+      String currentLine = null;
+
+      while ((currentLine = reader.readLine()) != null) {
+        lastLine = currentLine;
+        logger.debug("Produced file:{} lastLine:{}", file, lastLine);
+      }
+
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testRollTime() throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
+    File tmpDir = new File("target/tempLog");
+    tmpDir.mkdirs();
+    cleanDirectory(tmpDir);
+    Context context = new Context();
+
+    context.put("sink.directory", "target/tempLog/");
+    context.put("sink.rollInterval", "1");
+    context.put("sink.batchSize", "1");
+    context.put("sink.pathManager", "rolltime");
+    context.put("sink.pathManager.prefix", "test4-");
+    context.put("sink.pathManager.extension", "txt");
+
+    Configurables.configure(sink, context);
+
+    Channel channel = new PseudoTxnMemoryChannel();
+    Configurables.configure(channel, context);
+
+    sink.setChannel(channel);
+    sink.start();
+
+    for (int i = 0; i < 10; i++) {
+      Event event = new SimpleEvent();
+
+      event.setBody(("Test event " + i).getBytes());
+
+      channel.put(event);
+      sink.process();
+
+      Thread.sleep(500);
+    }
+
+    sink.stop();
+
+    for (String file : sink.getDirectory().list()) {
+      BufferedReader reader = new BufferedReader(new FileReader(new File(sink.getDirectory(), file)));
+
+      String lastLine = null;
+      String currentLine = null;
+
+      while ((currentLine = reader.readLine()) != null) {
+        lastLine = currentLine;
+        logger.debug("Produced file:{} lastLine:{}", file, lastLine);
+      }
+
+      reader.close();
+    }
+  }
+
+  private void cleanDirectory(File dir) {
+    File[] files = dir.listFiles();
+    for (File file : files) {
+      file.delete();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/7962ce63/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 0f8461d..423e0cf 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -28,22 +28,22 @@ Apache Flume is a distributed, reliable, and available system for efficiently
 collecting, aggregating and moving large amounts of log data from many
 different sources to a centralized data store.
 
-The use of Apache Flume is not only restricted to log data aggregation. 
+The use of Apache Flume is not only restricted to log data aggregation.
 Since data sources are customizable, Flume can be used to transport massive quantities
-of event data including but not limited to network traffic data, social-media-generated data, 
+of event data including but not limited to network traffic data, social-media-generated data,
 email messages and pretty much any data source possible.
 
 Apache Flume is a top level project at the Apache Software Foundation.
 
 There are currently two release code lines available, versions 0.9.x and 1.x.
 
-Documentation for the 0.9.x track is available at 
+Documentation for the 0.9.x track is available at
 `the Flume 0.9.x User Guide <http://archive.cloudera.com/cdh/3/flume/UserGuide/>`_.
 
 This documentation applies to the 1.4.x track.
 
-New and existing users are encouraged to use the 1.x releases so as to 
-leverage the performance improvements and configuration flexibilities available 
+New and existing users are encouraged to use the 1.x releases so as to
+leverage the performance improvements and configuration flexibilities available
 in the latest architecture.
 
 
@@ -1153,7 +1153,7 @@ Twitter 1% firehose Source (experimental)
 
 Experimental source that connects via Streaming API to the 1% sample twitter
 firehose, continously downloads tweets, converts them to Avro format and
-sends Avro events to a downstream Flume sink. Requires the consumer and 
+sends Avro events to a downstream Flume sink. Requires the consumer and
 access tokens and secrets of a Twitter developer account.
 Required properties are in **bold**.
 
@@ -1165,7 +1165,7 @@ Property Name          Default      Description
 **consumerKey**        --           OAuth consumer key
 **consumerSecret**     --           OAuth consumer secret
 **accessToken**        --           OAuth access token
-**accessTokenSecret**  --           OAuth toekn secret 
+**accessTokenSecret**  --           OAuth toekn secret
 maxBatchSize           1000         Maximum number of twitter messages to put in a single batch
 maxBatchDurationMillis 1000         Maximum number of milliseconds to wait before closing a batch
 ====================== ===========  ===================================================
@@ -2119,16 +2119,19 @@ File Roll Sink
 Stores events on the local filesystem.
 Required properties are in **bold**.
 
-===================  =======  ======================================================================================================================
-Property Name        Default  Description
-===================  =======  ======================================================================================================================
-**channel**          --
-**type**             --       The component type name, needs to be ``file_roll``.
-**sink.directory**   --       The directory where files will be stored
-sink.rollInterval    30       Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file.
-sink.serializer      TEXT     Other possible options include ``avro_event`` or the FQCN of an implementation of EventSerializer.Builder interface.
-batchSize            100
-===================  =======  ======================================================================================================================
+==========================  =======  ======================================================================================================================
+Property Name               Default  Description
+==========================  =======  ======================================================================================================================
+**channel**                 --
+**type**                    --       The component type name, needs to be ``file_roll``.
+**sink.directory**          --       The directory where files will be stored
+sink.pathManager            DEFAULT  The PathManager implementation to use.
+sink.pathManager.extension  --       The file extension if the default PathManager is used.
+sink.pathManager.prefix     --       A character string to add to the beginning of the file name if the default PathManager is used
+sink.rollInterval           30       Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file.
+sink.serializer             TEXT     Other possible options include ``avro_event`` or the FQCN of an implementation of EventSerializer.Builder interface.
+batchSize                   100
+==========================  =======  ======================================================================================================================
 
 Example for agent named a1:
 
@@ -2284,19 +2287,19 @@ This sink extracts data from Flume events, transforms it, and loads it in near-r
 
 This sink is well suited for use cases that stream raw data into HDFS (via the HdfsSink) and simultaneously extract, transform and load the same data into Solr (via MorphlineSolrSink). In particular, this sink can process arbitrary heterogeneous raw data from disparate data sources and turn it into a data model that is useful to Search applications.
 
-The ETL functionality is customizable using a `morphline configuration file <http://cloudera.github.io/cdk/docs/current/cdk-morphlines/index.html>`_ that defines a chain of transformation commands that pipe event records from one command to another. 
+The ETL functionality is customizable using a `morphline configuration file <http://cloudera.github.io/cdk/docs/current/cdk-morphlines/index.html>`_ that defines a chain of transformation commands that pipe event records from one command to another.
 
 Morphlines can be seen as an evolution of Unix pipelines where the data model is generalized to work with streams of generic records, including arbitrary binary payloads. A morphline command is a bit like a Flume Interceptor. Morphlines can be embedded into Hadoop components such as Flume.
 
 Commands to parse and transform a set of standard data formats such as log files, Avro, CSV, Text, HTML, XML, PDF, Word, Excel, etc. are provided out of the box, and additional custom commands and parsers for additional data formats can be added as morphline plugins. Any kind of data format can be indexed and any Solr documents for any kind of Solr schema can be generated, and any custom ETL logic can be registered and executed.
 
-Morphlines manipulate continuous streams of records. The data model can be described as follows: A record is a set of named fields where each field has an ordered list of one or more values. A value can be any Java Object. That is, a record is essentially a hash table where each hash table entry contains a String key and a list of Java Objects as values. (The implementation uses Guava's ``ArrayListMultimap``, which is a ``ListMultimap``). Note that a field can have multiple values and any two records need not use common field names. 
+Morphlines manipulate continuous streams of records. The data model can be described as follows: A record is a set of named fields where each field has an ordered list of one or more values. A value can be any Java Object. That is, a record is essentially a hash table where each hash table entry contains a String key and a list of Java Objects as values. (The implementation uses Guava's ``ArrayListMultimap``, which is a ``ListMultimap``). Note that a field can have multiple values and any two records need not use common field names.
 
 This sink fills the body of the Flume event into the ``_attachment_body`` field of the morphline record, as well as copies the headers of the Flume event into record fields of the same name. The commands can then act on this data.
 
 Routing to a SolrCloud cluster is supported to improve scalability. Indexing load can be spread across a large number of MorphlineSolrSinks for improved scalability. Indexing load can be replicated across multiple MorphlineSolrSinks for high availability, for example using Flume features such as Load balancing Sink Processor. MorphlineInterceptor can also help to implement dynamic routing to multiple Solr collections (e.g. for multi-tenancy).
 
-The morphline and solr jars required for your environment must be placed in the lib directory of the Apache Flume installation. 
+The morphline and solr jars required for your environment must be placed in the lib directory of the Apache Flume installation.
 
 The type is the FQCN: org.apache.flume.sink.solr.morphline.MorphlineSolrSink
 
@@ -2334,11 +2337,11 @@ ElasticSearchSink
 ~~~~~~~~~~~~~~~~~
 
 This sink writes data to an elasticsearch cluster. By default, events will be written so that the `Kibana <http://kibana.org>`_ graphical interface
-can display them - just as if `logstash <https://logstash.net>`_ wrote them. 
+can display them - just as if `logstash <https://logstash.net>`_ wrote them.
 
-The elasticsearch and lucene-core jars required for your environment must be placed in the lib directory of the Apache Flume installation. 
+The elasticsearch and lucene-core jars required for your environment must be placed in the lib directory of the Apache Flume installation.
 Elasticsearch requires that the major version of the client JAR match that of the server and that both are running the same minor version
-of the JVM. SerializationExceptions will appear if this is incorrect. To 
+of the JVM. SerializationExceptions will appear if this is incorrect. To
 select the required version first determine the version of elasticsearch and the JVM version the target cluster is running. Then select an elasticsearch client
 library which matches the major version. A 0.19.x client can talk to a 0.19.x cluster; 0.20.x can talk to 0.20.x and 0.90.x can talk to 0.90.x. Once the
 elasticsearch version has been determined then read the pom.xml file to determine the correct lucene-core JAR version to use. The Flume agent
@@ -2588,7 +2591,7 @@ Example for agent named a1:
   a1.channels.c1.transactionCapacity = 10000
   a1.channels.c1.byteCapacityBufferPercentage = 20
   a1.channels.c1.byteCapacity = 800000
-  
+
 
 JDBC Channel
 ~~~~~~~~~~~~
@@ -2796,7 +2799,7 @@ The disk store is managed using an embedded File channel. When the in-memory que
 the file channel. This channel is ideal for flows that need high throughput of memory channel during normal operation, but at the
 same time need the larger capacity of the file channel for better tolerance of intermittent sink side outages or drop in drain rates.
 The throughput will reduce approximately to file channel speeds during such abnormal situations. In case of an agent crash or restart,
-only the events stored on disk are recovered when the agent comes online. **This channel is currently experimental and 
+only the events stored on disk are recovered when the agent comes online. **This channel is currently experimental and
 not recommended for use in production.**
 
 Required properties are in **bold**. Please refer to file channel for additional required properties.