You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/03/13 03:25:32 UTC
svn commit: r1299958 [1/2] - in /incubator/flume/trunk: ./
flume-ng-channels/flume-file-channel/ flume-ng-channels/flume-jdbc-channel/
flume-ng-clients/flume-ng-log4jappender/
flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clien...
Author: arvind
Date: Tue Mar 13 02:25:30 2012
New Revision: 1299958
URL: http://svn.apache.org/viewvc?rev=1299958&view=rev
Log:
FLUME-989. Factor Flume Avro RPC interfaces out in separate Client SDK.
(Mike Percy via Arvind Prabhakar)
Added:
incubator/flume/trunk/flume-ng-sdk/ (with props)
incubator/flume/trunk/flume-ng-sdk/pom.xml
incubator/flume/trunk/flume-ng-sdk/src/
incubator/flume/trunk/flume-ng-sdk/src/main/
incubator/flume/trunk/flume-ng-sdk/src/main/avro/
incubator/flume/trunk/flume-ng-sdk/src/main/avro/flume.avdl
incubator/flume/trunk/flume-ng-sdk/src/main/java/
incubator/flume/trunk/flume-ng-sdk/src/main/java/org/
incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/
incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/
incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/Event.java (with props)
incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/EventDeliveryException.java (with props)
incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/FlumeException.java (with props)
incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/
incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java (with props)
incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClient.java (with props)
incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java (with props)
incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/
incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java (with props)
incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java (with props)
incubator/flume/trunk/flume-ng-sdk/src/test/
incubator/flume/trunk/flume-ng-sdk/src/test/java/
incubator/flume/trunk/flume-ng-sdk/src/test/java/org/
incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/
incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/
incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/
incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java (with props)
incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java (with props)
incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestRpcClientFactory.java (with props)
incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/event/
incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/event/TestEventBuilder.java (with props)
Removed:
incubator/flume/trunk/flume-ng-core/src/main/avro/flume.avdl
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/Event.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/EventDeliveryException.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/FlumeException.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/event/EventBuilder.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/event/SimpleEvent.java
incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/util/TestEventBuilder.java
Modified:
incubator/flume/trunk/flume-ng-channels/flume-file-channel/pom.xml
incubator/flume/trunk/flume-ng-channels/flume-jdbc-channel/pom.xml
incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/pom.xml
incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
incubator/flume/trunk/flume-ng-core/pom.xml
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/pom.xml
incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java
incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java
incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/pom.xml
incubator/flume/trunk/flume-ng-node/pom.xml
incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/pom.xml
incubator/flume/trunk/flume-ng-sinks/flume-irc-sink/pom.xml
incubator/flume/trunk/pom.xml
Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/pom.xml?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/pom.xml Tue Mar 13 02:25:30 2012
@@ -44,6 +44,11 @@
<dependency>
<groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
</dependency>
Modified: incubator/flume/trunk/flume-ng-channels/flume-jdbc-channel/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-jdbc-channel/pom.xml?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-jdbc-channel/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-jdbc-channel/pom.xml Tue Mar 13 02:25:30 2012
@@ -49,6 +49,10 @@ limitations under the License.
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
</dependency>
<dependency>
Modified: incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/pom.xml?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/pom.xml Tue Mar 13 02:25:30 2012
@@ -39,25 +39,30 @@ limitations under the License.
</build>
<dependencies>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-ipc</artifactId>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
</dependency>
+ <!-- pull in flume core only for unit tests. TODO: not ideal -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
+ <scope>test</scope>
</dependency>
+
</dependencies>
Modified: incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java (original)
+++ incubator/flume/trunk/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java Tue Mar 13 02:25:30 2012
@@ -18,28 +18,25 @@
*/
package org.apache.flume.clients.log4jappender;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyTransceiver;
-import org.apache.avro.ipc.Transceiver;
-import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
-import org.apache.flume.source.avro.AvroFlumeEvent;
-import org.apache.flume.source.avro.AvroSourceProtocol;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.event.EventBuilder;
+
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;
-import org.apache.flume.clients.log4jappender.Log4jAvroHeaders;
+
/**
*
* Appends Log4j Events to an external Flume client which is described by
- * the Log4j configuration file. The appender takes two parameters
+ * the Log4j configuration file. The appender takes two required parameters:
*<p>
*<strong>Hostname</strong> : This is the hostname of the first hop
*at which Flume (through an AvroSource) is listening for events.
@@ -65,17 +62,17 @@ public class Log4jAppender extends Appen
private String hostname;
private int port;
- private AvroSourceProtocol protocolClient = null;
- private Transceiver transceiver = null;
+ private RpcClient rpcClient = null;
/**
- * If this constructor is used programatically, rather than from a log4j conf,
+ * If this constructor is used programmatically rather than from a log4j conf
* you must set the <tt>port</tt> and <tt>hostname</tt> and then call
* <tt>activateOptions()</tt> before calling <tt>append()</tt>.
*/
public Log4jAppender(){
}
+
/**
* Sets the hostname and port. Even if these are passed the
* <tt>activateOptions()</tt> function must be called before calling
@@ -92,24 +89,27 @@ public class Log4jAppender extends Appen
/**
* Append the LoggingEvent, to send to the first Flume hop.
* @param event The LoggingEvent to be appended to the flume.
- * @throws FlumeException if the appender was closed
- * or the hostname and port were not setup.
+ * @throws FlumeException if the appender was closed,
+ * the hostname and port were not set up, there was a timeout, or there
+ * was a connection error.
*/
@Override
public synchronized void append(LoggingEvent event) throws FlumeException{
- //If protocolClient is null, it means either this appender object was never
+ //If rpcClient is null, it means either this appender object was never
//setup by setting hostname and port and then calling activateOptions
//or this appender object was closed by calling close(), so we throw an
//exception to show the appender is no longer accessible.
- if(protocolClient == null){
+ if(rpcClient == null){
throw new FlumeException("Cannot Append to Appender!" +
"Appender either closed or not setup correctly!");
}
+ if(!rpcClient.isActive()){
+ reconnect();
+ }
+
//Client created first time append is called.
- AvroFlumeEvent avroEvent = new AvroFlumeEvent();
- Map<CharSequence, CharSequence> hdrs =
- new HashMap<CharSequence, CharSequence>();
+ Map<String, String> hdrs = new HashMap<String, String>();
hdrs.put(Log4jAvroHeaders.LOGGER_NAME.toString(), event.getLoggerName());
hdrs.put(Log4jAvroHeaders.TIMESTAMP.toString(),
String.valueOf(event.getTimeStamp()));
@@ -120,20 +120,16 @@ public class Log4jAppender extends Appen
hdrs.put(Log4jAvroHeaders.LOG_LEVEL.toString(),
String.valueOf(event.getLevel().toInt()));
hdrs.put(Log4jAvroHeaders.MESSAGE_ENCODING.toString(), "UTF8");
- avroEvent.setHeaders(hdrs);
+
+ Event flumeEvent = EventBuilder.withBody(event.getMessage().toString(),
+ Charset.forName("UTF8"), hdrs);
+
try {
- avroEvent.setBody(ByteBuffer.wrap(
- event.getMessage().toString().getBytes("UTF8")));
- protocolClient.append(avroEvent);
- } catch (UnsupportedEncodingException e) {
- String errormsg = "Unable to encode body of event! Event lost! " +
- e.getMessage();
- LogLog.error(errormsg);
- throw new FlumeException(errormsg,e);
- } catch (AvroRemoteException e) {
- String errormsg = "Avro Remote Exception: " + e.getMessage();
- LogLog.error(errormsg);
- throw new FlumeException(errormsg,e);
+ rpcClient.append(flumeEvent);
+ } catch (EventDeliveryException e) {
+ String msg = "Flume append() failed.";
+ LogLog.error(msg);
+ throw new FlumeException(msg + " Exception follows.", e);
}
}
@@ -144,26 +140,22 @@ public class Log4jAppender extends Appen
* Closes underlying client.
* If <tt>append()</tt> is called after this function is called,
* it will throw an exception.
- * @throws FlumeException if appender was closed once or not setup.
+ * @throws FlumeException if errors occur during close
*/
@Override
public synchronized void close() throws FlumeException{
- //Simply set protocol client to null and let Java do the cleanup
- //when the client is no longer accessible.
//Any append calls after this will result in an Exception.
- try {
- transceiver.close();
- } catch (IOException e) {
- throw new FlumeException("Attempting to close " +
- "Appender which is already closed or one which was never opened", e);
+ if (rpcClient != null) {
+ rpcClient.close();
+ rpcClient = null;
}
- protocolClient = null;
}
@Override
public boolean requiresLayout() {
return false;
}
+
/**
* Set the first flume hop hostname.
* @param hostname The first hop where the client should connect to.
@@ -171,6 +163,7 @@ public class Log4jAppender extends Appen
public void setHostname(String hostname){
this.hostname = hostname;
}
+
/**
* Set the port on the hostname to connect to.
* @param port The port to connect on the host.
@@ -188,17 +181,22 @@ public class Log4jAppender extends Appen
@Override
public void activateOptions() throws FlumeException{
try {
- transceiver = new NettyTransceiver(
- new InetSocketAddress(hostname, port));
- protocolClient = SpecificRequestor.getClient(
- AvroSourceProtocol.class, transceiver);
- } catch (IOException e) {
- String errormsg = "Avro Client creation failed! "+
+ rpcClient = RpcClientFactory.getInstance(hostname, port);
+ } catch (FlumeException e) {
+ String errormsg = "RPC client creation failed! " +
e.getMessage();
LogLog.error(errormsg);
- throw new FlumeException(errormsg,e);
-
+ throw e;
}
}
+
+ /**
+ * Make it easy to reconnect on failure
+ * @throws FlumeException
+ */
+ private void reconnect() throws FlumeException {
+ close();
+ activateOptions();
+ }
}
Modified: incubator/flume/trunk/flume-ng-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/pom.xml?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-core/pom.xml Tue Mar 13 02:25:30 2012
@@ -33,36 +33,6 @@ limitations under the License.
<plugins>
<plugin>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-maven-plugin</artifactId>
- <executions>
- <execution>
- <phase>generate-sources</phase>
- <goals>
- <goal>idl-protocol</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>com.thoughtworks.paranamer</groupId>
- <artifactId>paranamer-maven-plugin</artifactId>
- <executions>
- <execution>
- <id>run</id>
- <configuration>
- <sourceDirectory>${project.build.directory}/generated-sources/avro</sourceDirectory>
- <outputDirectory>${project.build.directory}/classes</outputDirectory>
- </configuration>
- <goals>
- <goal>generate</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
@@ -73,6 +43,11 @@ limitations under the License.
<dependencies>
<dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java Tue Mar 13 02:25:30 2012
@@ -24,24 +24,22 @@ import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.nio.charset.Charset;
import java.util.List;
-import org.apache.avro.ipc.NettyTransceiver;
-import org.apache.avro.ipc.Transceiver;
-import org.apache.avro.ipc.specific.SpecificRequestor;
+import com.google.common.collect.Lists;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-import org.apache.flume.source.avro.AvroFlumeEvent;
-import org.apache.flume.source.avro.AvroSourceProtocol;
-import org.apache.flume.source.avro.Status;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.event.EventBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +48,8 @@ public class AvroCLIClient {
private static final Logger logger = LoggerFactory
.getLogger(AvroCLIClient.class);
+ private static final int BATCH_SIZE = 5;
+
private String hostname;
private int port;
private String fileName;
@@ -68,6 +68,10 @@ public class AvroCLIClient {
} catch (IOException e) {
logger.error("Unable to send data to Flume - {}", e.getMessage());
logger.debug("Exception follows.", e);
+ } catch (FlumeException e) {
+ logger.error("Unable to open connection to Flume. Exception follows.", e);
+ } catch (EventDeliveryException e) {
+ logger.error("Unable to deliver events to Flume. Exception follows.", e);
}
logger.debug("Exiting");
@@ -108,16 +112,14 @@ public class AvroCLIClient {
return true;
}
- private void run() throws IOException {
+ private void run() throws IOException, FlumeException,
+ EventDeliveryException {
- Transceiver transceiver = null;
BufferedReader reader = null;
+ RpcClient rpcClient = RpcClientFactory.getInstance(hostname, port, BATCH_SIZE);
try {
- transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port));
- AvroSourceProtocol client = SpecificRequestor.getClient(
- AvroSourceProtocol.class, transceiver);
- List<AvroFlumeEvent> eventBuffer = new ArrayList<AvroFlumeEvent>();
+ List<Event> eventBuffer = Lists.newArrayList();
if (fileName != null) {
reader = new BufferedReader(new FileReader(new File(fileName)));
@@ -125,32 +127,24 @@ public class AvroCLIClient {
reader = new BufferedReader(new InputStreamReader(System.in));
}
- String line = null;
+ String line;
long lastCheck = System.currentTimeMillis();
long sentBytes = 0;
+ int batchSize = rpcClient.getBatchSize();
while ((line = reader.readLine()) != null) {
// logger.debug("read:{}", line);
- if (eventBuffer.size() >= 1000) {
- Status status = client.appendBatch(eventBuffer);
-
- if (!status.equals(Status.OK)) {
- logger.error("Unable to send batch size:{} status:{}",
- eventBuffer.size(), status);
- }
-
+ int size = eventBuffer.size();
+ if (size == batchSize) {
+ rpcClient.appendBatch(eventBuffer);
eventBuffer.clear();
}
- AvroFlumeEvent avroEvent = new AvroFlumeEvent();
-
- avroEvent.setHeaders(new HashMap<CharSequence, CharSequence>());
- avroEvent.setBody(ByteBuffer.wrap(line.getBytes()));
-
- eventBuffer.add(avroEvent);
+ Event event = EventBuilder.withBody(line, Charset.forName("UTF8"));
+ eventBuffer.add(event);
- sentBytes += avroEvent.getBody().capacity();
+ sentBytes += event.getBody().length;
sent++;
long now = System.currentTimeMillis();
@@ -161,15 +155,8 @@ public class AvroCLIClient {
}
}
- if (eventBuffer.size() > 0) {
- Status status = client.appendBatch(eventBuffer);
-
- if (!status.equals(Status.OK)) {
- logger.error("Unable to send batch size:{} status:{}",
- eventBuffer.size(), status);
- }
-
- eventBuffer.clear();
+ if (!eventBuffer.isEmpty()) {
+ rpcClient.appendBatch(eventBuffer);
}
logger.debug("Finished");
@@ -179,10 +166,8 @@ public class AvroCLIClient {
reader.close();
}
- if (transceiver != null) {
- logger.debug("Closing tranceiver");
- transceiver.close();
- }
+ logger.debug("Closing RPC client");
+ rpcClient.close();
}
}
}
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java Tue Mar 13 02:25:30 2012
@@ -20,48 +20,41 @@
package org.apache.flume.sink;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyTransceiver;
-import org.apache.avro.ipc.Transceiver;
-import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.conf.Configurable;
-import org.apache.flume.source.avro.AvroFlumeEvent;
-import org.apache.flume.source.avro.AvroSourceProtocol;
+import org.apache.flume.source.AvroSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
/**
* <p>
- * A {@link Sink} implementation that can send events to an Avro server (such as
- * Flume's <tt>AvroSource</tt>).
+ * A {@link Sink} implementation that can send events to an RPC server (such as
+ * Flume's {@link AvroSource}).
* </p>
* <p>
* This sink forms one half of Flume's tiered collection support. Events sent to
- * this sink are turned into {@link AvroFlumeEvent}s and sent to the configured
- * hostname / port pair using Avro's {@link NettyTransceiver}. The intent is
- * that the destination is an instance of Flume's <tt>AvroSource</tt> which
- * allows Flume to nodes to forward to other Flume nodes forming a tiered
+ * this sink are transported over the network to the hostname / port pair using
+ * the RPC implementation encapsulated in {@link RpcClient}.
+ * The destination is an instance of Flume's {@link AvroSource}, which
+ * allows Flume agents to forward to other Flume agents, forming a tiered
* collection infrastructure. Of course, nothing prevents one from using this
* sink to speak to other custom built infrastructure that implements the same
- * Avro protocol (specifically {@link AvroSourceProtocol}).
+ * RPC protocol.
* </p>
* <p>
* Events are taken from the configured {@link Channel} in batches of the
@@ -120,8 +113,7 @@ public class AvroSink extends AbstractSi
private Integer port;
private Integer batchSize;
- private AvroSourceProtocol client;
- private Transceiver transceiver;
+ private RpcClient client;
private CounterGroup counterGroup;
public AvroSink() {
@@ -132,8 +124,8 @@ public class AvroSink extends AbstractSi
public void configure(Context context) {
hostname = context.getString("hostname");
port = context.getInteger("port");
- batchSize = context.getInteger("batch-size");
+ batchSize = context.getInteger("batch-size");
if (batchSize == null) {
batchSize = defaultBatchSize;
}
@@ -142,47 +134,63 @@ public class AvroSink extends AbstractSi
Preconditions.checkState(port != null, "No port specified");
}
- private void createConnection() throws IOException {
- if (transceiver == null) {
- logger.debug("Creating new tranceiver connection to hostname:{} port:{}",
- hostname, port);
- transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port));
- }
+ /**
+ * If this function is called successively without calling
+ * {@link #destroyConnection()}, only the first call has any effect.
+ * @throws FlumeException if an RPC client connection could not be opened
+ */
+ private void createConnection() throws FlumeException {
if (client == null) {
- logger.debug("Creating Avro client with tranceiver:{}", transceiver);
- client = SpecificRequestor.getClient(AvroSourceProtocol.class,
- transceiver);
+ logger.debug(
+ "Building RpcClient with hostname:{}, port:{}, batchSize:{}",
+ new Object[] { hostname, port, batchSize });
+
+ client = RpcClientFactory.getInstance(hostname, port, batchSize);
}
+
}
private void destroyConnection() {
- if (transceiver != null) {
- logger.debug("Destroying tranceiver:{}", transceiver);
+ if (client != null) {
+ logger.debug("Closing avro client:{}", client);
try {
- transceiver.close();
- } catch (IOException e) {
- logger
- .error(
- "Attempt to clean up avro tranceiver after client error failed. Exception follows.",
- e);
+ client.close();
+ } catch (FlumeException e) {
+ logger.error("Attempt to close avro client failed. Exception follows.",
+ e);
}
-
- transceiver = null;
}
client = null;
}
+ /**
+ * Ensure the connection exists and is active.
+ * If the connection is not active, destroy it and recreate it.
+ *
+ * @throws FlumeException If there are errors closing or opening the RPC
+ * connection.
+ */
+ private void verifyConnection() throws FlumeException {
+ if (client == null) {
+ createConnection();
+ } else if (!client.isActive()) {
+ destroyConnection();
+ createConnection();
+ }
+ }
+
@Override
public void start() {
logger.info("Avro sink starting");
try {
createConnection();
- } catch (Exception e) {
+ } catch (FlumeException e) {
logger.error("Unable to create avro client using hostname:" + hostname
- + " port:" + port + ". Exception follows.", e);
+ + ", port:" + port + ", batchSize: " + batchSize +
+ ". Exception follows.", e);
/* Try to prevent leaking resources. */
destroyConnection();
@@ -215,9 +223,10 @@ public class AvroSink extends AbstractSi
try {
transaction.begin();
- createConnection();
- List<AvroFlumeEvent> batch = new LinkedList<AvroFlumeEvent>();
+ verifyConnection();
+
+ List<Event> batch = Lists.newLinkedList();
for (int i = 0; i < batchSize; i++) {
Event event = channel.take();
@@ -227,44 +236,35 @@ public class AvroSink extends AbstractSi
break;
}
- AvroFlumeEvent avroEvent = new AvroFlumeEvent();
-
- avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
- Map<CharSequence, CharSequence> headers = Maps.newHashMap();
-
- for (Entry<String, String> entry : event.getHeaders().entrySet()) {
- headers.put(entry.getKey(), entry.getValue());
- }
- avroEvent.setHeaders(headers);
- batch.add(avroEvent);
+ batch.add(event);
}
if (batch.isEmpty()) {
counterGroup.incrementAndGet("batch.empty");
status = Status.BACKOFF;
} else {
- if (!client.appendBatch(batch).equals(
- org.apache.flume.source.avro.Status.OK)) {
- throw new AvroRemoteException("RPC communication returned FAILED");
- }
+ client.appendBatch(batch);
}
transaction.commit();
counterGroup.incrementAndGet("batch.success");
+
} catch (ChannelException e) {
transaction.rollback();
logger.error("Unable to get event from channel. Exception follows.", e);
status = Status.BACKOFF;
- } catch (AvroRemoteException e) {
+
+ } catch (EventDeliveryException e) {
transaction.rollback();
- logger.error("Unable to send event batch. Exception follows.", e);
- status = Status.BACKOFF;
- } catch (Exception e) {
+ destroyConnection();
+ throw e;
+
+ } catch (FlumeException e) {
transaction.rollback();
- logger.error(
- "Unable to communicate with Avro server. Exception follows.", e);
- status = Status.BACKOFF;
destroyConnection();
+ throw new EventDeliveryException("RPC connection error. " +
+ "Exception follows.", e);
+
} finally {
transaction.close();
}
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java Tue Mar 13 02:25:30 2012
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.Server;
@@ -156,20 +155,13 @@ public class AvroSource extends Abstract
}
@Override
- public Status append(AvroFlumeEvent avroEvent) throws AvroRemoteException {
+ public Status append(AvroFlumeEvent avroEvent) {
logger.debug("Received avro event:{}", avroEvent);
counterGroup.incrementAndGet("rpc.received");
- Map<String, String> headers = new HashMap<String, String>();
-
- for (Entry<CharSequence, CharSequence> entry : avroEvent.getHeaders()
- .entrySet()) {
-
- headers.put(entry.getKey().toString(), entry.getValue().toString());
- }
-
- Event event = EventBuilder.withBody(avroEvent.getBody().array(), headers);
+ Event event = EventBuilder.withBody(avroEvent.getBody().array(),
+ avroEvent.getHeaders());
try {
getChannelProcessor().processEvent(event);
@@ -189,15 +181,8 @@ public class AvroSource extends Abstract
List<Event> batch = new ArrayList<Event>();
for (AvroFlumeEvent avroEvent : events) {
- Map<String, String> headers = new HashMap<String, String>();
-
- for (Entry<CharSequence, CharSequence> entry : avroEvent.getHeaders()
- .entrySet()) {
-
- headers.put(entry.getKey().toString(), entry.getValue().toString());
- }
-
- Event event = EventBuilder.withBody(avroEvent.getBody().array(), headers);
+ Event event = EventBuilder.withBody(avroEvent.getBody().array(),
+ avroEvent.getHeaders());
counterGroup.incrementAndGet("rpc.events");
batch.add(event);
Modified: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java Tue Mar 13 02:25:30 2012
@@ -20,6 +20,7 @@
package org.apache.flume.sink;
import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
@@ -92,7 +93,9 @@ public class TestAvroSink {
}
@Test
- public void testProcess() throws InterruptedException, EventDeliveryException {
+ public void testProcess() throws InterruptedException,
+ EventDeliveryException {
+
Event event = EventBuilder.withBody("test event 1".getBytes(),
new HashMap<String, String>());
Server server = createServer();
@@ -130,8 +133,8 @@ public class TestAvroSink {
public void testFailedConnect() throws InterruptedException,
EventDeliveryException {
- Event event = EventBuilder.withBody("test event 1".getBytes(),
- new HashMap<String, String>());
+ Event event = EventBuilder.withBody("test event 1",
+ Charset.forName("UTF8"));
Server server = createServer();
server.start();
@@ -153,8 +156,14 @@ public class TestAvroSink {
transaction.close();
for (int i = 0; i < 5; i++) {
- Sink.Status status = sink.process();
- Assert.assertEquals(Sink.Status.BACKOFF, status);
+ boolean threwException = false;
+ try {
+ sink.process();
+ } catch (EventDeliveryException e) {
+ threwException = true;
+ }
+ Assert.assertTrue("Must throw EventDeliveryException if disconnected",
+ threwException);
}
server = createServer();
Modified: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java Tue Mar 13 02:25:30 2012
@@ -145,7 +145,7 @@ public class TestAvroSource {
AvroFlumeEvent avroEvent = new AvroFlumeEvent();
- avroEvent.setHeaders(new HashMap<CharSequence, CharSequence>());
+ avroEvent.setHeaders(new HashMap<String, String>());
avroEvent.setBody(ByteBuffer.wrap("Hello avro".getBytes()));
Status status = client.append(avroEvent);
Modified: incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/pom.xml?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/pom.xml Tue Mar 13 02:25:30 2012
@@ -59,6 +59,11 @@ limitations under the License.
<dependency>
<groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
</dependency>
Modified: incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java (original)
+++ incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java Tue Mar 13 02:25:30 2012
@@ -38,14 +38,15 @@ import org.apache.flume.source.AbstractS
import com.cloudera.flume.handlers.avro.AvroFlumeOGEvent;
import com.cloudera.flume.handlers.avro.FlumeOGEventAvroServer;
+import org.apache.avro.AvroRemoteException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flume.ChannelException;
/**
* <p>
- * A {@link Source} implementation that receives Avro events from Avro sink of Flume OG
- * </p>
+ * A {@link Source} implementation that receives Avro events from Avro sink of
+ * Flume OG</p>
* <p>
* <b>Configuration options</b>
* </p>
@@ -77,8 +78,8 @@ import org.apache.flume.ChannelException
* </p>
*/
-public class AvroLegacySource extends AbstractSource implements EventDrivenSource,
- Configurable, FlumeOGEventAvroServer {
+public class AvroLegacySource extends AbstractSource implements
+ EventDrivenSource, Configurable, FlumeOGEventAvroServer {
static final Logger LOG = LoggerFactory.getLogger(AvroLegacySource.class);
@@ -121,7 +122,7 @@ public class AvroLegacySource extends Ab
}
@Override
- public Void append( AvroFlumeOGEvent evt ) throws org.apache.avro.AvroRemoteException {
+ public Void append( AvroFlumeOGEvent evt ) throws AvroRemoteException {
counterGroup.incrementAndGet("rpc.received");
Map<String, String> headers = new HashMap<String, String>();
@@ -130,7 +131,7 @@ public class AvroLegacySource extends Ab
headers.put(TIMESTAMP, evt.getTimestamp().toString());
headers.put(PRIORITY, evt.getPriority().toString());
headers.put(NANOS, evt.getNanos().toString());
- for (Entry<CharSequence, ByteBuffer> entry: evt.getFields().entrySet()) {
+ for (Entry<String, ByteBuffer> entry: evt.getFields().entrySet()) {
headers.put(entry.getKey().toString(), entry.getValue().toString());
}
headers.put(OG_EVENT, "yes");
Modified: incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java (original)
+++ incubator/flume/trunk/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java Tue Mar 13 02:25:30 2012
@@ -145,7 +145,7 @@ public class TestLegacyAvroSource {
AvroFlumeOGEvent avroEvent = AvroFlumeOGEvent.newBuilder().setHost("foo").
setPriority(Priority.INFO).setNanos(0).setTimestamp(1).
- setFields(new HashMap<CharSequence, ByteBuffer> ()).
+ setFields(new HashMap<String, ByteBuffer> ()).
setBody(ByteBuffer.wrap("foo".getBytes())).build();
client.append(avroEvent);
Modified: incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/pom.xml?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-legacy-sources/flume-thrift-source/pom.xml Tue Mar 13 02:25:30 2012
@@ -103,6 +103,11 @@ limitations under the License.
<dependency>
<groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
</dependency>
Modified: incubator/flume/trunk/flume-ng-node/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-node/pom.xml?rev=1299958&r1=1299957&r2=1299958&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-node/pom.xml (original)
+++ incubator/flume/trunk/flume-ng-node/pom.xml Tue Mar 13 02:25:30 2012
@@ -59,6 +59,11 @@
<dependency>
<groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
</dependency>
Propchange: incubator/flume/trunk/flume-ng-sdk/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Mar 13 02:25:30 2012
@@ -0,0 +1,4 @@
+.classpath
+.project
+.settings
+target
Added: incubator/flume/trunk/flume-ng-sdk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/pom.xml?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/pom.xml (added)
+++ incubator/flume/trunk/flume-ng-sdk/pom.xml Tue Mar 13 02:25:30 2012
@@ -0,0 +1,90 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flume-parent</artifactId>
+ <groupId>org.apache.flume</groupId>
+ <version>1.1.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flume-ng-sdk</artifactId>
+ <name>Flume NG SDK</name>
+ <description>Flume Software Development Kit: Stable public API for integration with Flume 1.x</description>
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <configuration>
+ <stringType>String</stringType>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>idl-protocol</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>com.thoughtworks.paranamer</groupId>
+ <artifactId>paranamer-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>run</id>
+ <configuration>
+ <sourceDirectory>${project.build.directory}/generated-sources/avro</sourceDirectory>
+ <outputDirectory>${project.build.directory}/classes</outputDirectory>
+ </configuration>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+
+
+ <dependencies>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-ipc</artifactId>
+ </dependency>
+
+ </dependencies>
+</project>
Added: incubator/flume/trunk/flume-ng-sdk/src/main/avro/flume.avdl
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/avro/flume.avdl?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/avro/flume.avdl (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/avro/flume.avdl Tue Mar 13 02:25:30 2012
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+@namespace("org.apache.flume.source.avro")
+
+protocol AvroSourceProtocol {
+
+ enum Status {
+ OK, FAILED, UNKNOWN
+ }
+
+ record AvroFlumeEvent {
+ map<string> headers;
+ bytes body;
+ }
+
+ Status append( AvroFlumeEvent event );
+
+ Status appendBatch( array<AvroFlumeEvent> events );
+
+}
Added: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/Event.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/Event.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/Event.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/Event.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume;
+
+import java.util.Map;
+
+/**
+ * Basic representation of a data object in Flume.
+ * Provides access to data as it flows through the system.
+ */
+public interface Event {
+
+ /**
+ * Returns a map of name-value pairs describing the data stored in the body.
+ */
+ public Map<String, String> getHeaders();
+
+ /**
+ * Set the event headers
+ * @param headers Map of headers to replace the current headers.
+ */
+ public void setHeaders(Map<String, String> headers);
+
+ /**
+ * Returns the raw byte array of the data contained in this event.
+ */
+ public byte[] getBody();
+
+ /**
+ * Sets the raw byte array of the data contained in this event.
+ * @param body The data.
+ */
+ public void setBody(byte[] body);
+
+}
Propchange: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/Event.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/EventDeliveryException.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/EventDeliveryException.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/EventDeliveryException.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/EventDeliveryException.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume;
+
+/**
+ * An event delivery exception is raised whenever an {@link Event} fails to
+ * reach at least one of its intended (next-hop) destinations.
+ */
+public class EventDeliveryException extends Exception {
+
+ private static final long serialVersionUID = 1102327497549834945L;
+
+ public EventDeliveryException() {
+ super();
+ }
+
+ public EventDeliveryException(String message) {
+ super(message);
+ }
+
+ public EventDeliveryException(String message, Throwable t) {
+ super(message, t);
+ }
+
+ public EventDeliveryException(Throwable t) {
+ super(t);
+ }
+
+}
Propchange: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/EventDeliveryException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/FlumeException.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/FlumeException.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/FlumeException.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/FlumeException.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume;
+
+/**
+ * Unchecked base exception for Flume. Note that the checked
+ * {@link EventDeliveryException} does not derive from this class.
+ */
+public class FlumeException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ public FlumeException(String msg) {
+ super(msg);
+ }
+
+ public FlumeException(String msg, Throwable th) {
+ super(msg, th);
+ }
+
+ public FlumeException(Throwable th) {
+ super(th);
+ }
+
+}
Propchange: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/FlumeException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,382 @@
+/*
+ * Copyright 2012 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.api;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.avro.ipc.CallFuture;
+
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.flume.source.avro.AvroSourceProtocol;
+import org.apache.flume.source.avro.Status;
+
+/**
+ * Avro/Netty implementation of {@link RpcClient}.
+ * The connections are intended to be opened before clients are given access so
+ * that the object cannot ever be in an inconsistent state when exposed to users.
+ */
+public class NettyAvroRpcClient implements RpcClient {
+
+ private final ReentrantLock stateLock = new ReentrantLock();
+
+ private final static long DEFAULT_CONNECT_TIMEOUT_MILLIS =
+ TimeUnit.MILLISECONDS.convert(60, TimeUnit.SECONDS);
+
+ private final static long DEFAULT_REQUEST_TIMEOUT_MILLIS =
+ TimeUnit.MILLISECONDS.convert(60, TimeUnit.SECONDS);
+
+ /**
+ * Guarded by {@code stateLock}
+ */
+ private ConnState connState;
+
+ private final String hostname;
+ private final Integer port;
+ private final Integer batchSize;
+
+ private Transceiver transceiver;
+ private AvroSourceProtocol.Callback avroClient;
+
+ /**
+ * This constructor is intended to be called from {@link Builder}.
+ * It only records the connection parameters and enters the INIT state; no
+ * connection is opened here.
+ * @param hostname The destination hostname
+ * @param port The destination port
+ * @param batchSize Maximum number of Events sent per RPC by appendBatch()
+ * @throws NullPointerException if any argument is null
+ */
+ private NettyAvroRpcClient(String hostname, Integer port, Integer batchSize) {
+
+ if (hostname == null) throw new NullPointerException("hostname is null");
+ if (port == null) throw new NullPointerException("port is null");
+ if (batchSize == null) throw new NullPointerException("batchSize is null");
+
+ this.hostname = hostname;
+ this.port = port;
+ this.batchSize = batchSize;
+
+ setState(ConnState.INIT);
+ }
+
+ /**
+ * Connects using {@code DEFAULT_CONNECT_TIMEOUT_MILLIS} as the timeout.
+ * This method should only be invoked by the Builder
+ * @throws FlumeException if the underlying connect attempt fails with an
+ * IOException
+ */
+ private void connect() throws FlumeException {
+ connect(DEFAULT_CONNECT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Opens the Netty transceiver to {@code hostname:port}, creates the Avro
+ * callback client on top of it and moves this client to the READY state.
+ * Internal only, for now
+ * @param timeout connect timeout; converted to milliseconds before it is
+ * handed to the transceiver
+ * @param tu time unit of {@code timeout}
+ * @throws FlumeException wrapping any IOException raised while connecting;
+ * the connection state is left unchanged in that case
+ */
+ private void connect(long timeout, TimeUnit tu) throws FlumeException {
+ try {
+ // NOTE(review): if getClient() fails after the transceiver was created,
+ // the transceiver is not closed on this path -- confirm this cannot leak.
+ transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port),
+ tu.toMillis(timeout));
+ avroClient =
+ SpecificRequestor.getClient(AvroSourceProtocol.Callback.class,
+ transceiver);
+
+ } catch (IOException ex) {
+ throw new FlumeException("RPC connection error. Exception follows.", ex);
+ }
+
+ setState(ConnState.READY);
+ }
+
+ /**
+ * Closes the transceiver and marks this client DEAD. The state change
+ * happens in a {@code finally} block, so it also applies when closing fails.
+ * @throws FlumeException wrapping an IOException thrown by the transceiver
+ */
+ @Override
+ public void close() throws FlumeException {
+ try {
+ transceiver.close();
+ } catch (IOException ex) {
+ throw new FlumeException("Error closing transceiver. Exception follows.",
+ ex);
+ } finally {
+ setState(ConnState.DEAD);
+ }
+
+ }
+
+ /**
+ * Returns the batch size this client was constructed with; it bounds the
+ * number of events sent per RPC by appendBatch().
+ */
+ @Override
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ /**
+ * Sends a single event, waiting up to {@code DEFAULT_REQUEST_TIMEOUT_MILLIS}
+ * for the reply.
+ * @param event the event to send
+ * @throws EventDeliveryException if delivery fails; the client is marked
+ * DEAD before the exception is rethrown
+ */
+ @Override
+ public void append(Event event) throws EventDeliveryException {
+ try {
+ append(event, DEFAULT_REQUEST_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ } catch (EventDeliveryException e) {
+ // we mark as no longer active without trying to clean up resources
+ // client is required to call close() to clean up resources
+ setState(ConnState.DEAD);
+ throw e;
+ }
+ }
+
+ /**
+ * Converts the event to an {@code AvroFlumeEvent}, issues the append RPC
+ * with a callback future and waits for a Status of OK.
+ * @param event the event to send; its body array is wrapped (not copied)
+ * and its header map is passed on as-is
+ * @param timeout time to wait for the RPC reply
+ * @param tu time unit of {@code timeout}
+ * @throws EventDeliveryException if the client is not READY, the request
+ * raises an IOException, or the reply is missing or not OK
+ */
+ private void append(Event event, long timeout, TimeUnit tu)
+ throws EventDeliveryException {
+
+ assertReady();
+
+ CallFuture<Status> callFuture = new CallFuture<Status>();
+
+ try {
+ AvroFlumeEvent avroEvent = new AvroFlumeEvent();
+ avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
+ avroEvent.setHeaders(event.getHeaders());
+ avroClient.append(avroEvent, callFuture);
+ } catch (IOException ex) {
+ throw new EventDeliveryException("RPC request IO exception. " +
+ "Exception follows.", ex);
+ }
+
+ waitForStatusOK(callFuture, timeout, tu);
+ }
+
+ /**
+ * Sends a list of events, waiting up to
+ * {@code DEFAULT_REQUEST_TIMEOUT_MILLIS} for each batch RPC.
+ * @param events the events to send
+ * @throws EventDeliveryException if delivery fails; the client is marked
+ * DEAD before the exception is rethrown
+ */
+ @Override
+ public void appendBatch(List<Event> events) throws EventDeliveryException {
+ try {
+ appendBatch(events, DEFAULT_REQUEST_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS);
+ } catch (EventDeliveryException e) {
+ // we mark as no longer active without trying to clean up resources
+ // client is required to call close() to clean up resources
+ setState(ConnState.DEAD);
+ throw e;
+ }
+ }
+
+ /**
+ * Sends the events in consecutive chunks of at most {@code batchSize}
+ * events. Each chunk is one appendBatch RPC and is awaited before the next
+ * chunk is built, so {@code timeout} applies to every chunk separately
+ * rather than to the call as a whole. An empty list results in no RPC.
+ * @param events the events to send
+ * @param timeout time to wait for each chunk's RPC reply
+ * @param tu time unit of {@code timeout}
+ * @throws EventDeliveryException on the first chunk that fails; chunks sent
+ * before it have already been acknowledged with Status OK
+ */
+ private void appendBatch(List<Event> events, long timeout, TimeUnit tu)
+ throws EventDeliveryException {
+
+ assertReady();
+
+ Iterator<Event> iter = events.iterator();
+ List<AvroFlumeEvent> avroEvents = new LinkedList<AvroFlumeEvent>();
+
+ // send multiple batches... bail if there is a problem at any time
+ while (iter.hasNext()) {
+ // the list is reused across chunks; the previous chunk has already been
+ // awaited by waitForStatusOK() at this point
+ avroEvents.clear();
+
+ for (int i = 0; i < batchSize && iter.hasNext(); i++) {
+ Event event = iter.next();
+ AvroFlumeEvent avroEvent = new AvroFlumeEvent();
+ avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
+ avroEvent.setHeaders(event.getHeaders());
+ avroEvents.add(avroEvent);
+ }
+
+ CallFuture<Status> callFuture = new CallFuture<Status>();
+ try {
+ avroClient.appendBatch(avroEvents, callFuture);
+ } catch (IOException ex) {
+ throw new EventDeliveryException("RPC request IO exception. " +
+ "Exception follows.", ex);
+ }
+
+ waitForStatusOK(callFuture, timeout, tu);
+ }
+ }
+
+ /**
+ * Helper method that waits for a Status future to come back and validates
+ * that it returns Status == OK.
+ * @param callFuture Future to wait on
+ * @param timeout Time to wait before failing
+ * @param tu Time Unit of {@code timeout}
+ * @throws EventDeliveryException If Status != OK, or if the wait times out,
+ * is cancelled, is interrupted, or ends with an ExecutionException
+ */
+ private static void waitForStatusOK(CallFuture<Status> callFuture,
+ long timeout, TimeUnit tu) throws EventDeliveryException {
+ try {
+ Status status = callFuture.get(timeout, tu);
+ if (status != Status.OK) {
+ throw new EventDeliveryException("Status (" + status + ") is not OK");
+ }
+ } catch (CancellationException ex) {
+ throw new EventDeliveryException("RPC future was cancelled." +
+ " Exception follows.", ex);
+ } catch (ExecutionException ex) {
+ throw new EventDeliveryException("Exception thrown from remote handler." +
+ " Exception follows.", ex);
+ } catch (TimeoutException ex) {
+ throw new EventDeliveryException("RPC request timed out." +
+ " Exception follows.", ex);
+ } catch (InterruptedException ex) {
+ // restore the interrupt flag so callers further up can still observe it
+ Thread.currentThread().interrupt();
+ throw new EventDeliveryException("RPC request interrupted." +
+ " Exception follows.", ex);
+ }
+ }
+
+ /**
+ * This method should always be used to change {@code connState} so we ensure
+ * that invalid state transitions do not occur. The update is performed while
+ * holding {@code stateLock}.
+ * Throws {@code IllegalStateException} when called to transition from DEAD
+ * (named "CLOSED" in the exception message) to another state; setting DEAD
+ * again is allowed.
+ * @param newState the state to move to
+ */
+ private void setState(ConnState newState) {
+ stateLock.lock();
+ try {
+ if (connState == ConnState.DEAD && connState != newState) {
+ throw new IllegalStateException("Cannot transition from CLOSED state.");
+ }
+ connState = newState;
+ } finally {
+ stateLock.unlock();
+ }
+ }
+
+ /**
+ * If the connection state != READY, throws {@link EventDeliveryException}.
+ * The state is read while holding {@code stateLock}; the exception message
+ * includes the state that was observed.
+ * @throws EventDeliveryException if the client is in the INIT or DEAD state
+ */
+ private void assertReady() throws EventDeliveryException {
+ stateLock.lock();
+ try {
+ ConnState curState = connState;
+ if (curState != ConnState.READY) {
+ throw new EventDeliveryException("RPC failed, client in an invalid " +
+ "state: " + curState);
+ }
+ } finally {
+ stateLock.unlock();
+ }
+ }
+
+ /**
+ * Returns {@code true} only while the connection state is READY; the state
+ * is read while holding {@code stateLock}.
+ */
+ @Override
+ public boolean isActive() {
+ stateLock.lock();
+ try {
+ return (connState == ConnState.READY);
+ } finally {
+ stateLock.unlock();
+ }
+ }
+
+ /**
+ * Connection life cycle: INIT is set by the constructor, READY by a
+ * successful connect(), and DEAD by close() or by a failed append or
+ * appendBatch. Once DEAD, setState() refuses any other state.
+ */
+ private static enum ConnState {
+ INIT, READY, DEAD
+ }
+
+ /**
+ * <p>Builder class used to construct {@link NettyAvroRpcClient} objects.</p>
+ *
+ * <p><strong>Note:</strong> It is recommended for applications to construct
+ * {@link RpcClient} instances using the {@link RpcClientFactory} class.</p>
+ */
+ protected static class Builder {
+
+ protected static final int DEFAULT_BATCH_SIZE = 100;
+
+ private String hostname;
+ private Integer port;
+ private Integer batchSize;
+
+ /**
+ * Creates a builder whose batch size starts at {@code DEFAULT_BATCH_SIZE};
+ * hostname and port remain unset until supplied.
+ */
+ public Builder() {
+ batchSize = DEFAULT_BATCH_SIZE;
+ }
+
+ /**
+ * Hostname to connect to (required)
+ *
+ * @param hostname destination hostname; must not be null
+ * @return {@code this}
+ * @throws NullPointerException if {@code hostname} is null
+ */
+ public Builder hostname(String hostname) {
+ if (hostname == null) {
+ throw new NullPointerException("hostname is null");
+ }
+
+ this.hostname = hostname;
+ return this;
+ }
+
+ /**
+ * Port to connect to (required)
+ *
+ * @param port destination port; must not be null
+ * @return {@code this}
+ * @throws NullPointerException if {@code port} is null
+ */
+ public Builder port(Integer port) {
+ if (port == null) {
+ throw new NullPointerException("port is null");
+ }
+
+ this.port = port;
+ return this;
+ }
+
+    /**
+     * Maximum number of {@linkplain Event events} that can be processed in a
+     * batch operation. (optional)<br>
+     * Default: 100
+     *
+     * @param batchSize number of events per batch; must be at least 1,
+     *     because the batching loop in appendBatch() makes no progress with
+     *     a smaller value and would send empty batches indefinitely
+     * @return {@code this}
+     * @throws NullPointerException if {@code batchSize} is null
+     * @throws IllegalArgumentException if {@code batchSize} is less than 1
+     */
+    public Builder batchSize(Integer batchSize) {
+      if (batchSize == null) {
+        throw new NullPointerException("batch size is null");
+      }
+      // Fail fast: with batchSize < 1 the per-chunk loop in appendBatch()
+      // never advances the event iterator, so the outer loop cannot end.
+      if (batchSize < 1) {
+        throw new IllegalArgumentException(
+            "batch size must be at least 1, got " + batchSize);
+      }
+
+      this.batchSize = batchSize;
+      return this;
+    }
+
+ /**
+ * Construct the object: validates the required fields, creates the client
+ * and connects it before returning.
+ * @return Active RPC client
+ * @throws NullPointerException if hostname, port or batch size is unset
+ * @throws FlumeException if connecting to the destination fails
+ */
+ public NettyAvroRpcClient build() throws FlumeException {
+ // validate the required fields
+ if (hostname == null) throw new NullPointerException("hostname is null");
+ if (port == null) throw new NullPointerException("port is null");
+ if (batchSize == null) {
+ throw new NullPointerException("batch size is null");
+ }
+
+ NettyAvroRpcClient client =
+ new NettyAvroRpcClient(hostname, port, batchSize);
+ client.connect();
+
+ return client;
+ }
+ }
+
+}
Propchange: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClient.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClient.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClient.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2012 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.api;
+
+import java.util.List;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+
+/**
+ * <p>Public client interface for sending data to Flume.</p>
+ *
+ * <p>This interface is intended not to change incompatibly for Flume 1.x.</p>
+ *
+ * <p><strong>Note:</strong> It is recommended for applications to construct
+ * {@link RpcClient} instances using the {@link RpcClientFactory} class,
+ * instead of using builders associated with a particular implementation class.
+ * </p>
+ *
+ * @see org.apache.flume.api.RpcClientFactory
+ */
+ public interface RpcClient {
+
+   /**
+    * Reports the largest number of {@link Event events} that
+    * {@link #appendBatch(List) appendBatch()} may batch at once.
+    *
+    * @return the maximum batch size
+    */
+   int getBatchSize();
+
+   /**
+    * <p>Delivers one {@link Event} to the associated Flume source.</p>
+    *
+    * <p>The call blocks until the RPC returns or until the request times
+    * out.</p>
+    *
+    * <p><strong>Note:</strong> Once this method has thrown an
+    * {@link EventDeliveryException} there is no way to recover, and the
+    * application must invoke {@link #close()} on this object to clean up
+    * system resources.</p>
+    *
+    * @param event the event to send
+    * @throws EventDeliveryException when an error prevents event delivery.
+    */
+   void append(Event event) throws EventDeliveryException;
+
+   /**
+    * <p>Delivers a list of {@linkplain Event events} to the associated Flume
+    * source.</p>
+    *
+    * <p>The call blocks until the RPC returns or until the request times
+    * out.</p>
+    *
+    * <p>Callers are strongly encouraged to pass no more than
+    * {@link #getBatchSize()} events. A longer list requires multiple RPC
+    * calls and increases the likelihood of duplicate Events being stored.</p>
+    *
+    * <p><strong>Note:</strong> Once this method has thrown an
+    * {@link EventDeliveryException} there is no way to recover, and the
+    * application must invoke {@link #close()} on this object to clean up
+    * system resources.</p>
+    *
+    * @param events List of events to send
+    * @throws EventDeliveryException when an error prevents event delivery.
+    */
+   void appendBatch(List<Event> events) throws EventDeliveryException;
+
+   /**
+    * <p>Tells whether this object appears to be in a usable state.</p>
+    *
+    * <p>When the answer is {@code false} the object is permanently disabled
+    * and the application must call {@link #close()} on it to clean up system
+    * resources.</p>
+    *
+    * @return {@code true} if the client appears usable, {@code false} if it
+    *         is permanently disabled
+    */
+   boolean isActive();
+
+   /**
+    * <p>Immediately closes the client so that it may no longer be used.</p>
+    *
+    * <p><strong>Note:</strong> Applications MUST call this method when they
+    * are done using the RPC client in order to clean up resources.</p>
+    *
+    * <p>Multi-threaded applications may want to gracefully stop making RPC
+    * calls before calling close(). Otherwise their in-flight calls risk
+    * failing with an {@link EventDeliveryException} when the underlying
+    * connection is disabled.</p>
+    *
+    * @throws FlumeException declared by this interface; the conditions are
+    *         implementation specific
+    */
+   void close() throws FlumeException;
+
+ }
Propchange: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClient.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2012 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.api;
+
+import org.apache.flume.FlumeException;
+
+/**
+ * Factory class to construct Flume {@link RPCClient} implementations.
+ */
+public class RpcClientFactory {
+
+ /**
+ * Returns an instance of {@link RpcClient} connected to the specified
+ * {@code hostname} and {@code port}.
+ * @throws FlumeException
+ */
+ public static RpcClient getInstance(String hostname, Integer port)
+ throws FlumeException {
+
+ return new NettyAvroRpcClient.Builder()
+ .hostname(hostname).port(port).build();
+ }
+
+ /**
+ * Returns an instance of {@link RpcClient} connected to the specified
+ * {@code hostname} and {@code port} with the specified {@code batchSize}.
+ * @throws FlumeException
+ */
+ public static RpcClient getInstance(String hostname, Integer port,
+ Integer batchSize) throws FlumeException {
+
+ return new NettyAvroRpcClient.Builder()
+ .hostname(hostname).port(port).batchSize(batchSize).build();
+ }
+
+}
Propchange: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.event;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flume.Event;
+
+public class EventBuilder {
+
+ /**
+ * Instantiate an Event instance based on the provided body and headers.
+ * If <code>headers</code> is <code>null</code>, then it is ignored.
+ * @param body
+ * @param headers
+ * @return
+ */
+ public static Event withBody(byte[] body, Map<String, String> headers) {
+ Event event = new SimpleEvent();
+
+ event.setBody(body);
+
+ if (headers != null) {
+ event.setHeaders(new HashMap<String, String>(headers));
+ }
+
+ return event;
+ }
+
+ public static Event withBody(byte[] body) {
+ return withBody(body, null);
+ }
+
+ public static Event withBody(String body, Charset charset,
+ Map<String, String> headers) {
+
+ return withBody(body.getBytes(charset), headers);
+ }
+
+ public static Event withBody(String body, Charset charset) {
+ return withBody(body, charset, null);
+ }
+
+}
Propchange: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/EventBuilder.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.event;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flume.Event;
+
+ /**
+  * Basic {@link Event} implementation that keeps its headers and body in
+  * plain fields.
+  */
+ public class SimpleEvent implements Event {
+
+   private Map<String, String> headers;
+   private byte[] body;
+
+   /** Creates an event with an empty header map and no body. */
+   public SimpleEvent() {
+     headers = new HashMap<String, String>();
+     body = null;
+   }
+
+   /** Returns the header map held by this event; it is not copied. */
+   @Override
+   public Map<String, String> getHeaders() {
+     return headers;
+   }
+
+   /** Replaces the header map with the given reference; it is not copied. */
+   @Override
+   public void setHeaders(Map<String, String> headers) {
+     this.headers = headers;
+   }
+
+   /** Returns the body array held by this event; it is not copied. */
+   @Override
+   public byte[] getBody() {
+     return body;
+   }
+
+   /** Replaces the body with the given array reference; it is not copied. */
+   @Override
+   public void setBody(byte[] body) {
+     this.body = body;
+   }
+
+   /**
+    * Renders the headers followed by the body's byte values.
+    *
+    * @return a diagnostic string of the form
+    *         <code>{ headers:... body:... }</code>
+    */
+   @Override
+   public String toString() {
+     // Concatenating a byte[] directly would only print the array's type
+     // and identity hash code, so format its contents instead. A null body
+     // still renders as "null".
+     return "{ headers:" + headers + " body:"
+         + java.util.Arrays.toString(body) + " }";
+   }
+
+ }
Propchange: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/event/SimpleEvent.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java?rev=1299958&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java (added)
+++ incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java Tue Mar 13 02:25:30 2012
@@ -0,0 +1,234 @@
+/*
+ * Copyright 2012 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.api;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import junit.framework.Assert;
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.flume.source.avro.AvroSourceProtocol;
+import org.apache.flume.source.avro.Status;
+
+/**
+ * Helpers for Netty Avro RPC testing
+ */
+ public class RpcTestUtils {
+
+   // Named after this helper class so that its log output is attributed to
+   // the code that actually emits it.
+   private static final Logger logger =
+       Logger.getLogger(RpcTestUtils.class.getName());
+
+   private static final String localhost = "localhost";
+
+   // Charset used to encode test event bodies and to decode them for logging.
+   private static final Charset UTF8 = Charset.forName("UTF8");
+
+   /**
+    * Helper method for testing simple (single) appends on handlers.
+    * Starts a server backed by {@code handler}, sends one event through a
+    * local client, then stops the server and closes the client.
+    *
+    * @param handler service implementation the test server dispatches to
+    * @throws FlumeException
+    * @throws EventDeliveryException
+    */
+   public static void handlerSimpleAppendTest(AvroSourceProtocol handler)
+       throws FlumeException, EventDeliveryException {
+     NettyAvroRpcClient client = null;
+     Server server = startServer(handler);
+     try {
+       client = getStockLocalClient(server.getPort());
+       boolean isActive = client.isActive();
+       Assert.assertTrue("Client should be active", isActive);
+       client.append(EventBuilder.withBody("wheee!!!", UTF8));
+     } finally {
+       stopServerAndCloseClient(server, client);
+     }
+   }
+
+   /**
+    * Helper method for testing batch appends on handlers.
+    * Starts a server backed by {@code handler}, sends one batch of
+    * {@code getBatchSize()} events through a local client, then stops the
+    * server and closes the client.
+    *
+    * @param handler service implementation the test server dispatches to
+    * @throws FlumeException
+    * @throws EventDeliveryException
+    */
+   public static void handlerBatchAppendTest(AvroSourceProtocol handler)
+       throws FlumeException, EventDeliveryException {
+     NettyAvroRpcClient client = null;
+     Server server = startServer(handler);
+     try {
+       client = getStockLocalClient(server.getPort());
+       boolean isActive = client.isActive();
+       Assert.assertTrue("Client should be active", isActive);
+
+       int batchSize = client.getBatchSize();
+       List<Event> events = new ArrayList<Event>(batchSize);
+       for (int i = 0; i < batchSize; i++) {
+         events.add(EventBuilder.withBody("evt: " + i, UTF8));
+       }
+       client.appendBatch(events);
+
+     } finally {
+       stopServerAndCloseClient(server, client);
+     }
+   }
+
+   /**
+    * Stops the server and then closes the client, in that order. The client
+    * is closed even when stopping the server throws, so a failing shutdown
+    * cannot leak the client.
+    *
+    * @param server server to stop
+    * @param client client to close; may be {@code null} if none was created
+    */
+   private static void stopServerAndCloseClient(Server server,
+       NettyAvroRpcClient client) throws FlumeException {
+     try {
+       stopServer(server);
+     } finally {
+       if (client != null) {
+         client.close();
+       }
+     }
+   }
+
+   /**
+    * Decodes the body of the given event for logging purposes.
+    */
+   private static String bodyAsString(AvroFlumeEvent event) {
+     return new String(event.getBody().array(), UTF8);
+   }
+
+   /**
+    * Helper method for constructing a Netty RPC client that talks to
+    * localhost.
+    *
+    * @param port port the client should connect to
+    * @return the client returned by the builder
+    */
+   public static NettyAvroRpcClient getStockLocalClient(int port) {
+     NettyAvroRpcClient client = new NettyAvroRpcClient.Builder()
+         .hostname(localhost).port(port).build();
+
+     return client;
+   }
+
+   /**
+    * Start a NettyServer, wait a moment for it to spin up, and return it.
+    * The server is bound to localhost with port 0.
+    *
+    * @param handler service implementation the server dispatches to
+    * @return the started server
+    */
+   public static Server startServer(AvroSourceProtocol handler) {
+     Responder responder = new SpecificResponder(AvroSourceProtocol.class,
+         handler);
+     Server server = new NettyServer(responder,
+         new InetSocketAddress(localhost, 0));
+     server.start();
+     // The port is passed as a String so the message formatter prints it
+     // verbatim.
+     logger.log(Level.INFO, "Server started on hostname: {0}, port: {1}",
+         new Object[] { localhost, Integer.toString(server.getPort()) });
+
+     try {
+
+       Thread.sleep(300L);
+
+     } catch (InterruptedException ex) {
+       logger.log(Level.SEVERE, "Thread interrupted. Exception follows.", ex);
+       // Restore the interrupt flag for the caller.
+       Thread.currentThread().interrupt();
+     }
+
+     return server;
+   }
+
+   /**
+    * Request that the specified Server stop, and attempt to wait for it to
+    * exit.
+    *
+    * @param server A running NettyServer
+    */
+   public static void stopServer(Server server) {
+     try {
+       server.close();
+       server.join();
+     } catch (InterruptedException ex) {
+       logger.log(Level.SEVERE, "Thread interrupted. Exception follows.", ex);
+       // Restore the interrupt flag for the caller.
+       Thread.currentThread().interrupt();
+     }
+   }
+
+   /**
+    * A service that logs receipt of the request and returns OK
+    */
+   public static class OKAvroHandler implements AvroSourceProtocol {
+
+     @Override
+     public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+       logger.log(Level.INFO, "OK: Received event from append(): {0}",
+           bodyAsString(event));
+       return Status.OK;
+     }
+
+     @Override
+     public Status appendBatch(List<AvroFlumeEvent> events) throws
+         AvroRemoteException {
+       logger.log(Level.INFO, "OK: Received {0} events from appendBatch()",
+           events.size());
+       return Status.OK;
+     }
+
+   }
+
+   /**
+    * A service that logs receipt of the request and returns Failed
+    */
+   public static class FailedAvroHandler implements AvroSourceProtocol {
+
+     @Override
+     public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+       logger.log(Level.INFO, "Failed: Received event from append(): {0}",
+           bodyAsString(event));
+       return Status.FAILED;
+     }
+
+     @Override
+     public Status appendBatch(List<AvroFlumeEvent> events) throws
+         AvroRemoteException {
+       logger.log(Level.INFO, "Failed: Received {0} events from appendBatch()",
+           events.size());
+       return Status.FAILED;
+     }
+
+   }
+
+   /**
+    * A service that logs receipt of the request and returns Unknown
+    */
+   public static class UnknownAvroHandler implements AvroSourceProtocol {
+
+     @Override
+     public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+       logger.log(Level.INFO, "Unknown: Received event from append(): {0}",
+           bodyAsString(event));
+       return Status.UNKNOWN;
+     }
+
+     @Override
+     public Status appendBatch(List<AvroFlumeEvent> events) throws
+         AvroRemoteException {
+       logger.log(Level.INFO, "Unknown: Received {0} events from appendBatch()",
+           events.size());
+       return Status.UNKNOWN;
+     }
+
+   }
+
+   /**
+    * A service that logs receipt of the request and then throws an exception
+    */
+   public static class ThrowingAvroHandler implements AvroSourceProtocol {
+
+     @Override
+     public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+       logger.log(Level.INFO, "Throwing: Received event from append(): {0}",
+           bodyAsString(event));
+       throw new AvroRemoteException("Handler smash!");
+     }
+
+     @Override
+     public Status appendBatch(List<AvroFlumeEvent> events) throws
+         AvroRemoteException {
+       logger.log(Level.INFO, "Throwing: Received {0} events from appendBatch()",
+           events.size());
+       throw new AvroRemoteException("Handler smash!");
+     }
+
+   }
+
+ }
Propchange: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
------------------------------------------------------------------------------
svn:eol-style = native