You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by rg...@apache.org on 2022/03/27 16:55:30 UTC

[flume] 01/09: FLUME-3413 3412 - Add LoadBalancingChannelSelector and Initializable

This is an automated email from the ASF dual-hosted git repository.

rgoers pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git

commit abba25ac7f1a0be5669a780dadc0078903ad8793
Author: Ralph Goers <rg...@apache.org>
AuthorDate: Sat Mar 26 12:48:21 2022 -0700

    FLUME-3413 3412 - Add LoadBalancingChannelSelector and Initializable
---
 .mvn/local-settings.xml                            |   7 -
 .../flume/conf/channel/ChannelSelectorType.java    |   5 +
 .../channel/LoadBalancingChannelSelector.java      | 152 +++++++++++++++++++++
 .../flume/sink/AbstractSingleSinkProcessor.java    |  65 +++++++++
 .../apache/flume/sink/AbstractSinkProcessor.java   |   2 +-
 .../apache/flume/sink/DefaultSinkProcessor.java    |  39 +-----
 .../main/java/org/apache/flume/sink/NullSink.java  |   8 ++
 .../channel/TestLoadBalancingChannelSelector.java  |  72 ++++++++++
 flume-ng-doc/sphinx/FlumeDeveloperGuide.rst        |  90 ++++++++++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |  23 ++++
 flume-ng-node/pom.xml                              |  12 ++
 .../java/org/apache/flume/node/Application.java    |  36 +++++
 .../java/org/apache/flume/node/Initializable.java  |  30 ++++
 .../org/apache/flume/node/TestApplication.java     |  67 +++++++++
 .../java/org/apache/flume/sink/NullInitSink.java   |  98 ++++++-------
 .../org/apache/flume/source/EventProcessor.java    |  33 +++++
 .../java/org/apache/flume/source/LocalSource.java  |  89 ++++++++++++
 .../src/test/resources/flume-conf-init.properties  |  51 +++++++
 flume-ng-node/src/test/resources/log4j2.xml        |   4 +-
 19 files changed, 778 insertions(+), 105 deletions(-)

diff --git a/.mvn/local-settings.xml b/.mvn/local-settings.xml
index 6bf134e..55d11d3 100644
--- a/.mvn/local-settings.xml
+++ b/.mvn/local-settings.xml
@@ -7,49 +7,42 @@
       <mirrorOf>wso2</mirrorOf>
       <name>wso2-unblocked</name>
       <url>http://dist.wso2.org/maven2/</url>
-      <blocked>false</blocked>
     </mirror>
     <mirror>
       <id>maven-twttr-unblocked</id>
       <mirrorOf>maven-twttr</mirrorOf>
       <name>maven-twttr-unblocked</name>
       <url>http://maven.twttr.com</url>
-      <blocked>false</blocked>
     </mirror>
     <mirror>
       <id>maven-restlet-unblocked</id>
       <mirrorOf>maven-restlet</mirrorOf>
       <name>maven-restlet-unblocked</name>
       <url>http://maven.restlet.org</url>
-      <blocked>false</blocked>
     </mirror>
     <mirror>
       <id>datanucleus-unblocked</id>
       <mirrorOf>datanucleus</mirrorOf>
       <name>datanucleus-unblocked</name>
       <url>http://www.datanucleus.org/downloads/maven2</url>
-      <blocked>false</blocked>
     </mirror>
     <mirror>
       <id>glassfish-repository-unblocked</id>
       <mirrorOf>glassfish-repository</mirrorOf>
       <name>glassfish-repository-unblocked</name>
       <url>http://maven.glassfish.org/content/groups/glassfish</url>
-      <blocked>false</blocked>
     </mirror>
     <mirror>
       <id>glassfish-repo-archive-unblocked</id>
       <mirrorOf>glassfish-repo-archive</mirrorOf>
       <name>glassfish-repo-archive-unblocked</name>
       <url>http://maven.glassfish.org/content/groups/glassfish</url>
-      <blocked>false</blocked>
     </mirror>
     <mirror>
       <id>conjars-unblocked</id>
       <mirrorOf>conjars</mirrorOf>
       <name>conjars-unblocked</name>
       <url>http://conjars.org/repo</url>
-      <blocked>false</blocked>
     </mirror>
   </mirrors>
 </settings>
\ No newline at end of file
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorType.java
index d1a0dc2..e329ec3 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorType.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/channel/ChannelSelectorType.java
@@ -34,6 +34,11 @@ public enum ChannelSelectorType implements ComponentWithClassName {
   REPLICATING("org.apache.flume.channel.ReplicatingChannelSelector"),
 
   /**
+   * Load balancing channel selector.
+   */
+  LOAD_BALANCING("org.apache.flume.channel.LoadBalancingChannelSelector"),
+
+  /**
    * Multiplexing channel selector.
    */
   MULTIPLEXING("org.apache.flume.channel.MultiplexingChannelSelector");
diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/LoadBalancingChannelSelector.java b/flume-ng-core/src/main/java/org/apache/flume/channel/LoadBalancingChannelSelector.java
new file mode 100644
index 0000000..0b080b2
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/LoadBalancingChannelSelector.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Load balancing channel selector. This selector allows for load balancing
+ * between channels based on various policy configuration options. This serves a similar purpose
+ * to the LoadBalancingSinkProcessor except it allows the sinks to run in their own threads instead
+ * of in just one.
+ *
+ * <p>The <tt>LoadBalancingChannelSelector</tt> maintains an indexed list of
+ * active channels on which the load must be distributed. This implementation
+ * supports distributing load using either via <tt>ROUND_ROBIN</tt> or via
+ * <tt>RANDOM</tt> selection mechanism. The choice of selection mechanism
+ * defaults to <tt>ROUND_ROBIN</tt> type, but can be overridden via
+ * configuration.</p>
+ */
+public class LoadBalancingChannelSelector extends AbstractChannelSelector {
+  private final List<Channel> emptyList = Collections.emptyList();
+  private ChannelPicker picker;
+
+  @Override
+  public List<Channel> getRequiredChannels(Event event) {
+    Channel ch = picker.getChannel();
+    Preconditions.checkNotNull(ch, "Channel picker returned null");
+    return Lists.newArrayList(ch);
+  }
+
+  @Override
+  public List<Channel> getOptionalChannels(Event event) {
+    return emptyList;
+  }
+
+  @Override
+  public void configure(Context context) {
+    List<Channel> channels = getAllChannels();
+    String strPolicy = context.getString("policy", Policy.ROUND_ROBIN.toString());
+    Policy policy;
+    // instantiate policy
+    try {
+      policy = Policy.valueOf(strPolicy.toUpperCase());
+    } catch (IllegalArgumentException ex) {
+      throw new IllegalArgumentException("Invalid policy: " + strPolicy, ex);
+    }
+    // instantiate picker
+    try {
+      picker = policy.getPolicyClass().newInstance();
+      picker.setChannels(channels);
+    } catch (InstantiationException | IllegalAccessException ex) {
+      throw new IllegalArgumentException("Cannot instantiate policy class from policy enum "
+          + policy, ex);
+    }
+  }
+
+  /**
+   * Definitions for the various policy types
+   */
+  private enum Policy {
+    ROUND_ROBIN(RoundRobinPolicy.class),
+    RANDOM(RandomPolicy.class);
+
+    private final Class<? extends ChannelPicker> clazz;
+
+    Policy(Class<? extends ChannelPicker> clazz) {
+      this.clazz = clazz;
+    }
+
+    public Class<? extends ChannelPicker> getPolicyClass() {
+      return clazz;
+    }
+  }
+
+  private interface ChannelPicker {
+    Channel getChannel();
+
+    void setChannels(List<Channel> channels);
+  }
+
+  /**
+   * Selects channels in a round-robin fashion
+   */
+  private static class RoundRobinPolicy implements ChannelPicker {
+
+    private final AtomicInteger next = new AtomicInteger(0);
+    private List<Channel> channels;
+
+    public RoundRobinPolicy() {
+    }
+
+    @Override
+    public void setChannels(List<Channel> channels) {
+      this.channels = channels;
+    }
+
+    @Override
+    public Channel getChannel() {
+      return channels.get(next.getAndAccumulate(channels.size(), (x, y) -> ++x < y ? x : 0));
+    }
+  }
+
+  /**
+   * Selects a channel at random
+   */
+  private static class RandomPolicy implements ChannelPicker {
+    private List<Channel> channels;
+    private final Random random = new Random(System.currentTimeMillis());
+
+    public RandomPolicy() {
+    }
+
+    @Override
+    public void setChannels(List<Channel> channels) {
+      this.channels = channels;
+    }
+
+    @Override
+    public Channel getChannel() {
+      int size = channels.size();
+      int pick = random.nextInt(size);
+      return channels.get(pick);
+    }
+
+  }
+}
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSingleSinkProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSingleSinkProcessor.java
new file mode 100644
index 0000000..6b1019b
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSingleSinkProcessor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.sink;
+
+import java.util.List;
+
+import org.apache.flume.Sink;
+import org.apache.flume.SinkProcessor;
+import org.apache.flume.lifecycle.LifecycleState;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A Sink Processor that only accesses a single Sink.
+ */
+public abstract class AbstractSingleSinkProcessor implements SinkProcessor {
+  protected Sink sink;
+  private LifecycleState lifecycleState;
+
+  @Override
+  public void start() {
+    Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
+    sink.start();
+    lifecycleState = LifecycleState.START;
+  }
+
+  @Override
+  public void stop() {
+    Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
+    sink.stop();
+    lifecycleState = LifecycleState.STOP;
+  }
+
+  @Override
+  public LifecycleState getLifecycleState() {
+    return lifecycleState;
+  }
+
+  @Override
+  public void setSinks(List<Sink> sinks) {
+    Preconditions.checkNotNull(sinks);
+    Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
+        + "only handle one sink, "
+        + "try using a policy that supports multiple sinks");
+    sink = sinks.get(0);
+  }
+
+  public Sink getSink() {
+    return sink;
+  }
+}
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java
index 3de653a..a8398fe 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java
@@ -64,7 +64,7 @@ public abstract class AbstractSinkProcessor implements SinkProcessor {
     sinkList = Collections.unmodifiableList(list);
   }
 
-  protected List<Sink> getSinks() {
+  public List<Sink> getSinks() {
     return sinkList;
   }
 }
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java
index 2da9264..b11bab7 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkProcessor.java
@@ -17,46 +17,18 @@
  */
 package org.apache.flume.sink;
 
-import java.util.List;
-
 import org.apache.flume.Context;
 import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Sink;
 import org.apache.flume.Sink.Status;
-import org.apache.flume.SinkProcessor;
 import org.apache.flume.conf.ComponentConfiguration;
 import org.apache.flume.conf.ConfigurableComponent;
-import org.apache.flume.lifecycle.LifecycleState;
-
-import com.google.common.base.Preconditions;
 
 /**
  * Default sink processor that only accepts a single sink, passing on process
  * results without any additional handling. Suitable for all sinks that aren't
  * assigned to a group.
  */
-public class DefaultSinkProcessor implements SinkProcessor, ConfigurableComponent {
-  private Sink sink;
-  private LifecycleState lifecycleState;
-
-  @Override
-  public void start() {
-    Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
-    sink.start();
-    lifecycleState = LifecycleState.START;
-  }
-
-  @Override
-  public void stop() {
-    Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
-    sink.stop();
-    lifecycleState = LifecycleState.STOP;
-  }
-
-  @Override
-  public LifecycleState getLifecycleState() {
-    return lifecycleState;
-  }
+public class DefaultSinkProcessor extends AbstractSingleSinkProcessor implements ConfigurableComponent {
 
   @Override
   public void configure(Context context) {
@@ -68,15 +40,6 @@ public class DefaultSinkProcessor implements SinkProcessor, ConfigurableComponen
   }
 
   @Override
-  public void setSinks(List<Sink> sinks) {
-    Preconditions.checkNotNull(sinks);
-    Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
-        + "only handle one sink, "
-        + "try using a policy that supports multiple sinks");
-    sink = sinks.get(0);
-  }
-
-  @Override
   public void configure(ComponentConfiguration conf) {
 
   }
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
index 9347f8e..9ca2b8f 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
@@ -138,6 +138,14 @@ public class NullSink extends AbstractSink implements Configurable, BatchSizeSup
     return "NullSink " + getName() + " { batchSize: " + batchSize + " }";
   }
 
+  public CounterGroup getCounterGroup() {
+    return counterGroup;
+  }
+
+  public int getLogEveryNEvents() {
+    return logEveryNEvents;
+  }
+
   @Override
   public long getBatchSize() {
     return batchSize;
diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestLoadBalancingChannelSelector.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestLoadBalancingChannelSelector.java
new file mode 100644
index 0000000..e0e9e79
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestLoadBalancingChannelSelector.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.conf.BasicConfigurationConstants;
+import org.apache.flume.conf.channel.ChannelSelectorType;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+public class TestLoadBalancingChannelSelector {
+
+  private List<Channel> channels = new ArrayList<Channel>();
+
+  private ChannelSelector selector;
+
+  @Before
+  public void setUp() throws Exception {
+    channels.clear();
+    channels.add(MockChannel.createMockChannel("ch1"));
+    channels.add(MockChannel.createMockChannel("ch2"));
+    channels.add(MockChannel.createMockChannel("ch3"));
+    channels.add(MockChannel.createMockChannel("ch4"));
+    Map<String, String> config = new HashMap<>();
+    config.put(BasicConfigurationConstants.CONFIG_TYPE, ChannelSelectorType.LOAD_BALANCING.name());
+    selector = ChannelSelectorFactory.create(channels, config);
+  }
+
+  @Test
+  public void testLoadBalancingSelector() throws Exception {
+    selector.configure(new Context());
+    validateChannel(selector, "ch1");
+    validateChannel(selector, "ch2");
+    validateChannel(selector, "ch3");
+    validateChannel(selector, "ch4");
+
+    List<Channel> optCh = selector.getOptionalChannels(new MockEvent());
+    Assert.assertEquals(0, optCh.size());
+  }
+
+  private void validateChannel(ChannelSelector selector, String channelName) {
+    List<Channel> channels = selector.getRequiredChannels(new MockEvent());
+    Assert.assertNotNull(channels);
+    Assert.assertEquals(1, channels.size());
+    Assert.assertEquals(channelName, channels.get(0).getName());
+  }
+}
diff --git a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
index de1e92a..f58da81 100644
--- a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
@@ -866,3 +866,93 @@ Channel
 ~~~~~~~
 
 TBD
+
+Initializable
+~~~~~~~~~~~~~
+
+As of Flume 1.10.0 Sources, Sinks, and Channels may implement the Intitializable interface. Doing so
+allows the component to have access the materialized configuration before any of the components have been
+started.
+
+This example shows a Sink being configured with the name of a Source. While initializing it will
+retrieve the Source from the configuration and save it. During event processing a new event will be
+sent to the Source, presumably after the event has be modified in some way.
+
+.. code-block:: java
+
+  public class NullInitSink extends NullSink implements Initializable {
+
+    private static final Logger logger = LoggerFactory.getLogger(NullInitSink.class);
+    private String sourceName = null;
+    private EventProcessor eventProcessor = null;
+    private long total = 0;
+
+    public NullInitSink() {
+      super();
+    }
+
+    @Override
+    public void configure(Context context) {
+      sourceName = context.getString("targetSource");
+      super.configure(context);
+
+    }
+
+    @Override
+    public void initialize(MaterializedConfiguration configuration) {
+      logger.debug("Locating source for event publishing");
+      for (Map.Entry<String, SourceRunner>  entry : configuration.getSourceRunners().entrySet()) {
+        if (entry.getKey().equals(sourceName)) {
+          Source source = entry.getValue().getSource();
+          if (source instanceof EventProcessor) {
+            eventProcessor = (EventProcessor) source;
+            logger.debug("Found event processor {}", source.getName());
+            return;
+          }
+        }
+      }
+      logger.warn("No Source named {} found for republishing events.", sourceName);
+    }
+
+    @Override
+    public Status process() throws EventDeliveryException {
+      Status status = Status.READY;
+
+      Channel channel = getChannel();
+      Transaction transaction = channel.getTransaction();
+      Event event = null;
+      CounterGroup counterGroup = getCounterGroup();
+      long batchSize = getBatchSize();
+      long eventCounter = counterGroup.get("events.success");
+
+      try {
+        transaction.begin();
+        int i = 0;
+        for (i = 0; i < batchSize; i++) {
+          event = channel.take();
+          if (event != null) {
+            long id = Long.parseLong(new String(event.getBody()));
+            total += id;
+            event.getHeaders().put("Total", Long.toString(total));
+            eventProcessor.processEvent(event);
+            logger.info("Null sink {} successful processed event {}", getName(), id);
+          } else {
+            status = Status.BACKOFF;
+            break;
+          }
+        }
+        transaction.commit();
+        counterGroup.addAndGet("events.success", (long) Math.min(batchSize, i));
+        counterGroup.incrementAndGet("transaction.success");
+      } catch (Exception ex) {
+        transaction.rollback();
+        counterGroup.incrementAndGet("transaction.failed");
+        logger.error("Failed to deliver event. Exception follows.", ex);
+        throw new EventDeliveryException("Failed to deliver event: " + event, ex);
+      } finally {
+        transaction.close();
+      }
+
+      return status;
+    }
+  }
\ No newline at end of file
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 315e5c4..eaa73b7 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -4121,6 +4121,29 @@ In the above configuration, c3 is an optional channel. Failure to write to c3 is
 simply ignored. Since c1 and c2 are not marked optional, failure to write to
 those channels will cause the transaction to fail.
 
+Load Balancing Channel Selector
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+Load balancing channel selector provides the ability to load-balance flow over multiple channels. This
+effectively allows the incoming data to be processed on multiple threads. It maintains an indexed list of active channels on which the load must be distributed. Implementation supports distributing load using either via round_robin or random selection mechanisms. The choice of selection mechanism defaults to round_robin type, but can be overridden via configuration.
+
+Required properties are in **bold**.
+
+==================  =====================  =================================================
+Property Name       Default                Description
+==================  =====================  =================================================
+selector.type       replicating            The component type name, needs to be ``load_balancing``
+selector.policy     ``round_robin``        Selection mechanism. Must be either ``round_robin`` or ``random``.
+==================  =====================  =================================================
+
+Example for agent named a1 and it's source called r1:
+
+.. code-block:: properties
+
+  a1.sources = r1
+  a1.channels = c1 c2 c3 c4
+  a1.sources.r1.channels = c1 c2 c3 c4
+  a1.sources.r1.selector.type = load_balancing
+  a1.sources.r1.selector.policy = round_robin
 
 Multiplexing Channel Selector
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/flume-ng-node/pom.xml b/flume-ng-node/pom.xml
index cb6979a..17c8456 100644
--- a/flume-ng-node/pom.xml
+++ b/flume-ng-node/pom.xml
@@ -86,6 +86,18 @@
 
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-1.2-api</artifactId>
       <scope>test</scope>
     </dependency>
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
index 1f4df59..e30ede8 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
@@ -43,7 +43,10 @@ import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
+import org.apache.flume.Sink;
+import org.apache.flume.SinkProcessor;
 import org.apache.flume.SinkRunner;
+import org.apache.flume.Source;
 import org.apache.flume.SourceRunner;
 import org.apache.flume.instrumentation.MonitorService;
 import org.apache.flume.instrumentation.MonitoringType;
@@ -53,6 +56,8 @@ import org.apache.flume.lifecycle.LifecycleSupervisor;
 import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
 import org.apache.flume.node.net.AuthorizationProvider;
 import org.apache.flume.node.net.BasicAuthorizationProvider;
+import org.apache.flume.sink.AbstractSingleSinkProcessor;
+import org.apache.flume.sink.AbstractSinkProcessor;
 import org.apache.flume.util.SSLUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,6 +109,7 @@ public class Application {
     try {
       lifecycleLock.lockInterruptibly();
       stopAllComponents();
+      initializeAllComponents(conf);
       startAllComponents(conf);
     } catch (InterruptedException e) {
       logger.info("Interrupted while trying to handle configuration event");
@@ -167,6 +173,36 @@ public class Application {
     }
   }
 
+  private void initializeAllComponents(MaterializedConfiguration materializedConfiguration) {
+    logger.info("Initializing components");
+    for (Channel ch : materializedConfiguration.getChannels().values()) {
+      while (ch.getLifecycleState() != LifecycleState.START && ch instanceof Initializable) {
+        ((Initializable) ch).initialize(materializedConfiguration);
+      }
+    }
+    for (SinkRunner sinkRunner : materializedConfiguration.getSinkRunners().values()) {
+      SinkProcessor processor = sinkRunner.getPolicy();
+      if (processor instanceof AbstractSingleSinkProcessor) {
+        Sink sink = ((AbstractSingleSinkProcessor) processor).getSink();
+        if (sink instanceof Initializable) {
+          ((Initializable) sink).initialize(materializedConfiguration);
+        }
+      } else if (processor instanceof AbstractSinkProcessor) {
+        for (Sink sink : ((AbstractSinkProcessor) processor).getSinks()) {
+          if (sink instanceof Initializable) {
+            ((Initializable) sink).initialize(materializedConfiguration);
+          }
+        }
+      }
+    }
+    for (SourceRunner sourceRunner : materializedConfiguration.getSourceRunners().values()) {
+      Source source = sourceRunner.getSource();
+      if (source instanceof Initializable) {
+        ((Initializable) source).initialize(materializedConfiguration);
+      }
+    }
+  }
+
   private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
     logger.info("Starting new configuration:{}", materializedConfiguration);
 
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Initializable.java b/flume-ng-node/src/main/java/org/apache/flume/node/Initializable.java
new file mode 100644
index 0000000..114990e
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/Initializable.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.flume.node;
+
+/**
+ * An interface implmmented by components that need access after all components have been created but before
+ * any have been started.
+ */
+public interface Initializable {
+
+  /**
+   * Called to initialize the component.
+   * @param configuration the materialized configuration.
+   */
+  void initialize(MaterializedConfiguration configuration);
+}
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java
index 3853d50..00d15c5 100644
--- a/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java
@@ -22,17 +22,25 @@ package org.apache.flume.node;
 import static org.mockito.Mockito.*;
 
 import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.flume.Channel;
+import org.apache.flume.Event;
 import org.apache.flume.SinkRunner;
 import org.apache.flume.SourceRunner;
+import org.apache.flume.Transaction;
+import org.apache.flume.event.SimpleEvent;
 import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.source.EventProcessor;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -195,4 +203,63 @@ public class TestApplication {
     Thread.sleep(1500L);
     application.stop();
   }
+
+  @Test
+  public void testFlumeInit() throws Exception {
+    File configFile = new File(baseDir, "flume-conf-init.properties");
+    Files.copy(new File(getClass().getClassLoader()
+            .getResource("flume-conf-init.properties").getFile()), configFile);
+    ConfigurationSource source = new FileConfigurationSource(configFile.toURI());
+    List<ConfigurationSource> sourceList = new ArrayList<>();
+    sourceList.add(source);
+    UriConfigurationProvider configurationProvider =
+        new UriConfigurationProvider("host1", sourceList, null, null, 1);
+    List<LifecycleAware> components = Lists.newArrayList();
+    Application application = new Application(components);
+    MaterializedConfiguration configuration = configurationProvider.getConfiguration();
+    Assert.assertNotNull("Unable to create configuration", configuration);
+    application.handleConfigurationEvent(configuration);
+    application.start();
+    Map<String, Channel> channels = configuration.getChannels();
+    Channel channel = channels.get("processedChannel");
+    Assert.assertNotNull("Channel not found", channel);
+    Map<String, SourceRunner> sourceRunners = configuration.getSourceRunners();
+    Assert.assertNotNull("No source runners", sourceRunners);
+    SourceRunner runner = sourceRunners.get("source1");
+    Assert.assertNotNull("No source runner", runner);
+    EventProcessor processor = (EventProcessor) runner.getSource();
+    long[] expected = new long[]{1, 3, 6, 10, 15};
+    for (int i = 0; i < 5; ++i) {
+      Event event = new SimpleEvent();
+      event.setBody(Long.toString(i + 1).getBytes(StandardCharsets.UTF_8));
+      processor.processEvent(event);
+    }
+    Thread.sleep(500);
+    for (int i = 0; i < 5; ++i) {
+      Event event = getEvent(channel);
+      Assert.assertNotNull("No event returned on iteration " + i, event);
+      String val = event.getHeaders().get("Total");
+      Assert.assertNotNull("No Total in event " + i, val);
+      long total = Long.parseLong(val);
+      Assert.assertEquals(expected[i], total);
+    }
+    application.stop();
+  }
+
+  private Event getEvent(Channel channel) {
+    Transaction transaction = channel.getTransaction();
+    Event event = null;
+
+    try {
+      transaction.begin();
+      event = channel.take();
+      transaction.commit();
+    } catch (Exception ex) {
+      transaction.rollback();
+      Assert.fail("Failed to retrieve Flume Event: " + ex.getMessage());
+    } finally {
+      transaction.close();
+    }
+    return event;
+  }
 }
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java b/flume-ng-node/src/test/java/org/apache/flume/sink/NullInitSink.java
similarity index 58%
copy from flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
copy to flume-ng-node/src/test/java/org/apache/flume/sink/NullInitSink.java
index 9347f8e..00296e6 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
+++ b/flume-ng-node/src/test/java/org/apache/flume/sink/NullInitSink.java
@@ -17,16 +17,20 @@
  */
 package org.apache.flume.sink;
 
-import com.google.common.base.Preconditions;
+import java.util.Map;
+
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Sink;
+import org.apache.flume.Source;
+import org.apache.flume.SourceRunner;
 import org.apache.flume.Transaction;
-import org.apache.flume.conf.BatchSizeSupported;
-import org.apache.flume.conf.Configurable;
+import org.apache.flume.node.Initializable;
+import org.apache.flume.node.MaterializedConfiguration;
+import org.apache.flume.source.EventProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,32 +52,38 @@ import org.slf4j.LoggerFactory;
  * TODO
  * </p>
  */
-public class NullSink extends AbstractSink implements Configurable, BatchSizeSupported {
-
-  private static final Logger logger = LoggerFactory.getLogger(NullSink.class);
+public class NullInitSink extends NullSink implements Initializable {
 
-  private static final int DFLT_BATCH_SIZE = 100;
-  private static final int DFLT_LOG_EVERY_N_EVENTS = 10000;
+  private static final Logger logger = LoggerFactory.getLogger(NullInitSink.class);
+  private String sourceName = null;
+  private EventProcessor eventProcessor = null;
+  private long total = 0;
 
-  private CounterGroup counterGroup;
-  private int batchSize = DFLT_BATCH_SIZE;
-  private int logEveryNEvents = DFLT_LOG_EVERY_N_EVENTS;
-
-  public NullSink() {
-    counterGroup = new CounterGroup();
+  public NullInitSink() {
+    super();
   }
 
   @Override
   public void configure(Context context) {
-    batchSize = context.getInteger("batchSize", DFLT_BATCH_SIZE);
-    logger.debug(this.getName() + " " +
-        "batch size set to " + String.valueOf(batchSize));
-    Preconditions.checkArgument(batchSize > 0, "Batch size must be > 0");
+    sourceName = context.getString("targetSource");
+    super.configure(context);
 
-    logEveryNEvents = context.getInteger("logEveryNEvents", DFLT_LOG_EVERY_N_EVENTS);
-    logger.debug(this.getName() + " " +
-        "log event N events set to " + logEveryNEvents);
-    Preconditions.checkArgument(logEveryNEvents > 0, "logEveryNEvents must be > 0");
+  }
+
+  @Override
+  public void initialize(MaterializedConfiguration configuration) {
+    logger.debug("Locating source for event publishing");
+    for (Map.Entry<String, SourceRunner>  entry : configuration.getSourceRunners().entrySet()) {
+      if (entry.getKey().equals(sourceName)) {
+        Source source = entry.getValue().getSource();
+        if (source instanceof EventProcessor) {
+          eventProcessor = (EventProcessor) source;
+          logger.debug("Found event processor {}", source.getName());
+          return;
+        }
+      }
+    }
+    logger.warn("No Source named {} found for republishing events.", sourceName);
   }
 
   @Override
@@ -83,6 +93,8 @@ public class NullSink extends AbstractSink implements Configurable, BatchSizeSup
     Channel channel = getChannel();
     Transaction transaction = channel.getTransaction();
     Event event = null;
+    CounterGroup counterGroup = getCounterGroup();
+    long batchSize = getBatchSize();
     long eventCounter = counterGroup.get("events.success");
 
     try {
@@ -90,10 +102,13 @@ public class NullSink extends AbstractSink implements Configurable, BatchSizeSup
       int i = 0;
       for (i = 0; i < batchSize; i++) {
         event = channel.take();
-        if (++eventCounter % logEveryNEvents == 0) {
-          logger.info("Null sink {} successful processed {} events.", getName(), eventCounter);
-        }
-        if (event == null) {
+        if (event != null) {
+          long id = Long.parseLong(new String(event.getBody()));
+          total += id;
+          event.getHeaders().put("Total", Long.toString(total));
+          eventProcessor.processEvent(event);
+          logger.info("Null sink {} successful processed event {}", getName(), id);
+        } else {
           status = Status.BACKOFF;
           break;
         }
@@ -112,35 +127,4 @@ public class NullSink extends AbstractSink implements Configurable, BatchSizeSup
 
     return status;
   }
-
-  @Override
-  public void start() {
-    logger.info("Starting {}...", this);
-
-    counterGroup.setName(this.getName());
-    super.start();
-
-    logger.info("Null sink {} started.", getName());
-  }
-
-  @Override
-  public void stop() {
-    logger.info("Null sink {} stopping...", getName());
-
-    super.stop();
-
-    logger.info("Null sink {} stopped. Event metrics: {}",
-        getName(), counterGroup);
-  }
-
-  @Override
-  public String toString() {
-    return "NullSink " + getName() + " { batchSize: " + batchSize + " }";
-  }
-
-  @Override
-  public long getBatchSize() {
-    return batchSize;
-  }
-
 }
diff --git a/flume-ng-node/src/test/java/org/apache/flume/source/EventProcessor.java b/flume-ng-node/src/test/java/org/apache/flume/source/EventProcessor.java
new file mode 100644
index 0000000..677065f
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/source/EventProcessor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+import org.apache.flume.Event;
+
+/**
+ * Interface indicating processEvent is implemented.
+ */
+public interface EventProcessor {
+  /**
+   * When implemented causes the event to be handled by the component.
+   * @param event The Flume event.
+   */
+  default void processEvent(Event event) {
+  }
+}
diff --git a/flume-ng-node/src/test/java/org/apache/flume/source/LocalSource.java b/flume-ng-node/src/test/java/org/apache/flume/source/LocalSource.java
new file mode 100644
index 0000000..18f609a
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/source/LocalSource.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SourceCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Accepts an event from a local component and publishes it to a channel.
+ */
+public class LocalSource extends AbstractSource implements Configurable, EventDrivenSource, EventProcessor {
+
+  private static final Logger logger = LoggerFactory.getLogger(LocalSource.class);
+
+  private SourceCounter sourceCounter;
+
+  /**
+   * Called by flume to start this source.
+   */
+  @Override
+  public void start() {
+    logger.info("Local source {} starting.", getName());
+    sourceCounter.start();
+    super.start();
+  }
+
+  /**
+   * Called by flume to stop this source.
+   */
+  @Override
+  public void stop() {
+    logger.info("Local source {} stopping.", getName());
+    sourceCounter.stop();
+    super.stop();
+    logger.info("Local source {} stopped. Metrics: {}", getName(), sourceCounter);
+  }
+
+  /**
+   * A message is passed in here. It is data that should be passed on.
+   *
+   * @param event The message.
+   */
+  public void processEvent(Event event) {
+    if (event == null) {
+      //Ignoring this.  Not counting as an event received either.
+      return;
+    }
+
+    sourceCounter.incrementAppendReceivedCount();
+    sourceCounter.incrementEventReceivedCount();
+    logger.debug("pushing event to channel");
+    getChannelProcessor().processEvent(event);
+    sourceCounter.incrementAppendAcceptedCount();
+    sourceCounter.incrementEventAcceptedCount();
+  }
+
+  /**
+   * Called when flume starts up.
+   *
+   * @param context - Config values for this source from flume properties file.
+   */
+  @Override
+  public void configure(Context context) {
+    if (sourceCounter == null) {
+      sourceCounter = new SourceCounter(getName());
+    }
+  }
+}
diff --git a/flume-ng-node/src/test/resources/flume-conf-init.properties b/flume-ng-node/src/test/resources/flume-conf-init.properties
new file mode 100644
index 0000000..9ac5727
--- /dev/null
+++ b/flume-ng-node/src/test/resources/flume-conf-init.properties
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Flume Configuration
+# This file contains configuration for one Agent identified as host1.
+# This file also contains invalid configuration for few agents
+# host2, host3 etc.
+#
+
+host1.sources = source1 processedSource
+host1.channels = channel1 processedChannel
+host1.sinks = sink1
+
+# avroSource configuration
+host1.sources.source1.type = org.apache.flume.source.LocalSource
+host1.sources.source1.channels = channel1
+host1.sources.source1.totalEvents = 5
+host1.sources.source1.backoffSleepIncrement = 50
+
+host1.sources.processedSource.type = org.apache.flume.source.LocalSource
+host1.sources.processedSource.channels = processedChannel
+
+# memChannel1 configuration
+host1.channels.channel1.type = memory
+host1.channels.channel1.capacity = 10000
+
+host1.channels.processedChannel.type = memory
+host1.channels.processedChannel.capacity = 10000
+
+host1.sinks.sink1.type = org.apache.flume.sink.NullInitSink
+host1.sinks.sink1.batchSize = 1
+host1.sinks.sink1.targetSource = processedSource
+host1.sinks.sink1.channel = channel1
+
+
diff --git a/flume-ng-node/src/test/resources/log4j2.xml b/flume-ng-node/src/test/resources/log4j2.xml
index fbc44b4..732fef9 100644
--- a/flume-ng-node/src/test/resources/log4j2.xml
+++ b/flume-ng-node/src/test/resources/log4j2.xml
@@ -16,7 +16,7 @@
  limitations under the License.
 
 -->
-<Configuration status="OFF">
+<Configuration status="ERROR">
   <Appenders>
     <Console name="Console" target="SYSTEM_OUT">
       <PatternLayout pattern="%d (%t) [%p - %l] %m%n" />
@@ -24,7 +24,7 @@
   </Appenders>
   <Loggers>
     <Logger name="org.apache.flume" level="DEBUG"/>
-    <Root level="INFO">
+    <Root level="DEBUG">
       <AppenderRef ref="Console" />
     </Root>
   </Loggers>