You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:49:01 UTC
svn commit: r1156921 - in
/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source:
NetcatSource.java TestNetcatSource.java
Author: esammer
Date: Fri Aug 12 00:49:00 2011
New Revision: 1156921
URL: http://svn.apache.org/viewvc?rev=1156921&view=rev
Log:
- Added a netcat-style source and test for testing / debugging / playing around.
Added:
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/NetcatSource.java
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
Added: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/NetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/NetcatSource.java?rev=1156921&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/NetcatSource.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/NetcatSource.java Fri Aug 12 00:49:00 2011
@@ -0,0 +1,115 @@
+package org.apache.flume.source;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.CharBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.flume.Context;
+import org.apache.flume.CounterGroup;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.event.EventBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NetcatSource extends AbstractEventSource {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(NetcatSource.class);
+
+ private int port;
+ private ServerSocketChannel serverSocket;
+ private CounterGroup counterGroup;
+
+ public NetcatSource() {
+ port = 0;
+ counterGroup = new CounterGroup();
+ }
+
+ @Override
+ public void open(Context context) {
+ counterGroup.incrementAndGet("open.attempts");
+
+ try {
+ SocketAddress bindPoint = new InetSocketAddress(port);
+
+ serverSocket = ServerSocketChannel.open();
+ serverSocket.socket().setReuseAddress(true);
+ serverSocket.socket().bind(bindPoint);
+
+ logger.info("Created serverSocket:{}", serverSocket);
+ } catch (IOException e) {
+ counterGroup.incrementAndGet("open.errors");
+ logger.error("Unable to bind to socket. Exception follows.", e);
+ }
+ }
+
+ @Override
+ public Event<?> next(Context context) throws InterruptedException,
+ EventDeliveryException {
+
+ Event<?> event = null;
+
+ counterGroup.incrementAndGet("next.calls");
+
+ try {
+ SocketChannel channel = serverSocket.accept();
+
+ logger.debug("Received a connection:{}", channel);
+
+ Reader reader = Channels.newReader(channel, "utf-8");
+ CharBuffer buffer = CharBuffer.allocate(512);
+ StringBuilder builder = new StringBuilder();
+
+ while (reader.read(buffer) != -1) {
+ buffer.flip();
+ logger.debug("read {} characters", buffer.remaining());
+ builder.append(buffer.array(), buffer.position(), buffer.length());
+ }
+
+ if (builder.charAt(builder.length() - 1) == '\n') {
+ builder.deleteCharAt(builder.length() - 1);
+ }
+
+ logger.debug("end of message");
+
+ event = EventBuilder.withBody(builder.toString());
+
+ channel.close();
+
+ counterGroup.incrementAndGet("events.success");
+ } catch (IOException e) {
+ counterGroup.incrementAndGet("events.failed");
+
+ throw new EventDeliveryException("Unable to process event due to "
+ + e.getMessage(), e);
+ }
+
+ return event;
+ }
+
+ @Override
+ public void close(Context context) {
+ if (serverSocket != null) {
+ try {
+ serverSocket.close();
+ } catch (IOException e) {
+ logger.error("Unable to close socket. Exception follows.", e);
+ }
+ }
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1156921&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Fri Aug 12 00:49:00 2011
@@ -0,0 +1,82 @@
+package org.apache.flume.source;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.net.InetSocketAddress;
+import java.nio.channels.Channels;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.EventSource;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNetcatSource {
+
+ private EventSource source;
+
+ @Before
+ public void setUp() {
+ source = new NetcatSource();
+ }
+
+ @Test(timeout = 5000)
+ public void testLifecycle() throws InterruptedException, LifecycleException,
+ EventDeliveryException {
+
+ ExecutorService executor = Executors.newFixedThreadPool(3);
+ Context context = new Context();
+
+ /* FIXME: Use a random port for testing. */
+ ((NetcatSource) source).setPort(41414);
+
+ source.open(context);
+
+ Runnable clientRequestRunnable = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ SocketChannel clientChannel = SocketChannel
+ .open(new InetSocketAddress(41414));
+
+ Writer writer = Channels.newWriter(clientChannel, "utf-8");
+
+ writer.write("Test message");
+
+ writer.flush();
+ clientChannel.close();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ };
+
+ for (int i = 0; i < 100; i++) {
+ executor.submit(clientRequestRunnable);
+
+ Event<?> event = source.next(context);
+
+ Assert.assertNotNull(event);
+ Assert.assertEquals("Test message", event.getBody());
+ }
+
+ executor.shutdown();
+
+ while (!executor.isTerminated()) {
+ executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+ }
+
+ source.close(context);
+ }
+
+}