You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2012/05/18 21:42:43 UTC
svn commit: r1340222 - in /incubator/flume/trunk:
flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/
flume-ng-core/src/main/java/org/apache/flume/sink/
flume-ng-core/src/test/java/org/apache/flume/sink/
Author: brock
Date: Fri May 18 19:42:43 2012
New Revision: 1340222
URL: http://svn.apache.org/viewvc?rev=1340222&view=rev
Log:
FLUME-1198: Implement a load balancing sink processor
(Arvind Prabhakar via Brock Noland)
Added:
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java
incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/FixedOrderSelector.java
incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java
Modified:
incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java
incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/SinkProcessorFactoryTest.java
Modified: incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java?rev=1340222&r1=1340221&r2=1340222&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java (original)
+++ incubator/flume/trunk/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkProcessorType.java Fri May 18 19:42:43 2012
@@ -27,16 +27,25 @@ public enum SinkProcessorType {
/**
* Failover processor
*
- * @see FailoverSinkProcessor
+ * @see org.apache.flume.sink.FailoverSinkProcessor
*/
FAILOVER("org.apache.flume.sink.FailoverSinkProcessor"),
/**
* Standard processor
*
- * @see DefaultSinkProcessor
+ * @see org.apache.flume.sink.DefaultSinkProcessor
*/
- DEFAULT("org.apache.flume.sink.DefaultSinkProcessor");
+ DEFAULT("org.apache.flume.sink.DefaultSinkProcessor"),
+
+
+ /**
+ * Load balancing processor
+ *
+ * @see org.apache.flume.sink.LoadBalancingSinkProcessor
+ */
+ LOAD_BALANCE("org.apache.flume.sink.LoadBalancingSinkProcessor");
+
private final String processorClassName;
private SinkProcessorType(String processorClassName) {
Added: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java?rev=1340222&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkProcessor.java Fri May 18 19:42:43 2012
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flume.Sink;
+import org.apache.flume.SinkProcessor;
+import org.apache.flume.lifecycle.LifecycleState;
+
+/**
+ * A convenience base class for sink processors.
+ */
+public abstract class AbstractSinkProcessor implements SinkProcessor {
+
+ private LifecycleState state;
+
+ // List of sinks as specified
+ private List<Sink> sinkList;
+
+ @Override
+ public void start() {
+ for(Sink s : sinkList) {
+ s.start();
+ }
+
+ state = LifecycleState.START;
+ }
+
+ @Override
+ public void stop() {
+ for(Sink s : sinkList) {
+ s.start();
+ }
+ state = LifecycleState.STOP;
+ }
+
+ @Override
+ public LifecycleState getLifecycleState() {
+ return state;
+ }
+
+ @Override
+ public void setSinks(List<Sink> sinks) {
+ List<Sink> list = new ArrayList<Sink>();
+ list.addAll(sinks);
+ sinkList = Collections.unmodifiableList(list);
+ }
+
+ protected List<Sink> getSinks() {
+ return sinkList;
+ }
+}
Added: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java?rev=1340222&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java Fri May 18 19:42:43 2012
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flume.Context;
+import org.apache.flume.Sink;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.sink.LoadBalancingSinkProcessor.SinkSelector;
+
+public abstract class AbstractSinkSelector implements SinkSelector {
+
+ private LifecycleState state;
+
+ // List of sinks as specified
+ private List<Sink> sinkList;
+
+ @Override
+ public void configure(Context context) {
+ // no-op configure method for convenience for implementations
+ // that do not require configuration.
+ }
+
+ @Override
+ public void start() {
+ state = LifecycleState.START;
+ }
+
+ @Override
+ public void stop() {
+ state = LifecycleState.STOP;
+ }
+
+ @Override
+ public LifecycleState getLifecycleState() {
+ return state;
+ }
+
+ @Override
+ public void setSinks(List<Sink> sinks) {
+ sinkList = new ArrayList<Sink>();
+ sinkList.addAll(sinks);
+ }
+
+ protected List<Sink> getSinks() {
+ return sinkList;
+ }
+}
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java?rev=1340222&r1=1340221&r2=1340222&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/FailoverSinkProcessor.java Fri May 18 19:42:43 2012
@@ -29,9 +29,7 @@ import java.util.TreeMap;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
-import org.apache.flume.SinkProcessor;
import org.apache.flume.Sink.Status;
-import org.apache.flume.lifecycle.LifecycleState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,7 +61,7 @@ import org.slf4j.LoggerFactory;
* host1.sinkgroups.group1.processor.maxpenalty = 10000
*
*/
-public class FailoverSinkProcessor implements SinkProcessor {
+public class FailoverSinkProcessor extends AbstractSinkProcessor {
private static final int FAILURE_PENALTY = 1000;
private static final int DEFAULT_MAX_PENALTY = 30000;
@@ -115,31 +113,9 @@ public class FailoverSinkProcessor imple
private Sink activeSink;
private SortedMap<Integer, Sink> liveSinks;
private Queue<FailedSink> failedSinks;
- private LifecycleState state;
private int maxPenalty;
@Override
- public void start() {
- for(Sink s : sinks.values()) {
- s.start();
- }
- state = LifecycleState.START;
- }
-
- @Override
- public void stop() {
- for(Sink s : sinks.values()) {
- s.stop();
- }
- state = LifecycleState.STOP;
- }
-
- @Override
- public LifecycleState getLifecycleState() {
- return state;
- }
-
- @Override
public void configure(Context context) {
liveSinks = new TreeMap<Integer, Sink>();
failedSinks = new PriorityQueue<FailedSink>();
@@ -168,8 +144,8 @@ public class FailoverSinkProcessor imple
liveSinks.put(priority, sinks.get(entry.getKey()));
} else {
logger.warn("Sink {} not added to FailverSinkProcessor as priority" +
- "duplicates that of sink {}", entry.getKey(),
- liveSinks.get(priority));
+ "duplicates that of sink {}", entry.getKey(),
+ liveSinks.get(priority));
}
}
activeSink = liveSinks.get(liveSinks.lastKey());
@@ -230,6 +206,9 @@ public class FailoverSinkProcessor imple
@Override
public void setSinks(List<Sink> sinks) {
+ // needed to implement the start/stop functionality
+ super.setSinks(sinks);
+
this.sinks = new HashMap<String, Sink>();
for (Sink sink : sinks) {
this.sinks.put(sink.getName(), sink);
Added: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java?rev=1340222&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java Fri May 18 19:42:43 2012
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.flume.Context;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Sink;
+import org.apache.flume.Sink.Status;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p>Provides the ability to load-balance flow over multiple sinks.</p>
+ *
+ * <p>The <tt>LoadBalancingSinkProcessor</tt> maintains an indexed list of
+ * active sinks on which the load must be distributed. This implementation
+ * supports distributing load using either via <tt>ROUND_ROBIN</tt> or via
+ * <tt>RANDOM</tt> selection mechanism. The choice of selection mechanism
+ * defaults to <tt>ROUND_ROBIN</tt> type, but can be overridden via
+ * configuration.</p>
+ *
+ * <p>When invoked, this selector picks the next sink using its configured
+ * selection mechanism and invokes it. In case the selected sink fails with
+ * an exception, the processor picks the next available sink via its configured
+ * selection mechanism. This implementation does not blacklist the failing
+ * sink and instead continues to optimistically attempt every available sink.
+ * If all sinks invocations result in failure, the selector propagates the
+ * failure to the sink runner.</p>
+ *
+ * <p>
+ * Sample configuration:
+ * <pre>
+ * host1.sinkgroups.group1.sinks = sink1 sink2
+ * host1.sinkgroups.group1.processor.type = load_balance
+ * host1.sinkgroups.group1.processor.selector = <selector type>
+ * host1.sinkgroups.group1.processor.selector.selector_property = <value>
+ * </pre>
+ *
+ * The value of processor.selector could be either <tt>round_robin</tt> for
+ * round-robin scheme of load-balancing or <tt>random</tt> for random
+ * selection. Alternatively you can specify your own implementation of the
+ * selection algorithm by implementing the <tt>LoadBalancingSelector</tt>
+ * interface. If no selector mechanism is specified, the round-robin selector
+ * is used by default.
+ * </p>
+ * <p>
+ * This implementation is not thread safe at this time
+ * </p>
+ *
+ * @see FailoverSinkProcessor
+ * @see LoadBalancingSinkProcessor.SinkSelector
+ */
+public class LoadBalancingSinkProcessor extends AbstractSinkProcessor {
+
+ public static final String CONFIG_SELECTOR = "selector";
+ public static final String CONFIG_SELECTOR_PREFIX = CONFIG_SELECTOR + ".";
+
+ public static final String SELECTOR_NAME_ROUND_ROBIN = "ROUND_ROBIN";
+ public static final String SELECTOR_NAME_RANDOM = "RANDOM";
+
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(LoadBalancingSinkProcessor.class);
+
+ private SinkSelector selector;
+
+ @Override
+ public void configure(Context context) {
+ Preconditions.checkState(getSinks().size() > 1,
+ "The LoadBalancingSinkProcessor cannot be used for a single sink. "
+ + "Please configure more than one sinks and try again.");
+
+ String selectorTypeName = context.getString(CONFIG_SELECTOR,
+ SELECTOR_NAME_ROUND_ROBIN);
+
+ selector = null;
+
+ if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN)) {
+ selector = new RoundRobinSinkSelector();
+ } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) {
+ selector = new RandomOrderSinkSelector();
+ } else {
+ try {
+ @SuppressWarnings("unchecked")
+ Class<? extends SinkSelector> klass = (Class<? extends SinkSelector>)
+ Class.forName(selectorTypeName);
+
+ selector = klass.newInstance();
+ } catch (Exception ex) {
+ throw new FlumeException("Unable to instantiate sink selector: "
+ + selectorTypeName, ex);
+ }
+ }
+
+ selector.setSinks(getSinks());
+ selector.configure(
+ new Context(context.getSubProperties(CONFIG_SELECTOR_PREFIX)));
+
+ LOGGER.debug("Sink selector: " + selector + " initialized");
+ }
+
+ @Override
+ public void start() {
+ super.start();
+
+ selector.start();
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+
+ selector.stop();
+ }
+
+ @Override
+ public Status process() throws EventDeliveryException {
+ Status status = null;
+
+ Iterator<Sink> sinkIterator = selector.createSinkIterator();
+ while (sinkIterator.hasNext()) {
+ Sink sink = sinkIterator.next();
+ try {
+ status = sink.process();
+ break;
+ } catch (Exception ex) {
+ LOGGER.warn("Sink failed to consume event. "
+ + "Attempting next sink if available.", ex);
+ }
+ }
+
+ if (status == null) {
+ throw new EventDeliveryException("All configured sinks have failed");
+ }
+
+ return status;
+ }
+
+
+ /**
+ * <p>
+ * An interface that allows the LoadBalancingSinkProcessor to use
+ * a load-balancing strategy such as round-robin, random distribution etc.
+ * Implementations of this class can be plugged into the system via
+ * processor configuration and are used to select a sink on every invocation.
+ * </p>
+ * <p>
+ * An instance of the configured sink selector is create during the processor
+ * configuration, its {@linkplain #setSinks(List)} method is invoked following
+ * which it is configured via a subcontext. Once configured, the lifecycle of
+ * this selector is tied to the lifecycle of the sink processor.
+ * </p>
+ * <p>
+ * At runtime, the processor invokes the {@link #createSinkIterator()}
+ * method for every <tt>process</tt> call to create an iteration order over
+ * the available sinks. The processor then loops through this iteration order
+ * until one of the sinks succeeds in processing the event. If the iterator
+ * is exhausted and none of the sinks succeed, the processor will raise
+ * an <tt>EventDeliveryException</tt>.
+ * </p>
+ */
+ public interface SinkSelector extends Configurable, LifecycleAware {
+
+ void setSinks(List<Sink> sinks);
+
+ Iterator<Sink> createSinkIterator();
+ }
+
+ /**
+ * A sink selector that implements the round-robin sink selection policy.
+ * This implementation is not MT safe.
+ */
+ private static class RoundRobinSinkSelector extends AbstractSinkSelector {
+
+ private int nextHead = 0;
+
+ @Override
+ public Iterator<Sink> createSinkIterator() {
+
+ int size = getSinks().size();
+ int[] indexOrder = new int[size];
+
+ int begin = nextHead++;
+ if (nextHead == size) {
+ nextHead = 0;
+ }
+
+ for (int i=0; i < size; i++) {
+ indexOrder[i] = (begin + i)%size;
+ }
+
+ return new SpecificOrderSinkIterator(indexOrder, getSinks());
+ }
+ }
+
+ /**
+ * A sink selector that implements a random sink selection policy. This
+ * implementation is not thread safe.
+ */
+ private static class RandomOrderSinkSelector extends AbstractSinkSelector {
+
+ private Random random = new Random(System.currentTimeMillis());
+
+ @Override
+ public Iterator<Sink> createSinkIterator() {
+ int size = getSinks().size();
+ int[] indexOrder = new int[size];
+
+ List<Integer> indexList = new ArrayList<Integer>();
+ for (int i=0; i<size; i++) {
+ indexList.add(i);
+ }
+
+ while (indexList.size() != 1) {
+ int pick = random.nextInt(indexList.size());
+ indexOrder[indexList.size() - 1] = indexList.remove(pick);
+ }
+
+ indexOrder[0] = indexList.get(0);
+
+ return new SpecificOrderSinkIterator(indexOrder, getSinks());
+ }
+ }
+
+
+ /**
+ * A utility class that iterates over the given ordered list of Sinks via
+ * the specified order array. The entries of the order array indicate the
+ * index within the ordered list of Sinks that needs to be picked over the
+ * course of iteration.
+ */
+ private static class SpecificOrderSinkIterator implements Iterator<Sink> {
+
+ private final int[] order;
+ private final List<Sink> sinks;
+ private int index = 0;
+
+ SpecificOrderSinkIterator(int[] orderArray, List<Sink> sinkList) {
+ order = orderArray;
+ sinks = sinkList;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < order.length;
+ }
+
+ @Override
+ public Sink next() {
+ return sinks.get(order[index++]);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
Added: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/FixedOrderSelector.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/FixedOrderSelector.java?rev=1340222&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/FixedOrderSelector.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/FixedOrderSelector.java Fri May 18 19:42:43 2012
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink;
+
+import java.util.Iterator;
+
+import org.apache.flume.Context;
+import org.apache.flume.Sink;
+
+/**
+ * A test selector that always returns the iteration order of specified
+ * sinks for testing purposes. This selector expects that the configuration
+ * key {@value #SET_ME} is specified with a non-null value.
+ */
+public class FixedOrderSelector extends AbstractSinkSelector {
+
+ public static final String SET_ME = "setme";
+
+ @Override
+ public Iterator<Sink> createSinkIterator() {
+ return getSinks().iterator();
+ }
+
+ @Override
+ public void configure(Context context) {
+ super.configure(context);
+
+ if (context.getString(SET_ME) == null) {
+ throw new RuntimeException("config key " + SET_ME + " not specified");
+ }
+ }
+}
Modified: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/SinkProcessorFactoryTest.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/SinkProcessorFactoryTest.java?rev=1340222&r1=1340221&r2=1340222&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/SinkProcessorFactoryTest.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/SinkProcessorFactoryTest.java Fri May 18 19:42:43 2012
@@ -46,4 +46,19 @@ public class SinkProcessorFactoryTest {
Assert.assertEquals(sp.getClass(), sp2.getClass());
}
+ @Test
+ public void testInstantiatingLoadBalancingSinkProcessor() {
+ Context context = new Context();
+ context.put("type", LoadBalancingSinkProcessor.class.getName());
+ context.put("selector", "random");
+ SinkFactory sf = new DefaultSinkFactory();
+ List<Sink> sinks = new ArrayList<Sink>();
+ sinks.add(sf.create("sink1", "avro"));
+ sinks.add(sf.create("sink2", "avro"));
+ SinkProcessor sp = SinkProcessorFactory.getProcessor(context, sinks);
+ context.put("type", "load_balance");
+ SinkProcessor sp2 = SinkProcessorFactory.getProcessor(context, sinks);
+ Assert.assertEquals(sp.getClass(), sp2.getClass());
+ }
+
}
Added: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java?rev=1340222&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java Fri May 18 19:42:43 2012
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Sink;
+import org.apache.flume.Sink.Status;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.AbstractChannel;
+import org.junit.Test;
+
+public class TestLoadBalancingSinkProcessor {
+
+ private Context getContext(String selectorType) {
+ Map<String, String> p = new HashMap<String, String>();
+ p.put("selector", selectorType);
+ Context ctx = new Context(p);
+
+ return ctx;
+ }
+
+ private LoadBalancingSinkProcessor getProcessor(
+ String selectorType, List<Sink> sinks) {
+ return getProcessor(sinks, getContext(selectorType));
+ }
+
+ private LoadBalancingSinkProcessor getProcessor(List<Sink> sinks, Context ctx)
+ {
+ LoadBalancingSinkProcessor lbsp = new LoadBalancingSinkProcessor();
+ lbsp.setSinks(sinks);
+ lbsp.configure(ctx);
+ lbsp.start();
+
+ return lbsp;
+ }
+
+ @Test
+ public void testDefaultConfiguration() throws Exception {
+ // If no selector is specified, the round-robin selector should be used
+ Channel ch = new MockChannel();
+ int n = 100;
+ int numEvents = 3*n;
+ for (int i = 0; i < numEvents; i++) {
+ ch.put(new MockEvent("test" + i));
+ }
+
+ MockSink s1 = new MockSink(1);
+ s1.setChannel(ch);
+
+ MockSink s2 = new MockSink(2);
+ s2.setChannel(ch);
+
+ MockSink s3 = new MockSink(3);
+ s3.setChannel(ch);
+
+ List<Sink> sinks = new ArrayList<Sink>();
+ sinks.add(s1);
+ sinks.add(s2);
+ sinks.add(s3);
+
+ LoadBalancingSinkProcessor lbsp = getProcessor(sinks, new Context());
+
+ Status s = Status.READY;
+ while (s != Status.BACKOFF) {
+ s = lbsp.process();
+ }
+
+ Assert.assertTrue(s1.getEvents().size() == n);
+ Assert.assertTrue(s2.getEvents().size() == n);
+ Assert.assertTrue(s3.getEvents().size() == n);
+
+ }
+
+ @Test
+ public void testRandomOneActiveSink() throws Exception {
+ Channel ch = new MockChannel();
+ int n = 10;
+ int numEvents = n;
+ for (int i = 0; i < numEvents; i++) {
+ ch.put(new MockEvent("test" + i));
+ }
+
+ MockSink s1 = new MockSink(1);
+ s1.setChannel(ch);
+
+ // s1 always fails
+ s1.setFail();
+
+ MockSink s2 = new MockSink(2);
+ s2.setChannel(ch);
+
+
+ MockSink s3 = new MockSink(3);
+ s3.setChannel(ch);
+
+ // s3 always fails
+ s3.setFail();
+
+ List<Sink> sinks = new ArrayList<Sink>();
+ sinks.add(s1);
+ sinks.add(s2);
+ sinks.add(s3);
+
+ LoadBalancingSinkProcessor lbsp = getProcessor("random", sinks);
+
+ Sink.Status s = Sink.Status.READY;
+ while (s != Sink.Status.BACKOFF) {
+ s = lbsp.process();
+ }
+
+ Assert.assertTrue(s1.getEvents().size() == 0);
+ Assert.assertTrue(s2.getEvents().size() == n);
+ Assert.assertTrue(s3.getEvents().size() == 0);
+ }
+
+ @Test
+ public void testRandomPersistentFailure() throws Exception {
+ Channel ch = new MockChannel();
+ int n = 100;
+ int numEvents = 3*n;
+ for (int i = 0; i < numEvents; i++) {
+ ch.put(new MockEvent("test" + i));
+ }
+
+ MockSink s1 = new MockSink(1);
+ s1.setChannel(ch);
+
+ MockSink s2 = new MockSink(2);
+ s2.setChannel(ch);
+
+ // s2 always fails
+ s2.setFail();
+
+ MockSink s3 = new MockSink(3);
+ s3.setChannel(ch);
+
+ List<Sink> sinks = new ArrayList<Sink>();
+ sinks.add(s1);
+ sinks.add(s2);
+ sinks.add(s3);
+
+ LoadBalancingSinkProcessor lbsp = getProcessor("random",sinks);
+
+ Status s = Status.READY;
+ while (s != Status.BACKOFF) {
+ s = lbsp.process();
+ }
+
+ Assert.assertTrue(s2.getEvents().size() == 0);
+ Assert.assertTrue(s1.getEvents().size() + s3.getEvents().size() == 3*n);
+ }
+
+ @Test
+ public void testRandomNoFailure() throws Exception {
+
+ Channel ch = new MockChannel();
+ int n = 10000;
+ int numEvents = n;
+ for (int i = 0; i < numEvents; i++) {
+ ch.put(new MockEvent("test" + i));
+ }
+
+ MockSink s1 = new MockSink(1);
+ s1.setChannel(ch);
+
+ MockSink s2 = new MockSink(2);
+ s2.setChannel(ch);
+
+ MockSink s3 = new MockSink(3);
+ s3.setChannel(ch);
+
+ MockSink s4 = new MockSink(4);
+ s4.setChannel(ch);
+
+ MockSink s5 = new MockSink(5);
+ s5.setChannel(ch);
+
+ MockSink s6 = new MockSink(6);
+ s6.setChannel(ch);
+
+ MockSink s7 = new MockSink(7);
+ s7.setChannel(ch);
+
+ MockSink s8 = new MockSink(8);
+ s8.setChannel(ch);
+
+ MockSink s9 = new MockSink(9);
+ s9.setChannel(ch);
+
+ MockSink s0 = new MockSink(0);
+ s0.setChannel(ch);
+
+ List<Sink> sinks = new ArrayList<Sink>();
+ sinks.add(s1);
+ sinks.add(s2);
+ sinks.add(s3);
+ sinks.add(s4);
+ sinks.add(s5);
+ sinks.add(s6);
+ sinks.add(s7);
+ sinks.add(s8);
+ sinks.add(s9);
+ sinks.add(s0);
+
+ LoadBalancingSinkProcessor lbsp = getProcessor("random",sinks);
+
+ Status s = Status.READY;
+ while (s != Status.BACKOFF) {
+ s = lbsp.process();
+ }
+
+ Set<Integer> sizeSet = new HashSet<Integer>();
+ int sum = 0;
+ for (Sink ms : sinks) {
+ int count = ((MockSink) ms).getEvents().size();
+ sum += count;
+ sizeSet.add(count);
+ }
+
+ // Assert that all the events were accounted for
+ Assert.assertEquals(n, sum);
+
+ // Assert that at least two sinks came with different event sizes.
+ // This makes sense if the total number of events is evenly divisible by
+ // the total number of sinks. In which case the round-robin policy will
+ // end up causing all sinks to get the same number of events where as
+ // the random policy will have very low probability of doing that.
+ Assert.assertTrue("Miraculous distribution", sizeSet.size() > 1);
+ }
+
+
+
+ @Test
+ public void testRoundRobinOneActiveSink() throws Exception {
+ Channel ch = new MockChannel();
+ int n = 10;
+ int numEvents = n;
+ for (int i = 0; i < numEvents; i++) {
+ ch.put(new MockEvent("test" + i));
+ }
+
+ MockSink s1 = new MockSink(1);
+ s1.setChannel(ch);
+
+ // s1 always fails
+ s1.setFail();
+
+ MockSink s2 = new MockSink(2);
+ s2.setChannel(ch);
+
+
+ MockSink s3 = new MockSink(3);
+ s3.setChannel(ch);
+
+ // s3 always fails
+ s3.setFail();
+
+ List<Sink> sinks = new ArrayList<Sink>();
+ sinks.add(s1);
+ sinks.add(s2);
+ sinks.add(s3);
+
+ LoadBalancingSinkProcessor lbsp = getProcessor("round_robin", sinks);
+
+ Sink.Status s = Sink.Status.READY;
+ while (s != Sink.Status.BACKOFF) {
+ s = lbsp.process();
+ }
+
+ Assert.assertTrue(s1.getEvents().size() == 0);
+ Assert.assertTrue(s2.getEvents().size() == n);
+ Assert.assertTrue(s3.getEvents().size() == 0);
+ }
+
+ @Test
+ public void testRoundRobinPersistentFailure() throws Exception {
+ Channel ch = new MockChannel();
+ int n = 100;
+ int numEvents = 3*n;
+ for (int i = 0; i < numEvents; i++) {
+ ch.put(new MockEvent("test" + i));
+ }
+
+ MockSink s1 = new MockSink(1);
+ s1.setChannel(ch);
+
+ MockSink s2 = new MockSink(2);
+ s2.setChannel(ch);
+
+ // s2 always fails
+ s2.setFail();
+
+ MockSink s3 = new MockSink(3);
+ s3.setChannel(ch);
+
+ List<Sink> sinks = new ArrayList<Sink>();
+ sinks.add(s1);
+ sinks.add(s2);
+ sinks.add(s3);
+
+ LoadBalancingSinkProcessor lbsp = getProcessor("round_robin",sinks);
+
+ Status s = Status.READY;
+ while (s != Status.BACKOFF) {
+ s = lbsp.process();
+ }
+
+ Assert.assertTrue(s1.getEvents().size() == n);
+ Assert.assertTrue(s2.getEvents().size() == 0);
+ Assert.assertTrue(s3.getEvents().size() == 2*n);
+ }
+
+ @Test
+ public void testRoundRobinNoFailure() throws Exception {
+
+ Channel ch = new MockChannel();
+ int n = 100;
+ int numEvents = 3*n;
+ for (int i = 0; i < numEvents; i++) {
+ ch.put(new MockEvent("test" + i));
+ }
+
+ MockSink s1 = new MockSink(1);
+ s1.setChannel(ch);
+
+ MockSink s2 = new MockSink(2);
+ s2.setChannel(ch);
+
+ MockSink s3 = new MockSink(3);
+ s3.setChannel(ch);
+
+ List<Sink> sinks = new ArrayList<Sink>();
+ sinks.add(s1);
+ sinks.add(s2);
+ sinks.add(s3);
+
+ LoadBalancingSinkProcessor lbsp = getProcessor("round_robin",sinks);
+
+ Status s = Status.READY;
+ while (s != Status.BACKOFF) {
+ s = lbsp.process();
+ }
+
+ Assert.assertTrue(s1.getEvents().size() == n);
+ Assert.assertTrue(s2.getEvents().size() == n);
+ Assert.assertTrue(s3.getEvents().size() == n);
+ }
+
+ @Test
+ public void testCustomSelector() throws Exception {
+ Channel ch = new MockChannel();
+ int n = 10;
+ int numEvents = n;
+ for (int i = 0; i < numEvents; i++) {
+ ch.put(new MockEvent("test" + i));
+ }
+
+ MockSink s1 = new MockSink(1);
+ s1.setChannel(ch);
+
+ // s1 always fails
+ s1.setFail();
+
+ MockSink s2 = new MockSink(2);
+ s2.setChannel(ch);
+
+ MockSink s3 = new MockSink(3);
+ s3.setChannel(ch);
+
+ List<Sink> sinks = new ArrayList<Sink>();
+ sinks.add(s1);
+ sinks.add(s2);
+ sinks.add(s3);
+
+ // This selector will result in all events going to s2
+ Context ctx = getContext(FixedOrderSelector.class.getCanonicalName());
+ ctx.put("selector." + FixedOrderSelector.SET_ME, "foo");
+ LoadBalancingSinkProcessor lbsp = getProcessor(sinks, ctx);
+
+ Sink.Status s = Sink.Status.READY;
+ while (s != Sink.Status.BACKOFF) {
+ s = lbsp.process();
+ }
+
+ Assert.assertTrue(s1.getEvents().size() == 0);
+ Assert.assertTrue(s2.getEvents().size() == n);
+ Assert.assertTrue(s3.getEvents().size() == 0);
+ }
+
+ private static class MockSink extends AbstractSink {
+
+ private final int id;
+
+ private List<Event> events = new ArrayList();
+
+ private boolean fail = false;
+
+ private MockSink(int id) {
+ this.id = id;
+ }
+
+ List<Event> getEvents() {
+ return events;
+ }
+
+ int getId() {
+ return id;
+ }
+
+ void setFail() {
+ fail = true;
+ }
+
+ @Override
+ public Status process() throws EventDeliveryException {
+ if (fail) {
+ throw new EventDeliveryException("failed");
+ }
+ Event e = this.getChannel().take();
+ if (e == null)
+ return Status.BACKOFF;
+
+ events.add(e);
+ return Status.READY;
+ }
+ }
+
+ private static class MockChannel extends AbstractChannel {
+
+ private List<Event> events = new ArrayList<Event>();
+
+ @Override
+ public void put(Event event) throws ChannelException {
+ events.add(event);
+ }
+
+ @Override
+ public Event take() throws ChannelException {
+ if (events.size() > 0) {
+ return events.remove(0);
+ }
+ return null;
+ }
+
+ @Override
+ public Transaction getTransaction() {
+ return null;
+ }
+
+ }
+
+ private static class MockEvent implements Event {
+
+ private static final Map<String, String> EMPTY_HEADERS =
+ Collections.unmodifiableMap(new HashMap<String, String>());
+
+ private byte[] body;
+
+ MockEvent(String str) {
+ this.body = str.getBytes();
+ }
+
+ @Override
+ public Map<String, String> getHeaders() {
+ return EMPTY_HEADERS;
+ }
+
+ @Override
+ public void setHeaders(Map<String, String> headers) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public byte[] getBody() {
+ return body;
+ }
+
+ @Override
+ public void setBody(byte[] body) {
+ this.body = body;
+ }
+ }
+}