You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/03/08 23:48:14 UTC

svn commit: r1298636 - in /incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume: ./ channel/ source/

Author: arvind
Date: Thu Mar  8 22:48:14 2012
New Revision: 1298636

URL: http://svn.apache.org/viewvc?rev=1298636&view=rev
Log:
FLUME-1021. Document API contracts and expected behavior in additional interfaces.

(Mike Percy via Arvind Prabhakar)

Modified:
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Channel.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Context.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/CounterGroup.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Event.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDeliveryException.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDrivenSource.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Source.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SourceRunner.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/EventDrivenSourceRunner.java

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Channel.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Channel.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Channel.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Channel.java Thu Mar  8 22:48:14 2012
@@ -21,35 +21,37 @@ import org.apache.flume.lifecycle.Lifecy
 
 /**
  * <p>
- * A channel connects a <tt>Source</tt> to a <tt>Sink</tt>. The source
+ * A channel connects a {@link Source} to a {@link Sink}. The source
  * acts as producer while the sink acts as a consumer of events. The channel
  * itself is the buffer between the two.
  * </p>
  * <p>
- * A channel exposes a <tt>Transaction</tt> interface that can be used by
- * its clients to ensure atomic <tt>put</tt> or <tt>remove</tt>
- * semantics. This is necessary to guarantee single hop reliability in a
- * logical node. For instance, a source will produce an event successfully
- * if and only if it can be committed to the channel. Similarly, a sink will
- * consume an event if and only if its end point can accept the event. The
+ * A channel exposes a {@link Transaction} interface that can be used by
+ * its clients to ensure atomic {@linkplain #put(Event) put} and
+ * {@linkplain #take() take} semantics.
+ * This is necessary to guarantee single hop reliability between agents.
+ * For instance, a source will successfully produce an {@linkplain Event event}
+ * if and only if that event can be committed to the source's associated
+ * channel. Similarly, a sink will consume an event if and
+ * only if its respective endpoint can accept the event. The
  * extent of transaction support varies for different channel implementations
  * ranging from strong to best-effort semantics.
  * </p>
  * <p>
- * Channels are associated with unique names that can be used for separating
- * configuration and working namespaces.
+ * Channels are associated with unique {@linkplain NamedComponent names} that
+ * can be used for separating configuration and working namespaces.
  * </p>
  *
- * @see org.apache.flume.EventSource
- * @see org.apache.flume.EventSink
+ * @see org.apache.flume.Source
+ * @see org.apache.flume.Sink
  * @see org.apache.flume.Transaction
  */
 public interface Channel extends LifecycleAware, NamedComponent {
 
   /**
-   * <p>Puts the given event in the channel.</p>
+   * <p>Puts the given event into the channel.</p>
    * <p><strong>Note</strong>: This method must be invoked within an active
-   * <tt>Transaction</tt> boundary. Failure to do so can lead to unpredictable
+   * {@link Transaction} boundary. Failure to do so can lead to unpredictable
    * results.</p>
    * @param event the event to transport.
    * @throws ChannelException in case this operation fails.
@@ -59,12 +61,12 @@ public interface Channel extends Lifecyc
 
   /**
    * <p>Returns the next event from the channel if available. If the channel
-   * does not have any events available, this method would return <tt>null</tt>.
+   * does not have any events available, this method must return {@code null}.
    * </p>
    * <p><strong>Note</strong>: This method must be invoked within an active
-   * <tt>Transaction</tt> boundary. Failure to do so can lead to unpredictable
+   * {@link Transaction} boundary. Failure to do so can lead to unpredictable
    * results.</p>
-   * @return the next available event or <tt>null</tt> if no events are
+   * @return the next available event or {@code null} if no events are
    * available.
    * @throws ChannelException in case this operation fails.
    * @see org.apache.flume.Transaction#begin()

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Context.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Context.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Context.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Context.java Thu Mar  8 22:48:14 2012
@@ -26,6 +26,10 @@ import java.util.Map;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
+/**
+ * The context is a key-value store used to pass configuration information
+ * throughout the system.
+ */
 public class Context {
 
   private Map<String, String> parameters;

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/CounterGroup.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/CounterGroup.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/CounterGroup.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/CounterGroup.java Thu Mar  8 22:48:14 2012
@@ -23,6 +23,9 @@ import java.util.HashMap;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicLong;
 
+/**
+ * Used for counting events, collecting metrics, etc.
+ */
 public class CounterGroup {
 
   private String name;

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Event.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Event.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Event.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Event.java Thu Mar  8 22:48:14 2012
@@ -21,14 +21,32 @@ package org.apache.flume;
 
 import java.util.Map;
 
+/**
+ * Basic representation of a data object in Flume.
+ * Provides access to data as it flows through the system.
+ */
 public interface Event {
 
+  /**
+   * Returns a map of name-value pairs describing the data stored in the body.
+   */
   public Map<String, String> getHeaders();
 
+  /**
+   * Sets the event headers.
+   * @param headers Map of headers to replace the current headers.
+   */
   public void setHeaders(Map<String, String> headers);
 
+  /**
+   * Returns the raw byte array of the data contained in this event.
+   */
   public byte[] getBody();
 
+  /**
+   * Sets the raw byte array of the data contained in this event.
+   * @param body The data.
+   */
   public void setBody(byte[] body);
 
 }

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDeliveryException.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDeliveryException.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDeliveryException.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDeliveryException.java Thu Mar  8 22:48:14 2012
@@ -19,6 +19,10 @@
 
 package org.apache.flume;
 
+/**
+ * An event delivery exception is raised whenever an {@link Event} fails to
+ * reach at least one of its intended (next-hop) destinations.
+ */
 public class EventDeliveryException extends Exception {
 
   private static final long serialVersionUID = 1102327497549834945L;

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDrivenSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDrivenSource.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDrivenSource.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDrivenSource.java Thu Mar  8 22:48:14 2012
@@ -19,6 +19,11 @@
 
 package org.apache.flume;
 
+/**
+ * A {@link Source} that does not need an external driver to poll for
+ * {@linkplain Event events} to ingest; it provides its own event-driven
+ * mechanism to invoke event processing.
+ */
 public interface EventDrivenSource extends Source {
 
 }

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/NamedComponent.java Thu Mar  8 22:48:14 2012
@@ -18,6 +18,10 @@
  */
 package org.apache.flume;
 
+/**
+ * Enables a component to be tagged with a name so that it can be referred
+ * to uniquely within the configuration system.
+ */
 public interface NamedComponent {
 
   public void setName(String name);

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/PollableSource.java Thu Mar  8 22:48:14 2012
@@ -19,6 +19,15 @@
 
 package org.apache.flume;
 
+import org.apache.flume.source.EventDrivenSourceRunner;
+
+/**
+ * A {@link Source} that requires an external driver to poll to determine
+ * whether there are {@linkplain Event events} that are available to ingest
+ * from the source.
+ *
+ * @see org.apache.flume.source.PollableSourceRunner
+ */
 public interface PollableSource extends Source {
 
   public Status process() throws EventDeliveryException;

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java Thu Mar  8 22:48:14 2012
@@ -26,6 +26,20 @@ import org.apache.flume.lifecycle.Lifecy
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * <p>
+ * A driver for {@linkplain Sink sinks} that polls them, attempting to
+ * {@linkplain Sink#process() process} events if any are available in the
+ * {@link Channel}.
+ * </p>
+ *
+ * <p>
+ * Note that, unlike {@linkplain Source sources}, all sinks are polled.
+ * </p>
+ *
+ * @see org.apache.flume.Sink
+ * @see org.apache.flume.SourceRunner
+ */
 public class SinkRunner implements LifecycleAware {
 
   private static final Logger logger = LoggerFactory
@@ -112,6 +126,11 @@ public class SinkRunner implements Lifec
     return lifecycleState;
   }
 
+  /**
+   * {@link Runnable} that {@linkplain SinkProcessor#process() polls} a
+   * {@link SinkProcessor} and manages event delivery notification,
+   * {@link Sink.Status BACKOFF} delay handling, etc.
+   */
   public static class PollingRunner implements Runnable {
 
     private SinkProcessor policy;

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Source.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Source.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Source.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Source.java Thu Mar  8 22:48:14 2012
@@ -22,10 +22,33 @@ package org.apache.flume;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.lifecycle.LifecycleAware;
 
+/**
+ * <p>
+ * A source generates {@linkplain Event events} and calls methods on the
+ * configured {@link ChannelProcessor} to persist those events into the
+ * configured {@linkplain Channel channels}.
+ * </p>
+ *
+ * <p>
+ * Sources are associated with unique {@linkplain NamedComponent names} that can
+ * be used for separating configuration and working namespaces.
+ * </p>
+ *
+ * @see org.apache.flume.Channel
+ * @see org.apache.flume.Sink
+ */
 public interface Source extends LifecycleAware, NamedComponent {
 
+  /**
+   * Specifies which channel processor will handle this source's events.
+   *
+   * @param channelProcessor the processor that handles this source's events
+   */
   public void setChannelProcessor(ChannelProcessor channelProcessor);
 
+  /**
+   * Returns the channel processor that will handle this source's events.
+   */
   public ChannelProcessor getChannelProcessor();
 
 }

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SourceRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SourceRunner.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SourceRunner.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/SourceRunner.java Thu Mar  8 22:48:14 2012
@@ -23,10 +23,24 @@ import org.apache.flume.lifecycle.Lifecy
 import org.apache.flume.source.EventDrivenSourceRunner;
 import org.apache.flume.source.PollableSourceRunner;
 
+/**
+ * A source runner controls how a source is driven.
+ *
+ * This is an abstract class used for instantiating derived classes.
+ */
 abstract public class SourceRunner implements LifecycleAware {
 
   private Source source;
 
+  /**
+   * Static factory method to instantiate a source runner implementation that
+   * corresponds to the type of {@link Source} specified.
+   *
+   * @param source The source to run
+   * @return A runner that can run the specified source
+   * @throws IllegalArgumentException if the specified source does not implement
+   * a supported derived interface of {@link Source}.
+   */
   public static SourceRunner forSource(Source source) {
     SourceRunner runner = null;
 

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java Thu Mar  8 22:48:14 2012
@@ -31,6 +31,17 @@ import org.apache.flume.Transaction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * A channel processor exposes operations to put {@link Event}s into
+ * {@link Channel}s. These operations will propagate a {@link ChannelException}
+ * if any errors occur while attempting to write to {@code required} channels.
+ *
+ * Each channel processor instance is configured with a {@link ChannelSelector}
+ * instance that specifies which channels are
+ * {@linkplain ChannelSelector#getRequiredChannels(Event) required} and which
+ * channels are
+ * {@linkplain ChannelSelector#getOptionalChannels(Event) optional}.
+ */
 public class ChannelProcessor {
 
   private static final Logger LOG = LoggerFactory.getLogger(
@@ -46,6 +57,18 @@ public class ChannelProcessor {
     return selector;
   }
 
+  /**
+   * Attempts to {@linkplain Channel#put(Event) put} the given events into each
+   * configured channel. If any {@code required} channel throws a
+   * {@link ChannelException}, that exception will be propagated.
+   *
+   * <p>Note that if multiple channels are configured, some {@link Transaction}s
+   * may have already been committed while others may be rolled back in the
+   * case of an exception.
+   *
+   * @param events A list of events to put into the configured channels.
+   * @throws ChannelException when a write to a required channel fails.
+   */
   public void processEventBatch(List<Event> events) {
     Map<Channel, List<Event>> reqChannelQueue =
         new LinkedHashMap<Channel, List<Event>>();
@@ -128,6 +151,18 @@ public class ChannelProcessor {
     }
   }
 
+  /**
+   * Attempts to {@linkplain Channel#put(Event) put} the given event into each
+   * configured channel. If any {@code required} channel throws a
+   * {@link ChannelException}, that exception will be propagated.
+   *
+   * <p>Note that if multiple channels are configured, some {@link Transaction}s
+   * may have already been committed while others may be rolled back in the
+   * case of an exception.
+   *
+   * @param event The event to put into the configured channels.
+   * @throws ChannelException when a write to a required channel fails.
+   */
   public void processEvent(Event event) {
 
     // Process required channels

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/EventDrivenSourceRunner.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/EventDrivenSourceRunner.java?rev=1298636&r1=1298635&r2=1298636&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/EventDrivenSourceRunner.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/EventDrivenSourceRunner.java Thu Mar  8 22:48:14 2012
@@ -22,6 +22,10 @@ package org.apache.flume.source;
 import org.apache.flume.SourceRunner;
 import org.apache.flume.lifecycle.LifecycleState;
 
+/**
+ * Starts, stops, and manages
+ * {@linkplain EventDrivenSource event-driven sources}.
+ */
 public class EventDrivenSourceRunner extends SourceRunner {
 
   private LifecycleState lifecycleState;