You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/03/08 23:48:14 UTC
svn commit: r1298636 - in
/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume: ./
channel/ source/
Author: arvind
Date: Thu Mar 8 22:48:14 2012
New Revision: 1298636
URL: http://svn.apache.org/viewvc?rev=1298636&view=rev
Log:
FLUME-1021. Document API contracts and expected behavior in additional interfaces.
(Mike Percy via Arvind Prabhakar)
Modified:
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Channel.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Context.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/CounterGroup.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Event.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDeliveryException.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDrivenSource.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Source.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SourceRunner.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/EventDrivenSourceRunner.java
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Channel.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Channel.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Channel.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Channel.java Thu Mar 8 22:48:14 2012
@@ -21,35 +21,37 @@ import org.apache.flume.lifecycle.Lifecy
/**
* <p>
- * A channel connects a <tt>Source</tt> to a <tt>Sink</tt>. The source
+ * A channel connects a {@link Source} to a {@link Sink}. The source
* acts as producer while the sink acts as a consumer of events. The channel
* itself is the buffer between the two.
* </p>
* <p>
- * A channel exposes a <tt>Transaction</tt> interface that can be used by
- * its clients to ensure atomic <tt>put</tt> or <tt>remove</tt>
- * semantics. This is necessary to guarantee single hop reliability in a
- * logical node. For instance, a source will produce an event successfully
- * if and only if it can be committed to the channel. Similarly, a sink will
- * consume an event if and only if its end point can accept the event. The
+ * A channel exposes a {@link Transaction} interface that can be used by
+ * its clients to ensure atomic {@linkplain #put(Event) put} and
+ * {@linkplain #take() take} semantics.
+ * This is necessary to guarantee single hop reliability between agents.
+ * For instance, a source will successfully produce an {@linkplain Event event}
+ * if and only if that event can be committed to the source's associated
+ * channel. Similarly, a sink will consume an event if and
+ * only if its respective endpoint can accept the event. The
* extent of transaction support varies for different channel implementations
* ranging from strong to best-effort semantics.
* </p>
* <p>
- * Channels are associated with unique names that can be used for separating
- * configuration and working namespaces.
+ * Channels are associated with unique {@linkplain NamedComponent names} that
+ * can be used for separating configuration and working namespaces.
* </p>
*
- * @see org.apache.flume.EventSource
- * @see org.apache.flume.EventSink
+ * @see org.apache.flume.Source
+ * @see org.apache.flume.Sink
* @see org.apache.flume.Transaction
*/
public interface Channel extends LifecycleAware, NamedComponent {
/**
- * <p>Puts the given event in the channel.</p>
+ * <p>Puts the given event into the channel.</p>
* <p><strong>Note</strong>: This method must be invoked within an active
- * <tt>Transaction</tt> boundary. Failure to do so can lead to unpredictable
+ * {@link Transaction} boundary. Failure to do so can lead to unpredictable
* results.</p>
* @param event the event to transport.
* @throws ChannelException in case this operation fails.
@@ -59,12 +61,12 @@ public interface Channel extends Lifecyc
/**
* <p>Returns the next event from the channel if available. If the channel
- * does not have any events available, this method would return <tt>null</tt>.
+ * does not have any events available, this method must return {@code null}.
* </p>
* <p><strong>Note</strong>: This method must be invoked within an active
- * <tt>Transaction</tt> boundary. Failure to do so can lead to unpredictable
+ * {@link Transaction} boundary. Failure to do so can lead to unpredictable
* results.</p>
- * @return the next available event or <tt>null</tt> if no events are
+ * @return the next available event or {@code null} if no events are
* available.
* @throws ChannelException in case this operation fails.
* @see org.apache.flume.Transaction#begin()
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Context.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Context.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Context.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Context.java Thu Mar 8 22:48:14 2012
@@ -26,6 +26,10 @@ import java.util.Map;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+/**
+ * The context is a key-value store used to pass configuration information
+ * throughout the system.
+ */
public class Context {
private Map<String, String> parameters;
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/CounterGroup.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/CounterGroup.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/CounterGroup.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/CounterGroup.java Thu Mar 8 22:48:14 2012
@@ -23,6 +23,9 @@ import java.util.HashMap;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
+/**
+ * Used for counting events, collecting metrics, etc.
+ */
public class CounterGroup {
private String name;
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Event.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Event.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Event.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Event.java Thu Mar 8 22:48:14 2012
@@ -21,14 +21,32 @@ package org.apache.flume;
import java.util.Map;
+/**
+ * Basic representation of a data object in Flume.
+ * Provides access to data as it flows through the system.
+ */
public interface Event {
+ /**
+ * Returns a map of name-value pairs describing the data stored in the body.
+ */
public Map<String, String> getHeaders();
+ /**
+ * Set the event headers
+ * @param headers Map of headers to replace the current headers.
+ */
public void setHeaders(Map<String, String> headers);
+ /**
+ * Returns the raw byte array of the data contained in this event.
+ */
public byte[] getBody();
+ /**
+ * Sets the raw byte array of the data contained in this event.
+ * @param body The data.
+ */
public void setBody(byte[] body);
}
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDeliveryException.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDeliveryException.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDeliveryException.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDeliveryException.java Thu Mar 8 22:48:14 2012
@@ -19,6 +19,10 @@
package org.apache.flume;
+/**
+ * An event delivery exception is raised whenever an {@link Event} fails to
+ * reach at least one of its intended (next-hop) destinations.
+ */
public class EventDeliveryException extends Exception {
private static final long serialVersionUID = 1102327497549834945L;
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDrivenSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDrivenSource.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDrivenSource.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDrivenSource.java Thu Mar 8 22:48:14 2012
@@ -19,6 +19,11 @@
package org.apache.flume;
+/**
+ * A {@link Source} that does not need an external driver to poll for
+ * {@linkplain Event events} to ingest; it provides its own event-driven
+ * mechanism to invoke event processing.
+ */
public interface EventDrivenSource extends Source {
}
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java Thu Mar 8 22:48:14 2012
@@ -18,6 +18,10 @@
*/
package org.apache.flume;
+/**
+ * Enables a component to be tagged with a name so that it can be referred
+ * to uniquely within the configuration system.
+ */
public interface NamedComponent {
public void setName(String name);
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java Thu Mar 8 22:48:14 2012
@@ -19,6 +19,15 @@
package org.apache.flume;
+import org.apache.flume.source.EventDrivenSourceRunner;
+
+/**
+ * A {@link Source} that requires an external driver to poll to determine
+ * whether there are {@linkplain Event events} that are available to ingest
+ * from the source.
+ *
+ * @see org.apache.flume.source.EventDrivenSourceRunner
+ */
public interface PollableSource extends Source {
public Status process() throws EventDeliveryException;
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java Thu Mar 8 22:48:14 2012
@@ -26,6 +26,20 @@ import org.apache.flume.lifecycle.Lifecy
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * <p>
+ * A driver for {@linkplain Sink sinks} that polls them, attempting to
+ * {@linkplain Sink#process() process} events if any are available in the
+ * {@link Channel}.
+ * </p>
+ *
+ * <p>
+ * Note that, unlike {@linkplain Source sources}, all sinks are polled.
+ * </p>
+ *
+ * @see org.apache.flume.Sink
+ * @see org.apache.flume.SourceRunner
+ */
public class SinkRunner implements LifecycleAware {
private static final Logger logger = LoggerFactory
@@ -112,6 +126,11 @@ public class SinkRunner implements Lifec
return lifecycleState;
}
+ /**
+ * {@link Runnable} that {@linkplain SinkProcessor#process() polls} a
+ * {@link SinkProcessor} and manages event delivery notification,
+ * {@link Sink.Status BACKOFF} delay handling, etc.
+ */
public static class PollingRunner implements Runnable {
private SinkProcessor policy;
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Source.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Source.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Source.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Source.java Thu Mar 8 22:48:14 2012
@@ -22,10 +22,33 @@ package org.apache.flume;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.lifecycle.LifecycleAware;
+/**
+ * <p>
+ * A source generates {@plainlink Event events} and calls methods on the
+ * configured {@link ChannelProcessor} to persist those events into the
+ * configured {@linkplain Channel channels}.
+ * </p>
+ *
+ * <p>
+ * Sources are associated with unique {@linkplain NamedComponent names} that can
+ * be used for separating configuration and working namespaces.
+ * </p>
+ *
+ * @see org.apache.flume.Channel
+ * @see org.apache.flume.Sink
+ */
public interface Source extends LifecycleAware, NamedComponent {
+ /**
+ * Specifies which channel processor will handle this source's events.
+ *
+ * @param channelProcessor
+ */
public void setChannelProcessor(ChannelProcessor channelProcessor);
+ /**
+ * Returns the channel processor that will handle this source's events.
+ */
public ChannelProcessor getChannelProcessor();
}
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SourceRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SourceRunner.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SourceRunner.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SourceRunner.java Thu Mar 8 22:48:14 2012
@@ -23,10 +23,24 @@ import org.apache.flume.lifecycle.Lifecy
import org.apache.flume.source.EventDrivenSourceRunner;
import org.apache.flume.source.PollableSourceRunner;
+/**
+ * A source runner controls how a source is driven.
+ *
+ * This is an abstract class used for instantiating derived classes.
+ */
abstract public class SourceRunner implements LifecycleAware {
private Source source;
+ /**
+ * Static factory method to instantiate a source runner implementation that
+ * corresponds to the type of {@link Source} specified.
+ *
+ * @param source The source to run
+ * @return A runner that can run the specified source
+ * @throws IllegalArgumentException if the specified source does not implement
+ * a supported derived interface of {@link SourceRunner}.
+ */
public static SourceRunner forSource(Source source) {
SourceRunner runner = null;
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java Thu Mar 8 22:48:14 2012
@@ -31,6 +31,17 @@ import org.apache.flume.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * A channel processor exposes operations to put {@link Event}s into
+ * {@link Channel}s. These operations will propagate a {@link ChannelException}
+ * if any errors occur while attempting to write to {@code required} channels.
+ *
+ * Each channel processor instance is configured with a {@link ChannelSelector}
+ * instance that specifies which channels are
+ * {@linkplain ChannelSelector#getRequiredChannels(Event) required} and which
+ * channels are
+ * {@linkplain ChannelSelector#getOptionalChannels(Event) optional}.
+ */
public class ChannelProcessor {
private static final Logger LOG = LoggerFactory.getLogger(
@@ -46,6 +57,18 @@ public class ChannelProcessor {
return selector;
}
+ /**
+ * Attempts to {@linkplain Channel#put(Event) put} the given events into each
+ * configured channel. If any {@code required} channel throws a
+ * {@link ChannelException}, that exception will be propagated.
+ *
+ * <p>Note that if multiple channels are configured, some {@link Transaction}s
+ * may have already been committed while others may be rolled back in the
+ * case of an exception.
+ *
+ * @param events A list of events to put into the configured channels.
+ * @throws ChannelException when a write to a required channel fails.
+ */
public void processEventBatch(List<Event> events) {
Map<Channel, List<Event>> reqChannelQueue =
new LinkedHashMap<Channel, List<Event>>();
@@ -128,6 +151,18 @@ public class ChannelProcessor {
}
}
+ /**
+ * Attempts to {@linkplain Channel#put(Event) put} the given event into each
+ * configured channel. If any {@code required} channel throws a
+ * {@link ChannelException}, that exception will be propagated.
+ *
+ * <p>Note that if multiple channels are configured, some {@link Transaction}s
+ * may have already been committed while others may be rolled back in the
+ * case of an exception.
+ *
+ * @param event The event to put into the configured channels.
+ * @throws ChannelException when a write to a required channel fails.
+ */
public void processEvent(Event event) {
// Process required channels
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/EventDrivenSourceRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/EventDrivenSourceRunner.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/EventDrivenSourceRunner.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/EventDrivenSourceRunner.java Thu Mar 8 22:48:14 2012
@@ -22,6 +22,10 @@ package org.apache.flume.source;
import org.apache.flume.SourceRunner;
import org.apache.flume.lifecycle.LifecycleState;
+/**
+ * Starts, stops, and manages
+ * {@linkplain EventDrivenSource event-driven sources}.
+ */
public class EventDrivenSourceRunner extends SourceRunner {
private LifecycleState lifecycleState;