You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/10/22 00:43:06 UTC

svn commit: r1187586 - in /incubator/flume/branches/flume-728: flume-ng-core/src/main/java/org/apache/flume/source/ flume-ng-node/src/test/java/org/apache/flume/source/

Author: esammer
Date: Fri Oct 21 22:43:05 2011
New Revision: 1187586

URL: http://svn.apache.org/viewvc?rev=1187586&view=rev
Log:
FLUME-786: Write javadoc for builtin sources

- Changed netcat name param to bind to be consistent with other server-ish
  sources. It's now possible to bind to a specific IP:port rather than
  just a port.

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java?rev=1187586&r1=1187585&r2=1187586&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java Fri Oct 21 22:43:05 2011
@@ -17,6 +17,7 @@ import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
+import org.apache.flume.Source;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.event.EventBuilder;
@@ -26,6 +27,55 @@ import org.apache.flume.source.avro.Stat
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * <p>
+ * A {@link Source} implementation that receives Avro events from clients that
+ * implement {@link AvroSourceProtocol}.
+ * </p>
+ * <p>
+ * This source forms one half of Flume's tiered collection support. Internally,
+ * this source uses Avro's <tt>NettyTransceiver</tt> to listen for and handle
+ * events. It can be paired with the builtin <tt>AvroSink</tt> to create tiered
+ * collection topologies. Of course, nothing prevents one from using this source
+ * to receive data from other custom built infrastructure that uses the same
+ * Avro protocol (specifically {@link AvroSourceProtocol}).
+ * </p>
+ * <p>
+ * Events may be received from the client either singly or in batches. Generally,
+ * larger batches are far more efficient, but introduce a slight delay (measured
+ * in millis) in delivery. A batch is submitted to the configured {@link Channel}
+ * atomically (i.e. either all events make it into the channel or none).
+ * </p>
+ * <p>
+ * <b>Configuration options</b>
+ * </p>
+ * <table>
+ * <tr>
+ * <th>Parameter</th>
+ * <th>Description</th>
+ * <th>Unit / Type</th>
+ * <th>Default</th>
+ * </tr>
+ * <tr>
+ * <td><tt>bind</tt></td>
+ * <td>The hostname or IP to which the source will bind.</td>
+ * <td>Hostname or IP / String</td>
+ * <td>none (required)</td>
+ * </tr>
+ * <tr>
+ * <td><tt>port</tt></td>
+ * <td>The port to which the source will bind and listen for events.</td>
+ * <td>TCP port / int</td>
+ * <td>none (required)</td>
+ * </tr>
+ * </table>
+ * <p>
+ * <b>Metrics</b>
+ * </p>
+ * <p>
+ * TODO
+ * </p>
+ */
 public class AvroSource extends AbstractSource implements EventDrivenSource,
     Configurable, AvroSourceProtocol {
 

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java?rev=1187586&r1=1187585&r2=1187586&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java Fri Oct 21 22:43:05 2011
@@ -13,6 +13,7 @@ import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
+import org.apache.flume.Source;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.event.EventBuilder;
@@ -21,6 +22,72 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
+/**
+ * <p>
+ * A {@link Source} implementation that executes a Unix process and turns each
+ * line of text into an event.
+ * </p>
+ * <p>
+ * This source runs a given Unix command on start up and expects that process to
+ * continuously produce data on standard out (stderr is simply discarded). If
+ * the process exits for any reason, the source also exits and will produce no
+ * further data. This means configurations such as <tt>cat [named pipe]</tt> or
+ * <tt>tail -F [file]</tt> are going to produce the desired results whereas
+ * <tt>date</tt> will probably not - the former two commands produce streams of
+ * data whereas the latter produces a single event and exits.
+ * </p>
+ * <p>
+ * The <tt>ExecSource</tt> is meant for situations where one must integrate with
+ * existing systems without modifying code. It is a compatibility gateway built
+ * to allow simple, stop-gap integration and doesn't necessarily offer all of
+ * the benefits or guarantees of native integration with Flume. If one has the
+ * option of using the <tt>AvroSource</tt>, for instance, that would be greatly
+ * preferred to this source as it (and similarly implemented sources) can
+ * maintain the transactional guarantees that exec cannot.
+ * </p>
+ * <p>
+ * <i>Why doesn't <tt>ExecSource</tt> offer transactional guarantees?</i>
+ * </p>
+ * <p>
+ * The problem with <tt>ExecSource</tt> and other asynchronous sources is that
+ * the source cannot guarantee that if there is a failure to put the event into
+ * the {@link Channel} the client knows about it. For instance, one of the
+ * most commonly requested features is the <tt>tail -F [file]</tt>-like use case
+ * where an application writes to a log file on disk and Flume tails the file,
+ * sending each line as an event. While this is possible, there's an obvious
+ * problem; what happens if the channel fills up and Flume can't send an event?
+ * Flume has no way of indicating to the application writing the log file that
+ * it needs to retain the log or that the event hasn't been sent, for some
+ * reason. If this doesn't make sense, you need only know this: <b>Your
+ * application can never guarantee data has been received when using a
+ * unidirectional asynchronous interface such as ExecSource!</b> As an extension
+ * of this warning - and to be completely clear - there is absolutely zero
+ * guarantee of event delivery when using this source. You have been warned.
+ * </p>
+ * <p>
+ * <b>Configuration options</b>
+ * </p>
+ * <table>
+ * <tr>
+ * <th>Parameter</th>
+ * <th>Description</th>
+ * <th>Unit / Type</th>
+ * <th>Default</th>
+ * </tr>
+ * <tr>
+ * <td><tt>command</tt></td>
+ * <td>The command to execute</td>
+ * <td>String</td>
+ * <td>none (required)</td>
+ * </tr>
+ * </table>
+ * <p>
+ * <b>Metrics</b>
+ * </p>
+ * <p>
+ * TODO
+ * </p>
+ */
 public class ExecSource extends AbstractSource implements EventDrivenSource,
     Configurable {
 

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java?rev=1187586&r1=1187585&r2=1187586&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java Fri Oct 21 22:43:05 2011
@@ -29,12 +29,59 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+/**
+ * <p>
+ * A netcat-like source that listens on a given port and turns each line of text
+ * into an event.
+ * </p>
+ * <p>
+ * This source, primarily built for testing and exceedingly simple systems, acts
+ * like <tt>nc -k -l [host] [port]</tt>. In other words, it opens a specified
+ * port and listens for data. The expectation is that the supplied data is
+ * newline separated text. Each line of text is turned into a Flume event and
+ * sent via the connected channel.
+ * </p>
+ * <p>
+ * Most testing has been done by using the <tt>nc</tt> client but other,
+ * similarly implemented, clients should work just fine.
+ * </p>
+ * <p>
+ * <b>Configuration options</b>
+ * </p>
+ * <table>
+ * <tr>
+ * <th>Parameter</th>
+ * <th>Description</th>
+ * <th>Unit / Type</th>
+ * <th>Default</th>
+ * </tr>
+ * <tr>
+ * <td><tt>bind</tt></td>
+ * <td>The hostname or IP to which the source will bind.</td>
+ * <td>Hostname or IP / String</td>
+ * <td>none (required)</td>
+ * </tr>
+ * <tr>
+ * <td><tt>port</tt></td>
+ * <td>The port to which the source will bind and listen for events.</td>
+ * <td>TCP port / int</td>
+ * <td>none (required)</td>
+ * </tr>
+ * </table>
+ * <p>
+ * <b>Metrics</b>
+ * </p>
+ * <p>
+ * TODO
+ * </p>
+ */
 public class NetcatSource extends AbstractSource implements Configurable,
     EventDrivenSource {
 
   private static final Logger logger = LoggerFactory
       .getLogger(NetcatSource.class);
 
+  private String hostName;
   private int port;
 
   private CounterGroup counterGroup;
@@ -53,8 +100,9 @@ public class NetcatSource extends Abstra
 
   @Override
   public void configure(Context context) {
-    Configurables.ensureRequiredNonNull(context, "name", "port");
+    Configurables.ensureRequiredNonNull(context, "bind", "port");
 
+    hostName = context.get("bind", String.class);
     port = Integer.parseInt(context.get("port", String.class));
   }
 
@@ -71,7 +119,7 @@ public class NetcatSource extends Abstra
         .setNameFormat("netcat-handler-%d").build());
 
     try {
-      SocketAddress bindPoint = new InetSocketAddress(port);
+      SocketAddress bindPoint = new InetSocketAddress(hostName, port);
 
       serverSocket = ServerSocketChannel.open();
       serverSocket.socket().setReuseAddress(true);

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1187586&r1=1187585&r2=1187586&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Fri Oct 21 22:43:05 2011
@@ -48,7 +48,7 @@ public class TestNetcatSource {
     Context context = new Context();
 
     /* FIXME: Use a random port for testing. */
-    context.put("name", "test");
+    context.put("bind", "0.0.0.0");
     context.put("port", "41414");
 
     Configurables.configure(source, context);