You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/03/13 03:25:32 UTC

svn commit: r1299958 [1/2] - in /incubator/flume/trunk: ./ flume-ng-channels/flume-file-channel/ flume-ng-channels/flume-jdbc-channel/ flume-ng-clients/flume-ng-log4jappender/ flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clien...

Author: arvind
Date: Tue Mar 13 02:25:30 2012
New Revision: 1299958

URL: http://svn.apache.org/viewvc?rev=1299958&view=rev
Log:
FLUME-989. Factor Flume Avro RPC interfaces out in separate Client SDK.

(Mike Percy via Arvind Prabhakar)

Added:
    incubator/flume/trunk/flume-ng-sdk/   (with props)
    incubator/flume/trunk/flume-ng-sdk/pom.xml
    incubator/flume/trunk/flume-ng-sdk/src/
    incubator/flume/trunk/flume-ng-sdk/src/main/
    incubator/flume/trunk/flume-ng-sdk/src/main/avro/
    incubator/flume/trunk/flume-ng-sdk/src/main/avro/flume.avdl
    incubator/flume/trunk/flume-ng-sdk/src/main/java/
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/Event.java   (with props)
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/EventDeliveryException.java   (with props)
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/FlumeException.java   (with props)
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java   (with props)
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClient.java   (with props)
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java   (with props)
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java   (with props)
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java   (with props)
    incubator/flume/trunk/flume-ng-sdk/src/test/
    incubator/flume/trunk/flume-ng-sdk/src/test/java/
    incubator/flume/trunk/flume-ng-sdk/src/test/java/org/
    incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/
    incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/
    incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/
    incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java   (with props)
    incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java   (with props)
    incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestRpcClientFactory.java   (with props)
    incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/event/
    incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/event/TestEventBuilder.java   (with props)
Removed:
    incubator/flume/trunk/flume-ng-core/src/main/avro/flume.avdl
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Event.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDeliveryException.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/FlumeException.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java
Modified:
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/pom.xml
    incubator/flume/trunk/flume-ng-channels/flume-jdbc-channel/pom.xml
    incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/pom.xml
    incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
    incubator/flume/trunk/flume-ng-core/pom.xml
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
    incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/pom.xml
    incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java
    incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java
    incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/pom.xml
    incubator/flume/trunk/flume-ng-node/pom.xml
    incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/pom.xml
    incubator/flume/trunk/flume-ng-sinks/flume-irc-sink/pom.xml
    incubator/flume/trunk/pom.xml

Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/pom.xml?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/pom.xml Tue Mar 13 02:25:30 2012
@@ -44,6 +44,11 @@
 
     <dependency>
       <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-core</artifactId>
     </dependency>
 

Modified: incubator/flume/trunk/flume-ng-channels/flume-jdbc-channel/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-jdbc-channel/pom.xml?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-jdbc-channel/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-jdbc-channel/pom.xml Tue Mar 13 02:25:30 2012
@@ -49,6 +49,10 @@ limitations under the License.
   <dependencies>
     <dependency>
       <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-core</artifactId>
     </dependency>
     <dependency>

Modified: incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/pom.xml?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/pom.xml Tue Mar 13 02:25:30 2012
@@ -39,25 +39,30 @@ limitations under the License.
   </build>
 
   <dependencies>
+
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+
     <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
     </dependency>
 
     <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro-ipc</artifactId>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
     </dependency>
 
+    <!-- pull in flume core only for unit tests. TODO: not ideal -->
     <dependency>
       <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-core</artifactId>
+      <scope>test</scope>
     </dependency>
+
   </dependencies>
 
 

Modified: incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java (original)
+++ incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java Tue Mar 13 02:25:30 2012
@@ -18,28 +18,25 @@
  */
 package org.apache.flume.clients.log4jappender;
 
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyTransceiver;
-import org.apache.avro.ipc.Transceiver;
-import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
-import org.apache.flume.source.avro.AvroFlumeEvent;
-import org.apache.flume.source.avro.AvroSourceProtocol;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.event.EventBuilder;
+
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.helpers.LogLog;
 import org.apache.log4j.spi.LoggingEvent;
-import org.apache.flume.clients.log4jappender.Log4jAvroHeaders;
+
 /**
  *
  * Appends Log4j Events to an external Flume client which is decribed by
- * the Log4j configuration file. The appender takes two parameters
+ * the Log4j configuration file. The appender takes two required parameters:
  *<p>
  *<strong>Hostname</strong> : This is the hostname of the first hop
  *at which Flume (through an AvroSource) is listening for events.
@@ -65,17 +62,17 @@ public class Log4jAppender extends Appen
 
   private String hostname;
   private int port;
-  private AvroSourceProtocol protocolClient = null;
-  private Transceiver transceiver = null;
+  private RpcClient rpcClient = null;
 
 
   /**
-   * If this constructor is used programatically, rather than from a log4j conf,
+   * If this constructor is used programmatically rather than from a log4j conf
    * you must set the <tt>port</tt> and <tt>hostname</tt> and then call
    * <tt>activateOptions()</tt> before calling <tt>append()</tt>.
    */
   public Log4jAppender(){
   }
+
   /**
    * Sets the hostname and port. Even if these are passed the
    * <tt>activateOptions()</tt> function must be called before calling
@@ -92,24 +89,27 @@ public class Log4jAppender extends Appen
   /**
    * Append the LoggingEvent, to send to the first Flume hop.
    * @param event The LoggingEvent to be appended to the flume.
-   * @throws FlumeException if the appender was closed
-   * or the hostname and port were not setup.
+   * @throws FlumeException if the appender was closed,
+   * or the hostname and port were not setup, there was a timeout, or there
+   * was a connection error.
    */
   @Override
   public synchronized void append(LoggingEvent event) throws FlumeException{
-    //If protocolClient is null, it means either this appender object was never
+    //If rpcClient is null, it means either this appender object was never
     //setup by setting hostname and port and then calling activateOptions
     //or this appender object was closed by calling close(), so we throw an
     //exception to show the appender is no longer accessible.
-    if(protocolClient == null){
+    if(rpcClient == null){
       throw new FlumeException("Cannot Append to Appender!" +
           "Appender either closed or not setup correctly!");
     }
 
+    if(!rpcClient.isActive()){
+      reconnect();
+    }
+
     //Client created first time append is called.
-    AvroFlumeEvent avroEvent = new AvroFlumeEvent();
-    Map<CharSequence, CharSequence> hdrs =
-        new HashMap<CharSequence, CharSequence>();
+    Map<String, String> hdrs = new HashMap<String, String>();
     hdrs.put(Log4jAvroHeaders.LOGGER_NAME.toString(), event.getLoggerName());
     hdrs.put(Log4jAvroHeaders.TIMESTAMP.toString(),
         String.valueOf(event.getTimeStamp()));
@@ -120,20 +120,16 @@ public class Log4jAppender extends Appen
     hdrs.put(Log4jAvroHeaders.LOG_LEVEL.toString(),
         String.valueOf(event.getLevel().toInt()));
     hdrs.put(Log4jAvroHeaders.MESSAGE_ENCODING.toString(), "UTF8");
-    avroEvent.setHeaders(hdrs);
+
+    Event flumeEvent = EventBuilder.withBody(event.getMessage().toString(),
+        Charset.forName("UTF8"), hdrs);
+
     try {
-      avroEvent.setBody(ByteBuffer.wrap(
-          event.getMessage().toString().getBytes("UTF8")));
-      protocolClient.append(avroEvent);
-    } catch (UnsupportedEncodingException e) {
-      String errormsg = "Unable to encode body of event! Event lost! " +
-          e.getMessage();
-      LogLog.error(errormsg);
-      throw new FlumeException(errormsg,e);
-    } catch (AvroRemoteException e) {
-      String errormsg = "Avro Remote Exception: " + e.getMessage();
-      LogLog.error(errormsg);
-      throw new FlumeException(errormsg,e);
+      rpcClient.append(flumeEvent);
+    } catch (EventDeliveryException e) {
+      String msg = "Flume append() failed.";
+      LogLog.error(msg);
+      throw new FlumeException(msg + " Exception follows.", e);
     }
   }
 
@@ -144,26 +140,22 @@ public class Log4jAppender extends Appen
    * Closes underlying client.
    * If <tt>append()</tt> is called after this function is called,
    * it will throw an exception.
-   * @throws FlumeException if appender was closed once or not setup.
+   * @throws FlumeException if errors occur during close
    */
   @Override
   public synchronized void close() throws FlumeException{
-    //Simply set protocol client to null and let Java do the cleanup
-    //when the client is no longer accessible.
     //Any append calls after this will result in an Exception.
-    try {
-      transceiver.close();
-    } catch (IOException e) {
-      throw new FlumeException("Attempting to close " +
-          "Appender which is already closed or one which was never opened", e);
+    if (rpcClient != null) {
+      rpcClient.close();
+      rpcClient = null;
     }
-    protocolClient = null;
   }
 
   @Override
   public boolean requiresLayout() {
     return false;
   }
+
   /**
    * Set the first flume hop hostname.
    * @param hostname The first hop where the client should connect to.
@@ -171,6 +163,7 @@ public class Log4jAppender extends Appen
   public void setHostname(String hostname){
     this.hostname = hostname;
   }
+
   /**
    * Set the port on the hostname to connect to.
    * @param port The port to connect on the host.
@@ -188,17 +181,22 @@ public class Log4jAppender extends Appen
   @Override
   public void activateOptions() throws FlumeException{
     try {
-      transceiver = new NettyTransceiver(
-          new InetSocketAddress(hostname, port));
-      protocolClient = SpecificRequestor.getClient(
-          AvroSourceProtocol.class, transceiver);
-    } catch (IOException e) {
-      String errormsg = "Avro Client creation failed! "+
+      rpcClient = RpcClientFactory.getInstance(hostname, port);
+    } catch (FlumeException e) {
+      String errormsg = "RPC client creation failed! " +
           e.getMessage();
       LogLog.error(errormsg);
-      throw new FlumeException(errormsg,e);
-
+      throw e;
     }
   }
+
+  /**
+   * Make it easy to reconnect on failure
+   * @throws FlumeException
+   */
+  private void reconnect() throws FlumeException {
+    close();
+    activateOptions();
+  }
 }
 

Modified: incubator/flume/trunk/flume-ng-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/pom.xml?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-core/pom.xml Tue Mar 13 02:25:30 2012
@@ -33,36 +33,6 @@ limitations under the License.
     <plugins>
 
       <plugin>
-        <groupId>org.apache.avro</groupId>
-        <artifactId>avro-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>idl-protocol</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
-      <plugin>
-        <groupId>com.thoughtworks.paranamer</groupId>
-        <artifactId>paranamer-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>run</id>
-            <configuration>
-              <sourceDirectory>${project.build.directory}/generated-sources/avro</sourceDirectory>
-              <outputDirectory>${project.build.directory}/classes</outputDirectory>
-            </configuration>
-            <goals>
-              <goal>generate</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
-      <plugin>
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
       </plugin>
@@ -73,6 +43,11 @@ limitations under the License.
   <dependencies>
 
     <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java Tue Mar 13 02:25:30 2012
@@ -24,24 +24,22 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.nio.charset.Charset;
 import java.util.List;
 
-import org.apache.avro.ipc.NettyTransceiver;
-import org.apache.avro.ipc.Transceiver;
-import org.apache.avro.ipc.specific.SpecificRequestor;
+import com.google.common.collect.Lists;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.flume.source.avro.AvroFlumeEvent;
-import org.apache.flume.source.avro.AvroSourceProtocol;
-import org.apache.flume.source.avro.Status;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.event.EventBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +48,8 @@ public class AvroCLIClient {
   private static final Logger logger = LoggerFactory
       .getLogger(AvroCLIClient.class);
 
+  private static final int BATCH_SIZE = 5;
+
   private String hostname;
   private int port;
   private String fileName;
@@ -68,6 +68,10 @@ public class AvroCLIClient {
     } catch (IOException e) {
       logger.error("Unable to send data to Flume - {}", e.getMessage());
       logger.debug("Exception follows.", e);
+    } catch (FlumeException e) {
+      logger.error("Unable to open connection to Flume. Exception follows.", e);
+    } catch (EventDeliveryException e) {
+      logger.error("Unable to deliver events to Flume. Exception follows.", e);
     }
 
     logger.debug("Exiting");
@@ -108,16 +112,14 @@ public class AvroCLIClient {
     return true;
   }
 
-  private void run() throws IOException {
+  private void run() throws IOException, FlumeException,
+      EventDeliveryException {
 
-    Transceiver transceiver = null;
     BufferedReader reader = null;
 
+    RpcClient rpcClient = RpcClientFactory.getInstance(hostname, port, BATCH_SIZE);
     try {
-      transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port));
-      AvroSourceProtocol client = SpecificRequestor.getClient(
-          AvroSourceProtocol.class, transceiver);
-      List<AvroFlumeEvent> eventBuffer = new ArrayList<AvroFlumeEvent>();
+      List<Event> eventBuffer = Lists.newArrayList();
 
       if (fileName != null) {
         reader = new BufferedReader(new FileReader(new File(fileName)));
@@ -125,32 +127,24 @@ public class AvroCLIClient {
         reader = new BufferedReader(new InputStreamReader(System.in));
       }
 
-      String line = null;
+      String line;
       long lastCheck = System.currentTimeMillis();
       long sentBytes = 0;
 
+      int batchSize = rpcClient.getBatchSize();
       while ((line = reader.readLine()) != null) {
         // logger.debug("read:{}", line);
 
-        if (eventBuffer.size() >= 1000) {
-          Status status = client.appendBatch(eventBuffer);
-
-          if (!status.equals(Status.OK)) {
-            logger.error("Unable to send batch size:{} status:{}",
-                eventBuffer.size(), status);
-          }
-
+        int size = eventBuffer.size();
+        if (size == batchSize) {
+          rpcClient.appendBatch(eventBuffer);
           eventBuffer.clear();
         }
 
-        AvroFlumeEvent avroEvent = new AvroFlumeEvent();
-
-        avroEvent.setHeaders(new HashMap<CharSequence, CharSequence>());
-        avroEvent.setBody(ByteBuffer.wrap(line.getBytes()));
-
-        eventBuffer.add(avroEvent);
+        Event event = EventBuilder.withBody(line, Charset.forName("UTF8"));
+        eventBuffer.add(event);
 
-        sentBytes += avroEvent.getBody().capacity();
+        sentBytes += event.getBody().length;
         sent++;
 
         long now = System.currentTimeMillis();
@@ -161,15 +155,8 @@ public class AvroCLIClient {
         }
       }
 
-      if (eventBuffer.size() > 0) {
-        Status status = client.appendBatch(eventBuffer);
-
-        if (!status.equals(Status.OK)) {
-          logger.error("Unable to send batch size:{} status:{}",
-              eventBuffer.size(), status);
-        }
-
-        eventBuffer.clear();
+      if (!eventBuffer.isEmpty()) {
+        rpcClient.appendBatch(eventBuffer);
       }
 
       logger.debug("Finished");
@@ -179,10 +166,8 @@ public class AvroCLIClient {
         reader.close();
       }
 
-      if (transceiver != null) {
-        logger.debug("Closing tranceiver");
-        transceiver.close();
-      }
+      logger.debug("Closing RPC client");
+      rpcClient.close();
     }
   }
 }

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java Tue Mar 13 02:25:30 2012
@@ -20,48 +20,41 @@
 package org.apache.flume.sink;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyTransceiver;
-import org.apache.avro.ipc.Transceiver;
-import org.apache.avro.ipc.specific.SpecificRequestor;
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
 import org.apache.flume.Sink;
 import org.apache.flume.Transaction;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
 import org.apache.flume.conf.Configurable;
-import org.apache.flume.source.avro.AvroFlumeEvent;
-import org.apache.flume.source.avro.AvroSourceProtocol;
+import org.apache.flume.source.AvroSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
 
 /**
  * <p>
- * A {@link Sink} implementation that can send events to an Avro server (such as
- * Flume's <tt>AvroSource</tt>).
+ * A {@link Sink} implementation that can send events to an RPC server (such as
+ * Flume's {@link AvroSource}).
  * </p>
  * <p>
  * This sink forms one half of Flume's tiered collection support. Events sent to
- * this sink are turned into {@link AvroFlumeEvent}s and sent to the configured
- * hostname / port pair using Avro's {@link NettyTransceiver}. The intent is
- * that the destination is an instance of Flume's <tt>AvroSource</tt> which
- * allows Flume to nodes to forward to other Flume nodes forming a tiered
+ * this sink are transported over the network to the hostname / port pair using
+ * the RPC implementation encapsulated in {@link RpcClient}.
+ * The destination is an instance of Flume's {@link AvroSource}, which
+ * allows Flume agents to forward to other Flume agents, forming a tiered
  * collection infrastructure. Of course, nothing prevents one from using this
  * sink to speak to other custom built infrastructure that implements the same
- * Avro protocol (specifically {@link AvroSourceProtocol}).
+ * RPC protocol.
  * </p>
  * <p>
  * Events are taken from the configured {@link Channel} in batches of the
@@ -120,8 +113,7 @@ public class AvroSink extends AbstractSi
   private Integer port;
   private Integer batchSize;
 
-  private AvroSourceProtocol client;
-  private Transceiver transceiver;
+  private RpcClient client;
   private CounterGroup counterGroup;
 
   public AvroSink() {
@@ -132,8 +124,8 @@ public class AvroSink extends AbstractSi
   public void configure(Context context) {
     hostname = context.getString("hostname");
     port = context.getInteger("port");
-    batchSize = context.getInteger("batch-size");
 
+    batchSize = context.getInteger("batch-size");
     if (batchSize == null) {
       batchSize = defaultBatchSize;
     }
@@ -142,47 +134,63 @@ public class AvroSink extends AbstractSi
     Preconditions.checkState(port != null, "No port specified");
   }
 
-  private void createConnection() throws IOException {
-    if (transceiver == null) {
-      logger.debug("Creating new tranceiver connection to hostname:{} port:{}",
-          hostname, port);
-      transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port));
-    }
+  /**
+   * If this function is called successively without calling
+   * {@see #destroyConnection()}, only the first call has any effect.
+   * @throws FlumeException if an RPC client connection could not be opened
+   */
+  private void createConnection() throws FlumeException {
 
     if (client == null) {
-      logger.debug("Creating Avro client with tranceiver:{}", transceiver);
-      client = SpecificRequestor.getClient(AvroSourceProtocol.class,
-          transceiver);
+      logger.debug(
+          "Building RpcClient with hostname:{}, port:{}, batchSize:{}",
+          new Object[] { hostname, port, batchSize });
+
+       client = RpcClientFactory.getInstance(hostname, port, batchSize);
     }
+
   }
 
   private void destroyConnection() {
-    if (transceiver != null) {
-      logger.debug("Destroying tranceiver:{}", transceiver);
+    if (client != null) {
+      logger.debug("Closing avro client:{}", client);
       try {
-        transceiver.close();
-      } catch (IOException e) {
-        logger
-            .error(
-                "Attempt to clean up avro tranceiver after client error failed. Exception follows.",
-                e);
+        client.close();
+      } catch (FlumeException e) {
+        logger.error("Attempt to close avro client failed. Exception follows.",
+            e);
       }
-
-      transceiver = null;
     }
 
     client = null;
   }
 
+  /**
+   * Ensure the connection exists and is active.
+   * If the connection is not active, destroy it and recreate it.
+   *
+   * @throws FlumeException If there are errors closing or opening the RPC
+   * connection.
+   */
+  private void verifyConnection() throws FlumeException {
+    if (client == null) {
+      createConnection();
+    } else if (!client.isActive()) {
+      destroyConnection();
+      createConnection();
+    }
+  }
+
   @Override
   public void start() {
     logger.info("Avro sink starting");
 
     try {
       createConnection();
-    } catch (Exception e) {
+    } catch (FlumeException e) {
       logger.error("Unable to create avro client using hostname:" + hostname
-          + " port:" + port + ". Exception follows.", e);
+          + ", port:" + port + ", batchSize: " + batchSize +
+          ". Exception follows.", e);
 
       /* Try to prevent leaking resources. */
       destroyConnection();
@@ -215,9 +223,10 @@ public class AvroSink extends AbstractSi
 
     try {
       transaction.begin();
-      createConnection();
 
-      List<AvroFlumeEvent> batch = new LinkedList<AvroFlumeEvent>();
+      verifyConnection();
+
+      List<Event> batch = Lists.newLinkedList();
 
       for (int i = 0; i < batchSize; i++) {
         Event event = channel.take();
@@ -227,44 +236,35 @@ public class AvroSink extends AbstractSi
           break;
         }
 
-        AvroFlumeEvent avroEvent = new AvroFlumeEvent();
-
-        avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
-        Map<CharSequence, CharSequence> headers = Maps.newHashMap();
-
-        for (Entry<String, String> entry : event.getHeaders().entrySet()) {
-          headers.put(entry.getKey(), entry.getValue());
-        }
-        avroEvent.setHeaders(headers);
-        batch.add(avroEvent);
+        batch.add(event);
       }
 
       if (batch.isEmpty()) {
         counterGroup.incrementAndGet("batch.empty");
         status = Status.BACKOFF;
       } else {
-        if (!client.appendBatch(batch).equals(
-            org.apache.flume.source.avro.Status.OK)) {
-          throw new AvroRemoteException("RPC communication returned FAILED");
-        }
+        client.appendBatch(batch);
       }
 
       transaction.commit();
       counterGroup.incrementAndGet("batch.success");
+
     } catch (ChannelException e) {
       transaction.rollback();
       logger.error("Unable to get event from channel. Exception follows.", e);
       status = Status.BACKOFF;
-    } catch (AvroRemoteException e) {
+
+    } catch (EventDeliveryException e) {
       transaction.rollback();
-      logger.error("Unable to send event batch. Exception follows.", e);
-      status = Status.BACKOFF;
-    } catch (Exception e) {
+      destroyConnection();
+      throw e;
+
+    } catch (FlumeException e) {
       transaction.rollback();
-      logger.error(
-          "Unable to communicate with Avro server. Exception follows.", e);
-      status = Status.BACKOFF;
       destroyConnection();
+      throw new EventDeliveryException("RPC connection error. " +
+          "Exception follows.", e);
+
     } finally {
       transaction.close();
     }

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java Tue Mar 13 02:25:30 2012
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.avro.AvroRemoteException;
 import org.apache.avro.ipc.NettyServer;
 import org.apache.avro.ipc.Responder;
 import org.apache.avro.ipc.Server;
@@ -156,20 +155,13 @@ public class AvroSource extends Abstract
   }
 
   @Override
-  public Status append(AvroFlumeEvent avroEvent) throws AvroRemoteException {
+  public Status append(AvroFlumeEvent avroEvent) {
     logger.debug("Received avro event:{}", avroEvent);
 
     counterGroup.incrementAndGet("rpc.received");
 
-    Map<String, String> headers = new HashMap<String, String>();
-
-    for (Entry<CharSequence, CharSequence> entry : avroEvent.getHeaders()
-        .entrySet()) {
-
-      headers.put(entry.getKey().toString(), entry.getValue().toString());
-    }
-
-    Event event = EventBuilder.withBody(avroEvent.getBody().array(), headers);
+    Event event = EventBuilder.withBody(avroEvent.getBody().array(),
+        avroEvent.getHeaders());
 
     try {
       getChannelProcessor().processEvent(event);
@@ -189,15 +181,8 @@ public class AvroSource extends Abstract
     List<Event> batch = new ArrayList<Event>();
 
     for (AvroFlumeEvent avroEvent : events) {
-      Map<String, String> headers = new HashMap<String, String>();
-
-      for (Entry<CharSequence, CharSequence> entry : avroEvent.getHeaders()
-          .entrySet()) {
-
-        headers.put(entry.getKey().toString(), entry.getValue().toString());
-      }
-
-      Event event = EventBuilder.withBody(avroEvent.getBody().array(), headers);
+      Event event = EventBuilder.withBody(avroEvent.getBody().array(),
+          avroEvent.getHeaders());
       counterGroup.incrementAndGet("rpc.events");
 
       batch.add(event);

Modified: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java Tue Mar 13 02:25:30 2012
@@ -20,6 +20,7 @@
 package org.apache.flume.sink;
 
 import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.List;
 
@@ -92,7 +93,9 @@ public class TestAvroSink {
   }
 
   @Test
-  public void testProcess() throws InterruptedException, EventDeliveryException {
+  public void testProcess() throws InterruptedException,
+      EventDeliveryException {
+
     Event event = EventBuilder.withBody("test event 1".getBytes(),
         new HashMap<String, String>());
     Server server = createServer();
@@ -130,8 +133,8 @@ public class TestAvroSink {
   public void testFailedConnect() throws InterruptedException,
       EventDeliveryException {
 
-    Event event = EventBuilder.withBody("test event 1".getBytes(),
-        new HashMap<String, String>());
+    Event event = EventBuilder.withBody("test event 1",
+        Charset.forName("UTF8"));
     Server server = createServer();
 
     server.start();
@@ -153,8 +156,14 @@ public class TestAvroSink {
     transaction.close();
 
     for (int i = 0; i < 5; i++) {
-      Sink.Status status = sink.process();
-      Assert.assertEquals(Sink.Status.BACKOFF, status);
+      boolean threwException = false;
+      try {
+        sink.process();
+      } catch (EventDeliveryException e) {
+        threwException = true;
+      }
+      Assert.assertTrue("Must throw EventDeliveryException if disconnected",
+          threwException);
     }
 
     server = createServer();

Modified: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java Tue Mar 13 02:25:30 2012
@@ -145,7 +145,7 @@ public class TestAvroSource {
 
     AvroFlumeEvent avroEvent = new AvroFlumeEvent();
 
-    avroEvent.setHeaders(new HashMap<CharSequence, CharSequence>());
+    avroEvent.setHeaders(new HashMap<String, String>());
     avroEvent.setBody(ByteBuffer.wrap("Hello avro".getBytes()));
 
     Status status = client.append(avroEvent);

Modified: incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/pom.xml?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/pom.xml Tue Mar 13 02:25:30 2012
@@ -59,6 +59,11 @@ limitations under the License.
 
     <dependency>
       <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-core</artifactId>
     </dependency>
 

Modified: incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java (original)
+++ incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java Tue Mar 13 02:25:30 2012
@@ -38,14 +38,15 @@ import org.apache.flume.source.AbstractS
 
 import com.cloudera.flume.handlers.avro.AvroFlumeOGEvent;
 import com.cloudera.flume.handlers.avro.FlumeOGEventAvroServer;
+import org.apache.avro.AvroRemoteException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flume.ChannelException;
 
 /**
  * <p>
- * A {@link Source} implementation that receives Avro events from Avro sink of Flume OG
- * </p>
+ * A {@link Source} implementation that receives Avro events from Avro sink of
+ * Flume OG</p>
  * <p>
  * <b>Configuration options</b>
  * </p>
@@ -77,8 +78,8 @@ import org.apache.flume.ChannelException
  * </p>
  */
 
-public class AvroLegacySource extends AbstractSource implements EventDrivenSource,
-    Configurable, FlumeOGEventAvroServer {
+public class AvroLegacySource extends AbstractSource implements
+    EventDrivenSource, Configurable, FlumeOGEventAvroServer {
 
   static final Logger LOG = LoggerFactory.getLogger(AvroLegacySource.class);
 
@@ -121,7 +122,7 @@ public class AvroLegacySource extends Ab
   }
 
   @Override
-  public Void append( AvroFlumeOGEvent evt ) throws org.apache.avro.AvroRemoteException {
+  public Void append( AvroFlumeOGEvent evt ) throws AvroRemoteException {
     counterGroup.incrementAndGet("rpc.received");
     Map<String, String> headers = new HashMap<String, String>();
 
@@ -130,7 +131,7 @@ public class AvroLegacySource extends Ab
     headers.put(TIMESTAMP, evt.getTimestamp().toString());
     headers.put(PRIORITY, evt.getPriority().toString());
     headers.put(NANOS, evt.getNanos().toString());
-    for (Entry<CharSequence, ByteBuffer> entry: evt.getFields().entrySet()) {
+    for (Entry<String, ByteBuffer> entry: evt.getFields().entrySet()) {
       headers.put(entry.getKey().toString(), entry.getValue().toString());
     }
     headers.put(OG_EVENT, "yes");

Modified: incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java (original)
+++ incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java Tue Mar 13 02:25:30 2012
@@ -145,7 +145,7 @@ public class TestLegacyAvroSource {
 
     AvroFlumeOGEvent avroEvent =  AvroFlumeOGEvent.newBuilder().setHost("foo").
         setPriority(Priority.INFO).setNanos(0).setTimestamp(1).
-        setFields(new HashMap<CharSequence, ByteBuffer> ()).
+        setFields(new HashMap<String, ByteBuffer> ()).
         setBody(ByteBuffer.wrap("foo".getBytes())).build();
 
     client.append(avroEvent);

Modified: incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/pom.xml?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/pom.xml Tue Mar 13 02:25:30 2012
@@ -103,6 +103,11 @@ limitations under the License.
 
     <dependency>
       <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-core</artifactId>
     </dependency>
 

Modified: incubator/flume/trunk/flume-ng-node/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-node/pom.xml?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-node/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-node/pom.xml Tue Mar 13 02:25:30 2012
@@ -59,6 +59,11 @@
 
     <dependency>
       <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-core</artifactId>
     </dependency>
 

Propchange: incubator/flume/trunk/flume-ng-sdk/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Mar 13 02:25:30 2012
@@ -0,0 +1,4 @@
+.classpath
+.project
+.settings
+target

Added: incubator/flume/trunk/flume-ng-sdk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/pom.xml?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/pom.xml (added)
+++ incubator/flume/trunk/flume-ng-sdk/pom.xml Tue Mar 13 02:25:30 2012
@@ -0,0 +1,90 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume-parent</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.1.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>flume-ng-sdk</artifactId>
+  <name>Flume NG SDK</name>
+  <description>Flume Software Development Kit: Stable public API for integration with Flume 1.x</description>
+
+  <build>
+    <plugins>
+
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <configuration>
+          <stringType>String</stringType>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>idl-protocol</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>com.thoughtworks.paranamer</groupId>
+        <artifactId>paranamer-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>run</id>
+            <configuration>
+              <sourceDirectory>${project.build.directory}/generated-sources/avro</sourceDirectory>
+              <outputDirectory>${project.build.directory}/classes</outputDirectory>
+            </configuration>
+            <goals>
+              <goal>generate</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+  </build>
+
+
+  <dependencies>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-ipc</artifactId>
+    </dependency>
+
+  </dependencies>
+</project>

Added: incubator/flume/trunk/flume-ng-sdk/src/main/avro/flume.avdl
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/avro/flume.avdl?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/avro/flume.avdl (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/avro/flume.avdl Tue Mar 13 02:25:30 2012
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+@namespace("org.apache.flume.source.avro")
+
+protocol AvroSourceProtocol {
+
+  enum Status {
+    OK, FAILED, UNKNOWN
+  }
+
+  record AvroFlumeEvent {
+    map<string> headers;
+    bytes body;
+  }
+
+  Status append( AvroFlumeEvent event );
+
+  Status appendBatch( array<AvroFlumeEvent> events );
+
+}

Added: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/Event.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/Event.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/Event.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/Event.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume;
+
+import java.util.Map;
+
+/**
+ * Basic representation of a data object in Flume.
+ * Provides access to data as it flows through the system.
+ */
+public interface Event {
+
+  /**
+   * Returns a map of name-value pairs describing the data stored in the body.
+   */
+  public Map<String, String> getHeaders();
+
+  /**
+   * Set the event headers
+   * @param headers Map of headers to replace the current headers.
+   */
+  public void setHeaders(Map<String, String> headers);
+
+  /**
+   * Returns the raw byte array of the data contained in this event.
+   */
+  public byte[] getBody();
+
+  /**
+   * Sets the raw byte array of the data contained in this event.
+   * @param body The data.
+   */
+  public void setBody(byte[] body);
+
+}

Propchange: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/Event.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/EventDeliveryException.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/EventDeliveryException.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/EventDeliveryException.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/EventDeliveryException.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume;
+
+/**
+ * An event delivery exception is raised whenever an {@link Event} fails to
+ * reach at least one of its intended (next-hop) destinations.
+ */
+public class EventDeliveryException extends Exception {
+
+  private static final long serialVersionUID = 1102327497549834945L;
+
+  public EventDeliveryException() {
+    super();
+  }
+
+  public EventDeliveryException(String message) {
+    super(message);
+  }
+
+  public EventDeliveryException(String message, Throwable t) {
+    super(message, t);
+  }
+
+  public EventDeliveryException(Throwable t) {
+    super(t);
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/EventDeliveryException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/FlumeException.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/FlumeException.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/FlumeException.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/FlumeException.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume;
+
+/**
+ * Base class of all flume exceptions.
+ */
+public class FlumeException extends RuntimeException {
+
+  private static final long serialVersionUID = 1L;
+
+  public FlumeException(String msg) {
+    super(msg);
+  }
+
+  public FlumeException(String msg, Throwable th) {
+    super(msg, th);
+  }
+
+  public FlumeException(Throwable th) {
+    super(th);
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/FlumeException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,382 @@
+/*
+ * Copyright 2012 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.api;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.avro.ipc.CallFuture;
+
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.flume.source.avro.AvroSourceProtocol;
+import org.apache.flume.source.avro.Status;
+
+/**
+ * Avro/Netty implementation of {@link RpcClient}.
+ * The connections are intended to be opened before clients are given access so
+ * that the object cannot ever be in an inconsistent when exposed to users.
+ */
+public class NettyAvroRpcClient implements RpcClient {
+
+  private final ReentrantLock stateLock = new ReentrantLock();
+
+  private final static long DEFAULT_CONNECT_TIMEOUT_MILLIS =
+      TimeUnit.MILLISECONDS.convert(60, TimeUnit.SECONDS);
+
+  private final static long DEFAULT_REQUEST_TIMEOUT_MILLIS =
+      TimeUnit.MILLISECONDS.convert(60, TimeUnit.SECONDS);
+
+  /**
+   * Guarded by {@code stateLock}
+   */
+  private ConnState connState;
+
+  private final String hostname;
+  private final Integer port;
+  private final Integer batchSize;
+
+  private Transceiver transceiver;
+  private AvroSourceProtocol.Callback avroClient;
+
+  /**
+   * This constructor is intended to be called from {@link AvroClientBuilder}.
+   * @param hostname The destination hostname
+   * @param port The destination port
+   * @param batchSize Maximum number of Events to accept in appendBatch()
+   */
+  private NettyAvroRpcClient(String hostname, Integer port, Integer batchSize) {
+
+    if (hostname == null) throw new NullPointerException("hostname is null");
+    if (port == null) throw new NullPointerException("port is null");
+    if (batchSize == null) throw new NullPointerException("batchSize is null");
+
+    this.hostname = hostname;
+    this.port = port;
+    this.batchSize = batchSize;
+
+    setState(ConnState.INIT);
+  }
+
+  /**
+   * This method should only be invoked by the Builder
+   * @throws FlumeException
+   */
+  private void connect() throws FlumeException {
+    connect(DEFAULT_CONNECT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Internal only, for now
+   * @param timeout
+   * @param tu
+   * @throws FlumeException
+   */
+  private void connect(long timeout, TimeUnit tu) throws FlumeException {
+    try {
+      transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port),
+          tu.toMillis(timeout));
+      avroClient =
+          SpecificRequestor.getClient(AvroSourceProtocol.Callback.class,
+          transceiver);
+
+    } catch (IOException ex) {
+      throw new FlumeException("RPC connection error. Exception follows.", ex);
+    }
+
+    setState(ConnState.READY);
+  }
+
+  @Override
+  public void close() throws FlumeException {
+    try {
+      transceiver.close();
+    } catch (IOException ex) {
+      throw new FlumeException("Error closing transceiver. Exception follows.",
+          ex);
+    } finally {
+      setState(ConnState.DEAD);
+    }
+
+  }
+
+  @Override
+  public int getBatchSize() {
+    return batchSize;
+  }
+
+  @Override
+  public void append(Event event) throws EventDeliveryException {
+    try {
+      append(event, DEFAULT_REQUEST_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+    } catch (EventDeliveryException e) {
+      // we mark as no longer active without trying to clean up resources
+      // client is required to call close() to clean up resources
+      setState(ConnState.DEAD);
+      throw e;
+    }
+  }
+
+  private void append(Event event, long timeout, TimeUnit tu)
+      throws EventDeliveryException {
+
+    assertReady();
+
+    CallFuture<Status> callFuture = new CallFuture<Status>();
+
+    try {
+      AvroFlumeEvent avroEvent = new AvroFlumeEvent();
+      avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
+      avroEvent.setHeaders(event.getHeaders());
+      avroClient.append(avroEvent, callFuture);
+    } catch (IOException ex) {
+      throw new EventDeliveryException("RPC request IO exception. " +
+          "Exception follows.", ex);
+    }
+
+    waitForStatusOK(callFuture, timeout, tu);
+  }
+
+  @Override
+  public void appendBatch(List<Event> events) throws EventDeliveryException {
+    try {
+      appendBatch(events, DEFAULT_REQUEST_TIMEOUT_MILLIS,
+          TimeUnit.MILLISECONDS);
+    } catch (EventDeliveryException e) {
+      // we mark as no longer active without trying to clean up resources
+      // client is required to call close() to clean up resources
+      setState(ConnState.DEAD);
+      throw e;
+    }
+  }
+
+  private void appendBatch(List<Event> events, long timeout, TimeUnit tu)
+      throws EventDeliveryException {
+
+    assertReady();
+
+    Iterator<Event> iter = events.iterator();
+    List<AvroFlumeEvent> avroEvents = new LinkedList<AvroFlumeEvent>();
+
+    // send multiple batches... bail if there is a problem at any time
+    while (iter.hasNext()) {
+      avroEvents.clear();
+
+      for (int i = 0; i < batchSize && iter.hasNext(); i++) {
+        Event event = iter.next();
+        AvroFlumeEvent avroEvent = new AvroFlumeEvent();
+        avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
+        avroEvent.setHeaders(event.getHeaders());
+        avroEvents.add(avroEvent);
+      }
+
+      CallFuture<Status> callFuture = new CallFuture<Status>();
+      try {
+        avroClient.appendBatch(avroEvents, callFuture);
+      } catch (IOException ex) {
+        throw new EventDeliveryException("RPC request IO exception. " +
+            "Exception follows.", ex);
+      }
+
+      waitForStatusOK(callFuture, timeout, tu);
+    }
+  }
+
+  /**
+   * Helper method that waits for a Status future to come back and validates
+   * that it returns Status == OK.
+   * @param callFuture Future to wait on
+   * @param timeout Time to wait before failing
+   * @param tu Time Unit of {@code timeout}
+   * @throws EventDeliveryException If there is a timeout or if Status != OK
+   */
+  private static void waitForStatusOK(CallFuture<Status> callFuture,
+      long timeout, TimeUnit tu) throws EventDeliveryException {
+    try {
+      Status status = callFuture.get(timeout, tu);
+      if (status != Status.OK) {
+        throw new EventDeliveryException("Status (" + status + ") is not OK");
+      }
+    } catch (CancellationException ex) {
+      throw new EventDeliveryException("RPC future was cancelled." +
+          " Exception follows.", ex);
+    } catch (ExecutionException ex) {
+      throw new EventDeliveryException("Exception thrown from remote handler." +
+          " Exception follows.", ex);
+    } catch (TimeoutException ex) {
+      throw new EventDeliveryException("RPC request timed out." +
+          " Exception follows.", ex);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new EventDeliveryException("RPC request interrupted." +
+          " Exception follows.", ex);
+    }
+  }
+
+  /**
+   * This method should always be used to change {@code connState} so we ensure
+   * that invalid state transitions do not occur and that the {@code isIdle}
+   * {@link Condition} variable gets signaled reliably.
+   * Throws {@code IllegalStateException} when called to transition from CLOSED
+   * to another state.
+   * @param state
+   */
+  private void setState(ConnState newState) {
+    stateLock.lock();
+    try {
+      if (connState == ConnState.DEAD && connState != newState) {
+        throw new IllegalStateException("Cannot transition from CLOSED state.");
+      }
+      connState = newState;
+    } finally {
+      stateLock.unlock();
+    }
+  }
+
+  /**
+   * If the connection state != READY, throws {@link EventDeliveryException}.
+   */
+  private void assertReady() throws EventDeliveryException {
+    stateLock.lock();
+    try {
+      ConnState curState = connState;
+      if (curState != ConnState.READY) {
+        throw new EventDeliveryException("RPC failed, client in an invalid " +
+            "state: " + curState);
+      }
+    } finally {
+      stateLock.unlock();
+    }
+  }
+
+  @Override
+  public boolean isActive() {
+    stateLock.lock();
+    try {
+      return (connState == ConnState.READY);
+    } finally {
+      stateLock.unlock();
+    }
+  }
+
+  private static enum ConnState {
+    INIT, READY, DEAD
+  }
+
+  /**
+   * <p>Builder class used to construct {@link NettyAvroRpcClient} objects.</p>
+   *
+   * <p><strong>Note:</strong> It is recommended for applications to construct
+   * {@link RpcClient} instances using the {@link RpcClientFactory} class.</p>
+   */
+  protected static class Builder {
+
+    protected static final int DEFAULT_BATCH_SIZE = 100;
+
+    private String hostname;
+    private Integer port;
+    private Integer batchSize;
+
+    public Builder() {
+      batchSize = DEFAULT_BATCH_SIZE;
+    }
+
+    /**
+     * Hostname to connect to (required)
+     *
+     * @param hostname
+     * @return {@code this}
+     */
+    public Builder hostname(String hostname) {
+      if (hostname == null) {
+        throw new NullPointerException("hostname is null");
+      }
+
+      this.hostname = hostname;
+      return this;
+    }
+
+    /**
+     * Port to connect to (required)
+     *
+     * @param port
+     * @return {@code this}
+     */
+    public Builder port(Integer port) {
+      if (port == null) {
+        throw new NullPointerException("port is null");
+      }
+
+      this.port = port;
+      return this;
+    }
+
+    /**
+     * Maximum number of {@linkplain Event events} that can be processed in a
+     * batch operation. (optional)<br>
+     * Default: 100
+     *
+     * @param batchSize
+     * @return {@code this}
+     */
+    public Builder batchSize(Integer batchSize) {
+      if (batchSize == null) {
+        throw new NullPointerException("batch size is null");
+      }
+
+      this.batchSize = batchSize;
+      return this;
+    }
+
+    /**
+     * Construct the object
+     * @return Active RPC client
+     * @throws FlumeException
+     */
+    public NettyAvroRpcClient build() throws FlumeException {
+      // validate the required fields
+      if (hostname == null) throw new NullPointerException("hostname is null");
+      if (port == null) throw new NullPointerException("port is null");
+      if (batchSize == null) {
+        throw new NullPointerException("batch size is null");
+      }
+
+      NettyAvroRpcClient client =
+          new NettyAvroRpcClient(hostname, port, batchSize);
+      client.connect();
+
+      return client;
+    }
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClient.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClient.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClient.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2012 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.api;
+
+import java.util.List;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+
+/**
+ * <p>Public client interface for sending data to Flume.</p>
+ *
+ * <p>This interface is intended not to change incompatibly for Flume 1.x.</p>
+ *
+ * <p><strong>Note:</strong> It is recommended for applications to construct
+ * {@link RpcClient} instances using the {@link RpcClientFactory} class,
+ * instead of using builders associated with a particular implementation class.
+ * </p>
+ *
+ * @see org.apache.flume.api.RpcClientFactory
+ */
+public interface RpcClient {
+
+  /**
+   * Returns the maximum number of {@link Event events} that may be batched
+   * at once by {@link #appendBatch(List) appendBatch()}.
+   */
+  public int getBatchSize();
+
+  /**
+   * <p>Send a single {@link Event} to the associated Flume source.</p>
+   *
+   * <p>This method blocks until the RPC returns or until the request times out.
+   * </p>
+   *
+   * <p><strong>Note:</strong> If this method throws an
+   * {@link EventDeliveryException}, there is no way to recover and the
+   * application must invoke {@link #close()} on this object to clean up system
+   * resources.</p>
+   *
+   * @param event
+   * @return
+   * @throws EventDeliveryException when an error prevents event delivery.
+   */
+  public void append(Event event) throws EventDeliveryException;
+
+  /**
+   * <p>Send a list of {@linkplain Event events} to the associated Flume source.
+   * </p>
+   *
+   * <p>This method blocks until the RPC returns or until the request times out.
+   * </p>
+   *
+   * <p>It is strongly recommended that the number of events in the List be no
+   * more than {@link #getBatchSize()}. If it is more, multiple RPC calls will
+   * be required, and the likelihood of duplicate Events being stored will
+   * increase.</p>
+   *
+   * <p><strong>Note:</strong> If this method throws an
+   * {@link EventDeliveryException}, there is no way to recover and the
+   * application must invoke {@link #close()} on this object to clean up system
+   * resources.</p>
+   *
+   * @param events List of events to send
+   * @return
+   * @throws EventDeliveryException when an error prevents event delivery.
+   */
+  public void appendBatch(List<Event> events) throws
+      EventDeliveryException;
+
+  /**
+   * <p>Returns {@code true} if this object appears to be in a usable state, and
+   * it returns {@code false} if this object is permanently disabled.</p>
+   *
+   * <p>If this method returns {@code false}, an application must call
+   * {@link #close()} on this object to clean up system resources.</p>
+   */
+  public boolean isActive();
+
+  /**
+   * <p>Immediately closes the client so that it may no longer be used.</p>
+   *
+   * <p><strong>Note:</strong> This method MUST be called by applications
+   * when they are done using the RPC client in order to clean up resources.</p>
+   *
+   * <p>Multi-threaded applications may want to gracefully stop making
+   * RPC calls before calling close(). Otherwise, they risk getting
+   * {@link EventDeliveryException} thrown from their in-flight calls when the
+   * underlying connection is disabled.</p>
+   */
+  public void close() throws FlumeException;
+
+}

Propchange: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClient.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2012 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.api;
+
+import org.apache.flume.FlumeException;
+
+/**
+ * Factory class to construct Flume {@link RPCClient} implementations.
+ */
+public class RpcClientFactory {
+
+  /**
+   * Returns an instance of {@link RpcClient} connected to the specified
+   * {@code hostname} and {@code port}.
+   * @throws FlumeException
+   */
+  public static RpcClient getInstance(String hostname, Integer port)
+      throws FlumeException {
+
+    return new NettyAvroRpcClient.Builder()
+        .hostname(hostname).port(port).build();
+  }
+
+  /**
+   * Returns an instance of {@link RpcClient} connected to the specified
+   * {@code hostname} and {@code port} with the specified {@code batchSize}.
+   * @throws FlumeException
+   */
+  public static RpcClient getInstance(String hostname, Integer port,
+      Integer batchSize) throws FlumeException {
+
+    return new NettyAvroRpcClient.Builder()
+        .hostname(hostname).port(port).batchSize(batchSize).build();
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.event;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flume.Event;
+
+public class EventBuilder {
+
+  /**
+   * Instantiate an Event instance based on the provided body and headers.
+   * If <code>headers</code> is <code>null</code>, then it is ignored.
+   * @param body
+   * @param headers
+   * @return
+   */
+  public static Event withBody(byte[] body, Map<String, String> headers) {
+    Event event = new SimpleEvent();
+
+    event.setBody(body);
+
+    if (headers != null) {
+      event.setHeaders(new HashMap<String, String>(headers));
+    }
+
+    return event;
+  }
+
+  public static Event withBody(byte[] body) {
+    return withBody(body, null);
+  }
+
+  public static Event withBody(String body, Charset charset,
+      Map<String, String> headers) {
+
+    return withBody(body.getBytes(charset), headers);
+  }
+
+  public static Event withBody(String body, Charset charset) {
+    return withBody(body, charset, null);
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.event;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flume.Event;
+
+public class SimpleEvent implements Event {
+
+  private Map<String, String> headers;
+  private byte[] body;
+
+  public SimpleEvent() {
+    headers = new HashMap<String, String>();
+    body = null;
+  }
+
+  @Override
+  public Map<String, String> getHeaders() {
+    return headers;
+  }
+
+  @Override
+  public void setHeaders(Map<String, String> headers) {
+    this.headers = headers;
+  }
+
+  @Override
+  public byte[] getBody() {
+    return body;
+  }
+
+  @Override
+  public void setBody(byte[] body) {
+    this.body = body;
+  }
+
+  @Override
+  public String toString() {
+    return "{ headers:" + headers + " body:" + body + " }";
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,234 @@
+/*
+ * Copyright 2012 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.api;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import junit.framework.Assert;
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.flume.source.avro.AvroSourceProtocol;
+import org.apache.flume.source.avro.Status;
+
+/**
+ * Helpers for Netty Avro RPC testing
+ */
+public class RpcTestUtils {
+
+  private static final Logger logger =
+      Logger.getLogger(TestNettyAvroRpcClient.class.getName());
+
+  private static final String localhost = "localhost";
+
+
+  /**
+   * Helper method for testing simple (single) appends on handlers
+   * @param handler
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  public static void handlerSimpleAppendTest(AvroSourceProtocol handler)
+      throws FlumeException, EventDeliveryException {
+    NettyAvroRpcClient client = null;
+    Server server = startServer(handler);
+    try {
+      client = getStockLocalClient(server.getPort());
+      boolean isActive = client.isActive();
+      Assert.assertTrue("Client should be active", isActive);
+      client.append(EventBuilder.withBody("wheee!!!", Charset.forName("UTF8")));
+    } finally {
+      stopServer(server);
+      if (client != null) client.close();
+    }
+  }
+
+  /**
+   * Helper method for testing batch appends on handlers
+   * @param handler
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  public static void handlerBatchAppendTest(AvroSourceProtocol handler)
+      throws FlumeException, EventDeliveryException {
+    NettyAvroRpcClient client = null;
+    Server server = startServer(handler);
+    try {
+      client = getStockLocalClient(server.getPort());
+      boolean isActive = client.isActive();
+      Assert.assertTrue("Client should be active", isActive);
+
+      int batchSize = client.getBatchSize();
+      List<Event> events = new ArrayList<Event>();
+      for (int i = 0; i < batchSize; i++) {
+        events.add(EventBuilder.withBody("evt: " + i, Charset.forName("UTF8")));
+      }
+      client.appendBatch(events);
+
+    } finally {
+      stopServer(server);
+      if (client != null) client.close();
+    }
+  }
+
+  /**
+   * Helper method for constructing a Netty RPC client that talks to localhost.
+   */
+  public static NettyAvroRpcClient getStockLocalClient(int port) {
+    NettyAvroRpcClient client = new NettyAvroRpcClient.Builder()
+        .hostname(localhost).port(port).build();
+
+    return client;
+  }
+
+  /**
+   * Start a NettyServer, wait a moment for it to spin up, and return it.
+   */
+  public static Server startServer(AvroSourceProtocol handler) {
+    Responder responder = new SpecificResponder(AvroSourceProtocol.class,
+        handler);
+    Server server = new NettyServer(responder,
+        new InetSocketAddress(localhost, 0));
+    server.start();
+    logger.log(Level.INFO, "Server started on hostname: {0}, port: {1}",
+        new Object[] { localhost, Integer.toString(server.getPort()) });
+
+    try {
+
+      Thread.sleep(300L);
+
+    } catch (InterruptedException ex) {
+      logger.log(Level.SEVERE, "Thread interrupted. Exception follows.", ex);
+      Thread.currentThread().interrupt();
+    }
+
+    return server;
+  }
+
+  /**
+   * Request that the specified Server stop, and attempt to wait for it to exit.
+   * @param server A running NettyServer
+   */
+  public static void stopServer(Server server) {
+    try {
+      server.close();
+      server.join();
+    } catch (InterruptedException ex) {
+      logger.log(Level.SEVERE, "Thread interrupted. Exception follows.", ex);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * A service that logs receipt of the request and returns OK
+   */
+  public static class OKAvroHandler implements AvroSourceProtocol {
+
+    @Override
+    public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+      logger.log(Level.INFO, "OK: Received event from append(): {0}",
+          new String(event.getBody().array(), Charset.forName("UTF8")));
+      return Status.OK;
+    }
+
+    @Override
+    public Status appendBatch(List<AvroFlumeEvent> events) throws
+        AvroRemoteException {
+      logger.log(Level.INFO, "OK: Received {0} events from appendBatch()",
+          events.size());
+      return Status.OK;
+    }
+
+  }
+
+  /**
+   * A service that logs receipt of the request and returns Failed
+   */
+  public static class FailedAvroHandler implements AvroSourceProtocol {
+
+    @Override
+    public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+      logger.log(Level.INFO, "Failed: Received event from append(): {0}",
+          new String(event.getBody().array(), Charset.forName("UTF8")));
+      return Status.FAILED;
+    }
+
+    @Override
+    public Status appendBatch(List<AvroFlumeEvent> events) throws
+        AvroRemoteException {
+      logger.log(Level.INFO, "Failed: Received {0} events from appendBatch()",
+          events.size());
+      return Status.FAILED;
+    }
+
+  }
+
+  /**
+   * A service that logs receipt of the request and returns Unknown
+   */
+  public static class UnknownAvroHandler implements AvroSourceProtocol {
+
+    @Override
+    public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+      logger.log(Level.INFO, "Unknown: Received event from append(): {0}",
+          new String(event.getBody().array(), Charset.forName("UTF8")));
+      return Status.UNKNOWN;
+    }
+
+    @Override
+    public Status appendBatch(List<AvroFlumeEvent> events) throws
+        AvroRemoteException {
+      logger.log(Level.INFO, "Unknown: Received {0} events from appendBatch()",
+          events.size());
+      return Status.UNKNOWN;
+    }
+
+  }
+
+  /**
+   * A service that logs receipt of the request and then throws an exception
+   */
+  public static class ThrowingAvroHandler implements AvroSourceProtocol {
+
+    @Override
+    public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+      logger.log(Level.INFO, "Throwing: Received event from append(): {0}",
+          new String(event.getBody().array(), Charset.forName("UTF8")));
+      throw new AvroRemoteException("Handler smash!");
+    }
+
+    @Override
+    public Status appendBatch(List<AvroFlumeEvent> events) throws
+        AvroRemoteException {
+      logger.log(Level.INFO, "Throwing: Received {0} events from appendBatch()",
+          events.size());
+      throw new AvroRemoteException("Handler smash!");
+    }
+
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native