You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2012/05/18 21:42:43 UTC

svn commit: r1340222 - in /incubator/flume/trunk: flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/ flume-ng-core/src/main/java/org/apache/flume/sink/ flume-ng-core/src/test/java/org/apache/flume/sink/

Author: brock
Date: Fri May 18 19:42:43 2012
New Revision: 1340222

URL: http://svn.apache.org/viewvc?rev=1340222&view=rev
Log:
FLUME-1198: Implement a load balancing sink processor

(Arvind Prabhakar via Brock Noland)

Added:
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/FixedOrderSelector.java
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java
Modified:
    incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/SinkProcessorFactoryTest.java

Modified: incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java?rev=1340222&r1=1340221&r2=1340222&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java (original)
+++ incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java Fri May 18 19:42:43 2012
@@ -27,16 +27,25 @@ public enum SinkProcessorType {
   /**
    * Failover processor
    *
-   * @see FailoverSinkProcessor
+   * @see org.apache.flume.sink.FailoverSinkProcessor
    */
   FAILOVER("org.apache.flume.sink.FailoverSinkProcessor"),
 
   /**
    * Standard processor
    *
-   * @see DefaultSinkProcessor
+   * @see org.apache.flume.sink.DefaultSinkProcessor
    */
-  DEFAULT("org.apache.flume.sink.DefaultSinkProcessor");
+  DEFAULT("org.apache.flume.sink.DefaultSinkProcessor"),
+
+
+  /**
+   * Load balancing processor
+   *
+   * @see org.apache.flume.sink.LoadBalancingSinkProcessor
+   */
+  LOAD_BALANCE("org.apache.flume.sink.LoadBalancingSinkProcessor");
+
   private final String processorClassName;
 
   private SinkProcessorType(String processorClassName) {

Added: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java?rev=1340222&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java Fri May 18 19:42:43 2012
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flume.Sink;
+import org.apache.flume.SinkProcessor;
+import org.apache.flume.lifecycle.LifecycleState;
+
+/**
+ * A convenience base class for sink processors.
+ */
+public abstract class AbstractSinkProcessor implements SinkProcessor {
+
+  private LifecycleState state;
+
+  // Unmodifiable snapshot of the sinks handed to setSinks()
+  private List<Sink> sinkList;
+
+  /** Starts every configured sink, then records the START state. */
+  @Override
+  public void start() {
+    for (Sink s : sinkList) {
+      s.start();
+    }
+    state = LifecycleState.START;
+  }
+
+  /** Stops every configured sink, then records the STOP state. */
+  @Override
+  public void stop() {
+    for (Sink s : sinkList) {
+      s.stop();
+    }
+    state = LifecycleState.STOP;
+  }
+
+  @Override
+  public LifecycleState getLifecycleState() {
+    return state;
+  }
+
+  /** Keeps a private, unmodifiable copy of the given sinks. */
+  @Override
+  public void setSinks(List<Sink> sinks) {
+    sinkList = Collections.unmodifiableList(new ArrayList<Sink>(sinks));
+  }
+
+  protected List<Sink> getSinks() {
+    return sinkList;
+  }
+}

Added: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java?rev=1340222&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java Fri May 18 19:42:43 2012
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flume.Context;
+import org.apache.flume.Sink;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.sink.LoadBalancingSinkProcessor.SinkSelector;
+
+public abstract class AbstractSinkSelector implements SinkSelector {
+
+  private LifecycleState state;
+
+  // Private copy of the sinks handed to setSinks()
+  private List<Sink> sinkList;
+
+  /** Intentionally empty so selectors without settings need not override. */
+  @Override
+  public void configure(Context context) {
+  }
+
+  @Override
+  public void start() {
+    this.state = LifecycleState.START;
+  }
+
+  @Override
+  public void stop() {
+    this.state = LifecycleState.STOP;
+  }
+
+  @Override
+  public LifecycleState getLifecycleState() {
+    return this.state;
+  }
+
+  /** Replaces the stored sinks with a fresh copy of the given list. */
+  @Override
+  public void setSinks(List<Sink> sinks) {
+    this.sinkList = new ArrayList<Sink>();
+    this.sinkList.addAll(sinks);
+  }
+
+  protected List<Sink> getSinks() {
+    return this.sinkList;
+  }
+}

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java?rev=1340222&r1=1340221&r2=1340222&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java Fri May 18 19:42:43 2012
@@ -29,9 +29,7 @@ import java.util.TreeMap;
 import org.apache.flume.Context;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Sink;
-import org.apache.flume.SinkProcessor;
 import org.apache.flume.Sink.Status;
-import org.apache.flume.lifecycle.LifecycleState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,7 +61,7 @@ import org.slf4j.LoggerFactory;
  * host1.sinkgroups.group1.processor.maxpenalty = 10000
  *
  */
-public class FailoverSinkProcessor implements SinkProcessor {
+public class FailoverSinkProcessor extends AbstractSinkProcessor {
   private static final int FAILURE_PENALTY = 1000;
   private static final int DEFAULT_MAX_PENALTY = 30000;
 
@@ -115,31 +113,9 @@ public class FailoverSinkProcessor imple
   private Sink activeSink;
   private SortedMap<Integer, Sink> liveSinks;
   private Queue<FailedSink> failedSinks;
-  private LifecycleState state;
   private int maxPenalty;
 
   @Override
-  public void start() {
-    for(Sink s : sinks.values()) {
-      s.start();
-    }
-    state = LifecycleState.START;
-  }
-
-  @Override
-  public void stop() {
-    for(Sink s : sinks.values()) {
-      s.stop();
-    }
-    state = LifecycleState.STOP;
-  }
-
-  @Override
-  public LifecycleState getLifecycleState() {
-    return state;
-  }
-
-  @Override
   public void configure(Context context) {
     liveSinks = new TreeMap<Integer, Sink>();
     failedSinks = new PriorityQueue<FailedSink>();
@@ -168,8 +144,8 @@ public class FailoverSinkProcessor imple
         liveSinks.put(priority, sinks.get(entry.getKey()));
       } else {
         logger.warn("Sink {} not added to FailverSinkProcessor as priority" +
-        		"duplicates that of sink {}", entry.getKey(),
-        		liveSinks.get(priority));
+            "duplicates that of sink {}", entry.getKey(),
+            liveSinks.get(priority));
       }
     }
     activeSink = liveSinks.get(liveSinks.lastKey());
@@ -230,6 +206,9 @@ public class FailoverSinkProcessor imple
 
   @Override
   public void setSinks(List<Sink> sinks) {
+    // needed to implement the start/stop functionality
+    super.setSinks(sinks);
+
     this.sinks = new HashMap<String, Sink>();
     for (Sink sink : sinks) {
       this.sinks.put(sink.getName(), sink);

Added: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java?rev=1340222&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java Fri May 18 19:42:43 2012
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.flume.Context;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Sink;
+import org.apache.flume.Sink.Status;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p>Provides the ability to load-balance flow over multiple sinks.</p>
+ *
+ * <p>The <tt>LoadBalancingSinkProcessor</tt> maintains an indexed list of
+ * active sinks on which the load must be distributed. This implementation
+ * supports distributing load via either the <tt>ROUND_ROBIN</tt> or the
+ * <tt>RANDOM</tt> selection mechanism. The choice of selection mechanism
+ * defaults to <tt>ROUND_ROBIN</tt> type, but can be overridden via
+ * configuration.</p>
+ *
+ * <p>When invoked, this processor picks the next sink using its configured
+ * selection mechanism and invokes it. In case the selected sink fails with
+ * an exception, the processor picks the next available sink via its configured
+ * selection mechanism. This implementation does not blacklist the failing
+ * sink and instead continues to optimistically attempt every available sink.
+ * If all sink invocations result in failure, the processor propagates the
+ * failure to the sink runner.</p>
+ *
+ * <p>
+ * Sample configuration:
+ *  <pre>
+ *  host1.sinkgroups.group1.sinks = sink1 sink2
+ *  host1.sinkgroups.group1.processor.type = load_balance
+ *  host1.sinkgroups.group1.processor.selector = <selector type>
+ *  host1.sinkgroups.group1.processor.selector.selector_property = <value>
+ *  </pre>
+ *
+ * The value of processor.selector could be either <tt>round_robin</tt> for
+ * the round-robin scheme of load-balancing or <tt>random</tt> for random
+ * selection. Alternatively you can specify the fully qualified class name of
+ * your own implementation of the
+ * <tt>LoadBalancingSinkProcessor.SinkSelector</tt> interface. If no selector
+ * mechanism is specified, the round-robin selector is used by default.
+ * </p>
+ * <p>
+ * This implementation is not thread safe at this time
+ * </p>
+ *
+ * @see FailoverSinkProcessor
+ * @see LoadBalancingSinkProcessor.SinkSelector
+ */
+public class LoadBalancingSinkProcessor extends AbstractSinkProcessor {
+
+  public static final String CONFIG_SELECTOR = "selector";
+  public static final String CONFIG_SELECTOR_PREFIX = CONFIG_SELECTOR + ".";
+
+  public static final String SELECTOR_NAME_ROUND_ROBIN = "ROUND_ROBIN";
+  public static final String SELECTOR_NAME_RANDOM = "RANDOM";
+
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(LoadBalancingSinkProcessor.class);
+
+  private SinkSelector selector;
+
+  /** Selects, wires and configures the sink selector named by "selector". */
+  @Override
+  public void configure(Context context) {
+    Preconditions.checkState(getSinks().size() > 1,
+        "The LoadBalancingSinkProcessor cannot be used for a single sink. "
+        + "Please configure more than one sinks and try again.");
+
+    String selectorTypeName = context.getString(CONFIG_SELECTOR,
+        SELECTOR_NAME_ROUND_ROBIN);
+
+    // reset first: if instantiation below fails, no earlier selector remains
+    selector = null;
+
+    if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN)) {
+      selector = new RoundRobinSinkSelector();
+    } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) {
+      selector = new RandomOrderSinkSelector();
+    } else {
+      // any other value is taken as the class name of a SinkSelector
+      try {
+        selector = Class.forName(selectorTypeName)
+            .asSubclass(SinkSelector.class).newInstance();
+      } catch (Exception ex) {
+        throw new FlumeException("Unable to instantiate sink selector: "
+            + selectorTypeName, ex);
+      }
+    }
+
+    selector.setSinks(getSinks());
+    selector.configure(
+        new Context(context.getSubProperties(CONFIG_SELECTOR_PREFIX)));
+
+    LOGGER.debug("Sink selector: {} initialized", selector);
+  }
+
+  @Override
+  public void start() {
+    super.start();
+    // sinks are started by the superclass; the selector is started after them
+    selector.start();
+  }
+
+  @Override
+  public void stop() {
+    super.stop();
+    // the superclass handles the sinks; the selector is stopped after them
+    selector.stop();
+  }
+
+  /** Tries sinks in selector order until one returns without throwing. */
+  @Override
+  public Status process() throws EventDeliveryException {
+    Status outcome = null;
+    boolean attemptCompleted = false;
+    for (Iterator<Sink> order = selector.createSinkIterator();
+        !attemptCompleted && order.hasNext();) {
+      Sink candidate = order.next();
+      try {
+        outcome = candidate.process();
+        attemptCompleted = true;
+      } catch (Exception ex) {
+        LOGGER.warn("Sink failed to consume event. "
+            + "Attempting next sink if available.", ex);
+      }
+    }
+
+    if (outcome != null) {
+      return outcome;
+    }
+    throw new EventDeliveryException("All configured sinks have failed");
+  }
+
+
+  /**
+   * <p>
+   * An interface that allows the LoadBalancingSinkProcessor to use
+   * a load-balancing strategy such as round-robin, random distribution etc.
+   * Implementations of this class can be plugged into the system via
+   * processor configuration and are used to select a sink on every invocation.
+   * </p>
+   * <p>
+   * An instance of the configured sink selector is created during the processor
+   * configuration, its {@linkplain #setSinks(List)} method is invoked following
+   * which it is configured via a subcontext. Once configured, the lifecycle of
+   * this selector is tied to the lifecycle of the sink processor.
+   * </p>
+   * <p>
+   * At runtime, the processor invokes the {@link #createSinkIterator()}
+   * method for every <tt>process</tt> call to create an iteration order over
+   * the available sinks. The processor then loops through this iteration order
+   * until one of the sinks succeeds in processing the event. If the iterator
+   * is exhausted and none of the sinks succeed, the processor will raise
+   * an <tt>EventDeliveryException</tt>.
+   * </p>
+   */
+  public interface SinkSelector extends Configurable, LifecycleAware {
+    /** Supplies the sinks to select from; called before configuration. */
+    void setSinks(List<Sink> sinks);
+    /** Builds the order in which sinks are attempted for one process call. */
+    Iterator<Sink> createSinkIterator();
+  }
+
+  /**
+   * A sink selector that implements the round-robin sink selection policy.
+   * This implementation is not MT safe.
+   */
+  private static class RoundRobinSinkSelector extends AbstractSinkSelector {
+
+    // Index of the sink that heads the next iteration order
+    private int nextHead = 0;
+
+    /** Returns all sinks, rotated so each call starts one sink later. */
+    @Override
+    public Iterator<Sink> createSinkIterator() {
+      List<Sink> all = getSinks();
+      int count = all.size();
+      int first = nextHead;
+
+      // advance the head for the following call, wrapping at the end
+      nextHead = (first + 1 == count) ? 0 : first + 1;
+
+      int[] rotation = new int[count];
+      for (int offset = 0; offset < count; offset++) {
+        rotation[offset] = (first + offset) % count;
+      }
+      return new SpecificOrderSinkIterator(rotation, all);
+    }
+  }
+
+  /**
+   * A sink selector that implements a random sink selection policy. This
+   * implementation is not thread safe.
+   */
+  private static class RandomOrderSinkSelector extends AbstractSinkSelector {
+
+    // drives the shuffle performed on every createSinkIterator() call
+    private final Random random = new Random(System.currentTimeMillis());
+
+    /** Returns every sink once, in an order shuffled afresh on each call. */
+    @Override
+    public Iterator<Sink> createSinkIterator() {
+      int size = getSinks().size();
+      List<Integer> indexList = new ArrayList<Integer>(size);
+      for (int i = 0; i < size; i++) {
+        indexList.add(i);
+      }
+
+      // uniform permutation; also well-defined for an empty sink list
+      java.util.Collections.shuffle(indexList, random);
+
+      int[] indexOrder = new int[size];
+      for (int i = 0; i < size; i++) {
+        indexOrder[i] = indexList.get(i);
+      }
+      return new SpecificOrderSinkIterator(indexOrder, getSinks());
+    }
+  }
+
+
+  /**
+   * A utility class that iterates over the given ordered list of Sinks via
+   * the specified order array. The entries of the order array indicate the
+   * index within the ordered list of Sinks that needs to be picked over the
+   * course of iteration.
+   */
+  private static class SpecificOrderSinkIterator implements Iterator<Sink> {
+    private final int[] order;
+    private final List<Sink> sinks;
+    private int index = 0;
+
+    SpecificOrderSinkIterator(int[] orderArray, List<Sink> sinkList) {
+      order = orderArray;
+      sinks = sinkList;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return index < order.length;
+    }
+
+    @Override
+    public Sink next() {
+      if (hasNext()) { return sinks.get(order[index++]); }
+      throw new java.util.NoSuchElementException(); // Iterator contract
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+}

Added: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/FixedOrderSelector.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/FixedOrderSelector.java?rev=1340222&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/FixedOrderSelector.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/FixedOrderSelector.java Fri May 18 19:42:43 2012
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink;
+
+import java.util.Iterator;
+
+import org.apache.flume.Context;
+import org.apache.flume.Sink;
+
+/**
+ * A test selector that always returns the iteration order of specified
+ * sinks for testing purposes. This selector expects that the configuration
+ * key {@value #SET_ME} is specified with a non-null value.
+ */
+public class FixedOrderSelector extends AbstractSinkSelector {
+
+  public static final String SET_ME = "setme";
+
+  @Override
+  public void configure(Context context) {
+    super.configure(context);
+    String marker = context.getString(SET_ME);
+    if (marker == null) {
+      throw new RuntimeException("config key " + SET_ME + " not specified");
+    }
+  }
+
+  @Override
+  public Iterator<Sink> createSinkIterator() {
+    return getSinks().iterator();
+  }
+}

Modified: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/SinkProcessorFactoryTest.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/SinkProcessorFactoryTest.java?rev=1340222&r1=1340221&r2=1340222&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/SinkProcessorFactoryTest.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/SinkProcessorFactoryTest.java Fri May 18 19:42:43 2012
@@ -46,4 +46,19 @@ public class SinkProcessorFactoryTest {
     Assert.assertEquals(sp.getClass(), sp2.getClass());
   }
 
+  @Test
+  public void testInstantiatingLoadBalancingSinkProcessor() {
+    Context context = new Context();
+    context.put("type", LoadBalancingSinkProcessor.class.getName());
+    context.put("selector", "random");
+    SinkFactory sf = new DefaultSinkFactory();
+    List<Sink> sinks = new ArrayList<Sink>();
+    sinks.add(sf.create("sink1", "avro"));
+    sinks.add(sf.create("sink2", "avro"));
+    SinkProcessor sp = SinkProcessorFactory.getProcessor(context, sinks);
+    context.put("type", "load_balance");
+    SinkProcessor sp2 = SinkProcessorFactory.getProcessor(context, sinks);
+    Assert.assertEquals(sp.getClass(), sp2.getClass());
+  }
+
 }

Added: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java?rev=1340222&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java Fri May 18 19:42:43 2012
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Sink;
+import org.apache.flume.Sink.Status;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.AbstractChannel;
+import org.junit.Test;
+
+public class TestLoadBalancingSinkProcessor {
+
+  private Context getContext(String selectorType) {
+    Map<String, String> p = new HashMap<String, String>();
+    p.put("selector", selectorType);
+    Context ctx = new Context(p);
+
+    return ctx;
+  }
+
+  private LoadBalancingSinkProcessor getProcessor(
+      String selectorType, List<Sink> sinks) {
+    return getProcessor(sinks, getContext(selectorType));
+  }
+
+  private LoadBalancingSinkProcessor getProcessor(List<Sink> sinks, Context ctx)
+  {
+    LoadBalancingSinkProcessor lbsp = new LoadBalancingSinkProcessor();
+    lbsp.setSinks(sinks);
+    lbsp.configure(ctx);
+    lbsp.start();
+
+    return lbsp;
+  }
+
+  @Test
+  public void testDefaultConfiguration() throws Exception {
+    // If no selector is specified, the round-robin selector should be used
+    Channel ch = new MockChannel();
+    int n = 100;
+    int numEvents = 3*n;
+    for (int i = 0; i < numEvents; i++) {
+      ch.put(new MockEvent("test" + i));
+    }
+
+    MockSink s1 = new MockSink(1);
+    s1.setChannel(ch);
+
+    MockSink s2 = new MockSink(2);
+    s2.setChannel(ch);
+
+    MockSink s3 = new MockSink(3);
+    s3.setChannel(ch);
+
+    List<Sink> sinks = new ArrayList<Sink>();
+    sinks.add(s1);
+    sinks.add(s2);
+    sinks.add(s3);
+
+    LoadBalancingSinkProcessor lbsp = getProcessor(sinks, new Context());
+
+    Status s = Status.READY;
+    while (s != Status.BACKOFF) {
+      s = lbsp.process();
+    }
+
+    Assert.assertTrue(s1.getEvents().size() == n);
+    Assert.assertTrue(s2.getEvents().size() == n);
+    Assert.assertTrue(s3.getEvents().size() == n);
+
+  }
+
+  @Test
+  public void testRandomOneActiveSink() throws Exception {
+    Channel ch = new MockChannel();
+    int n = 10;
+    int numEvents = n;
+    for (int i = 0; i < numEvents; i++) {
+      ch.put(new MockEvent("test" + i));
+    }
+
+    MockSink s1 = new MockSink(1);
+    s1.setChannel(ch);
+
+    // s1 always fails
+    s1.setFail();
+
+    MockSink s2 = new MockSink(2);
+    s2.setChannel(ch);
+
+
+    MockSink s3 = new MockSink(3);
+    s3.setChannel(ch);
+
+    // s3 always fails
+    s3.setFail();
+
+    List<Sink> sinks = new ArrayList<Sink>();
+    sinks.add(s1);
+    sinks.add(s2);
+    sinks.add(s3);
+
+    LoadBalancingSinkProcessor lbsp = getProcessor("random", sinks);
+
+    Sink.Status s = Sink.Status.READY;
+    while (s != Sink.Status.BACKOFF) {
+      s = lbsp.process();
+    }
+
+    Assert.assertTrue(s1.getEvents().size() == 0);
+    Assert.assertTrue(s2.getEvents().size() == n);
+    Assert.assertTrue(s3.getEvents().size() == 0);
+  }
+
+  @Test
+  public void testRandomPersistentFailure() throws Exception {
+    Channel ch = new MockChannel();
+    int n = 100;
+    int numEvents = 3*n;
+    for (int i = 0; i < numEvents; i++) {
+      ch.put(new MockEvent("test" + i));
+    }
+
+    MockSink s1 = new MockSink(1);
+    s1.setChannel(ch);
+
+    MockSink s2 = new MockSink(2);
+    s2.setChannel(ch);
+
+    // s2 always fails
+    s2.setFail();
+
+    MockSink s3 = new MockSink(3);
+    s3.setChannel(ch);
+
+    List<Sink> sinks = new ArrayList<Sink>();
+    sinks.add(s1);
+    sinks.add(s2);
+    sinks.add(s3);
+
+    LoadBalancingSinkProcessor lbsp = getProcessor("random",sinks);
+
+    Status s = Status.READY;
+    while (s != Status.BACKOFF) {
+      s = lbsp.process();
+    }
+
+    Assert.assertTrue(s2.getEvents().size() == 0);
+    Assert.assertTrue(s1.getEvents().size() + s3.getEvents().size() == 3*n);
+  }
+
+  @Test
+  public void testRandomNoFailure() throws Exception {
+
+    Channel ch = new MockChannel();
+    int n = 10000;
+    int numEvents = n;
+    for (int i = 0; i < numEvents; i++) {
+      ch.put(new MockEvent("test" + i));
+    }
+
+    MockSink s1 = new MockSink(1);
+    s1.setChannel(ch);
+
+    MockSink s2 = new MockSink(2);
+    s2.setChannel(ch);
+
+    MockSink s3 = new MockSink(3);
+    s3.setChannel(ch);
+
+    MockSink s4 = new MockSink(4);
+    s4.setChannel(ch);
+
+    MockSink s5 = new MockSink(5);
+    s5.setChannel(ch);
+
+    MockSink s6 = new MockSink(6);
+    s6.setChannel(ch);
+
+    MockSink s7 = new MockSink(7);
+    s7.setChannel(ch);
+
+    MockSink s8 = new MockSink(8);
+    s8.setChannel(ch);
+
+    MockSink s9 = new MockSink(9);
+    s9.setChannel(ch);
+
+    MockSink s0 = new MockSink(0);
+    s0.setChannel(ch);
+
+    List<Sink> sinks = new ArrayList<Sink>();
+    sinks.add(s1);
+    sinks.add(s2);
+    sinks.add(s3);
+    sinks.add(s4);
+    sinks.add(s5);
+    sinks.add(s6);
+    sinks.add(s7);
+    sinks.add(s8);
+    sinks.add(s9);
+    sinks.add(s0);
+
+    LoadBalancingSinkProcessor lbsp = getProcessor("random",sinks);
+
+    Status s = Status.READY;
+    while (s != Status.BACKOFF) {
+      s = lbsp.process();
+    }
+
+    Set<Integer> sizeSet = new HashSet<Integer>();
+    int sum = 0;
+    for (Sink ms : sinks) {
+      int count = ((MockSink) ms).getEvents().size();
+      sum += count;
+      sizeSet.add(count);
+    }
+
+    // Assert that all the events were accounted for
+    Assert.assertEquals(n, sum);
+
+    // Assert that at least two sinks came with different event sizes.
+    // This makes sense if the total number of events is evenly divisible by
+    // the total number of sinks. In which case the round-robin policy will
+    // end up causing all sinks to get the same number of events, whereas
+    // the random policy will have a very low probability of doing that.
+    Assert.assertTrue("Miraculous distribution", sizeSet.size() > 1);
+  }
+
+
+
+  /**
+   * Round-robin selection over three sinks where only the middle one works:
+   * s1 and s3 throw on every call, so s2 is expected to receive all events.
+   */
+  @Test
+  public void testRoundRobinOneActiveSink() throws Exception {
+    Channel ch = new MockChannel();
+    int n = 10;
+    for (int i = 0; i < n; i++) {
+      ch.put(new MockEvent("test" + i));
+    }
+
+    MockSink s1 = new MockSink(1);
+    s1.setChannel(ch);
+
+    // s1 always fails
+    s1.setFail();
+
+    MockSink s2 = new MockSink(2);
+    s2.setChannel(ch);
+
+    MockSink s3 = new MockSink(3);
+    s3.setChannel(ch);
+
+    // s3 always fails
+    s3.setFail();
+
+    List<Sink> sinks = new ArrayList<Sink>();
+    sinks.add(s1);
+    sinks.add(s2);
+    sinks.add(s3);
+
+    LoadBalancingSinkProcessor lbsp = getProcessor("round_robin", sinks);
+
+    // Keep processing until the processor reports BACKOFF.
+    Sink.Status s = Sink.Status.READY;
+    while (s != Sink.Status.BACKOFF) {
+      s = lbsp.process();
+    }
+
+    // assertEquals reports the expected and actual counts when a check fails.
+    Assert.assertEquals(0, s1.getEvents().size());
+    Assert.assertEquals(n, s2.getEvents().size());
+    Assert.assertEquals(0, s3.getEvents().size());
+  }
+
+  /**
+   * Round-robin selection over three sinks where s2 throws on every call.
+   * With 3*n events queued, the test expects s1 to end up with n events and
+   * s3 with 2*n, while s2 receives none.
+   */
+  @Test
+  public void testRoundRobinPersistentFailure() throws Exception {
+    Channel ch = new MockChannel();
+    int n = 100;
+    int numEvents = 3 * n;
+    for (int i = 0; i < numEvents; i++) {
+      ch.put(new MockEvent("test" + i));
+    }
+
+    MockSink s1 = new MockSink(1);
+    s1.setChannel(ch);
+
+    MockSink s2 = new MockSink(2);
+    s2.setChannel(ch);
+
+    // s2 always fails
+    s2.setFail();
+
+    MockSink s3 = new MockSink(3);
+    s3.setChannel(ch);
+
+    List<Sink> sinks = new ArrayList<Sink>();
+    sinks.add(s1);
+    sinks.add(s2);
+    sinks.add(s3);
+
+    LoadBalancingSinkProcessor lbsp = getProcessor("round_robin", sinks);
+
+    // Keep processing until the processor reports BACKOFF.
+    Status s = Status.READY;
+    while (s != Status.BACKOFF) {
+      s = lbsp.process();
+    }
+
+    // assertEquals reports the expected and actual counts when a check fails.
+    Assert.assertEquals(n, s1.getEvents().size());
+    Assert.assertEquals(0, s2.getEvents().size());
+    Assert.assertEquals(2 * n, s3.getEvents().size());
+  }
+
+  /**
+   * Round-robin selection over three healthy sinks. With 3*n events queued,
+   * each sink is expected to receive exactly n of them.
+   */
+  @Test
+  public void testRoundRobinNoFailure() throws Exception {
+
+    Channel ch = new MockChannel();
+    int n = 100;
+    int numEvents = 3 * n;
+    for (int i = 0; i < numEvents; i++) {
+      ch.put(new MockEvent("test" + i));
+    }
+
+    MockSink s1 = new MockSink(1);
+    s1.setChannel(ch);
+
+    MockSink s2 = new MockSink(2);
+    s2.setChannel(ch);
+
+    MockSink s3 = new MockSink(3);
+    s3.setChannel(ch);
+
+    List<Sink> sinks = new ArrayList<Sink>();
+    sinks.add(s1);
+    sinks.add(s2);
+    sinks.add(s3);
+
+    LoadBalancingSinkProcessor lbsp = getProcessor("round_robin", sinks);
+
+    // Keep processing until the processor reports BACKOFF.
+    Status s = Status.READY;
+    while (s != Status.BACKOFF) {
+      s = lbsp.process();
+    }
+
+    // assertEquals reports the expected and actual counts when a check fails.
+    Assert.assertEquals(n, s1.getEvents().size());
+    Assert.assertEquals(n, s2.getEvents().size());
+    Assert.assertEquals(n, s3.getEvents().size());
+  }
+
+  /**
+   * Configures the processor with {@link FixedOrderSelector} as a custom
+   * selector class. s1 throws on every call; the test expects every event to
+   * be delivered to s2 and none to s3.
+   */
+  @Test
+  public void testCustomSelector() throws Exception {
+    Channel ch = new MockChannel();
+    int n = 10;
+    for (int i = 0; i < n; i++) {
+      ch.put(new MockEvent("test" + i));
+    }
+
+    MockSink s1 = new MockSink(1);
+    s1.setChannel(ch);
+
+    // s1 always fails
+    s1.setFail();
+
+    MockSink s2 = new MockSink(2);
+    s2.setChannel(ch);
+
+    MockSink s3 = new MockSink(3);
+    s3.setChannel(ch);
+
+    List<Sink> sinks = new ArrayList<Sink>();
+    sinks.add(s1);
+    sinks.add(s2);
+    sinks.add(s3);
+
+    // This selector will result in all events going to s2
+    Context ctx = getContext(FixedOrderSelector.class.getCanonicalName());
+    ctx.put("selector." + FixedOrderSelector.SET_ME, "foo");
+    LoadBalancingSinkProcessor lbsp = getProcessor(sinks, ctx);
+
+    // Keep processing until the processor reports BACKOFF.
+    Sink.Status s = Sink.Status.READY;
+    while (s != Sink.Status.BACKOFF) {
+      s = lbsp.process();
+    }
+
+    // assertEquals reports the expected and actual counts when a check fails.
+    Assert.assertEquals(0, s1.getEvents().size());
+    Assert.assertEquals(n, s2.getEvents().size());
+    Assert.assertEquals(0, s3.getEvents().size());
+  }
+
+  /**
+   * Test sink that records every event it takes from its channel. It can be
+   * switched into a mode where every call to {@link #process()} throws.
+   */
+  private static class MockSink extends AbstractSink {
+
+    private final int id;
+
+    // Events taken from the channel so far, in the order they were taken.
+    private final List<Event> events = new ArrayList<Event>();
+
+    // Once set, process() throws instead of touching the channel.
+    private boolean fail = false;
+
+    private MockSink(int id) {
+      this.id = id;
+    }
+
+    /** Returns the live list of events this sink has received. */
+    List<Event> getEvents() {
+      return events;
+    }
+
+    /** Returns the id passed to the constructor. */
+    int getId() {
+      return id;
+    }
+
+    /** Makes every subsequent call to {@link #process()} throw. */
+    void setFail() {
+      fail = true;
+    }
+
+    /**
+     * Takes at most one event from the channel and records it.
+     *
+     * @return {@code BACKOFF} if the channel returned no event,
+     *         {@code READY} after an event was recorded
+     * @throws EventDeliveryException if {@link #setFail()} has been called
+     */
+    @Override
+    public Status process() throws EventDeliveryException {
+      if (fail) {
+        throw new EventDeliveryException("failed");
+      }
+      Event e = this.getChannel().take();
+      if (e == null) {
+        return Status.BACKOFF;
+      }
+
+      events.add(e);
+      return Status.READY;
+    }
+  }
+
+  /**
+   * In-memory FIFO channel for tests: events come out of {@link #take()} in
+   * the order they were put. No transaction object is provided.
+   */
+  private static class MockChannel extends AbstractChannel {
+
+    private List<Event> queue = new ArrayList<Event>();
+
+    /** Appends the event to the end of the queue. */
+    @Override
+    public void put(Event event) throws ChannelException {
+      queue.add(event);
+    }
+
+    /** Removes and returns the oldest event, or {@code null} when empty. */
+    @Override
+    public Event take() throws ChannelException {
+      if (queue.isEmpty()) {
+        return null;
+      }
+      return queue.remove(0);
+    }
+
+    /** Always returns {@code null}. */
+    @Override
+    public Transaction getTransaction() {
+      return null;
+    }
+
+  }
+
+  /**
+   * Minimal event for tests: a byte-array body built from a string, plus a
+   * shared, unmodifiable, empty header map.
+   */
+  private static class MockEvent implements Event {
+
+    private static final Map<String, String> EMPTY_HEADERS =
+        Collections.unmodifiableMap(new HashMap<String, String>());
+
+    private byte[] body;
+
+    /**
+     * @param str text whose UTF-8 encoding becomes the event body
+     */
+    MockEvent(String str) {
+      // Encode with an explicit charset so the body bytes do not depend on
+      // the platform default encoding of the machine running the test.
+      this.body = str.getBytes(java.nio.charset.Charset.forName("UTF-8"));
+    }
+
+    /** Returns the shared empty, unmodifiable header map. */
+    @Override
+    public Map<String, String> getHeaders() {
+      return EMPTY_HEADERS;
+    }
+
+    /** Not supported; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void setHeaders(Map<String, String> headers) {
+      throw new UnsupportedOperationException();
+    }
+
+    /** Returns the body array itself (no copy is made). */
+    @Override
+    public byte[] getBody() {
+      return body;
+    }
+
+    /** Stores the given array as the body (no copy is made). */
+    @Override
+    public void setBody(byte[] body) {
+      this.body = body;
+    }
+  }
+}