You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:49:01 UTC

svn commit: r1156921 - in /incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source: NetcatSource.java TestNetcatSource.java

Author: esammer
Date: Fri Aug 12 00:49:00 2011
New Revision: 1156921

URL: http://svn.apache.org/viewvc?rev=1156921&view=rev
Log:
- Added a netcat-style source and test for testing / debugging / playing around.

Added:
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/NetcatSource.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java

Added: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/NetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/NetcatSource.java?rev=1156921&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/NetcatSource.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/NetcatSource.java Fri Aug 12 00:49:00 2011
@@ -0,0 +1,115 @@
+package org.apache.flume.source;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.CharBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.flume.Context;
+import org.apache.flume.CounterGroup;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.event.EventBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NetcatSource extends AbstractEventSource {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(NetcatSource.class);
+
+  private int port;
+  private ServerSocketChannel serverSocket;
+  private CounterGroup counterGroup;
+
+  public NetcatSource() {
+    port = 0;
+    counterGroup = new CounterGroup();
+  }
+
+  @Override
+  public void open(Context context) {
+    counterGroup.incrementAndGet("open.attempts");
+
+    try {
+      SocketAddress bindPoint = new InetSocketAddress(port);
+
+      serverSocket = ServerSocketChannel.open();
+      serverSocket.socket().setReuseAddress(true);
+      serverSocket.socket().bind(bindPoint);
+
+      logger.info("Created serverSocket:{}", serverSocket);
+    } catch (IOException e) {
+      counterGroup.incrementAndGet("open.errors");
+      logger.error("Unable to bind to socket. Exception follows.", e);
+    }
+  }
+
+  @Override
+  public Event<?> next(Context context) throws InterruptedException,
+      EventDeliveryException {
+
+    Event<?> event = null;
+
+    counterGroup.incrementAndGet("next.calls");
+
+    try {
+      SocketChannel channel = serverSocket.accept();
+
+      logger.debug("Received a connection:{}", channel);
+
+      Reader reader = Channels.newReader(channel, "utf-8");
+      CharBuffer buffer = CharBuffer.allocate(512);
+      StringBuilder builder = new StringBuilder();
+
+      while (reader.read(buffer) != -1) {
+        buffer.flip();
+        logger.debug("read {} characters", buffer.remaining());
+        builder.append(buffer.array(), buffer.position(), buffer.length());
+      }
+
+      if (builder.charAt(builder.length() - 1) == '\n') {
+        builder.deleteCharAt(builder.length() - 1);
+      }
+
+      logger.debug("end of message");
+
+      event = EventBuilder.withBody(builder.toString());
+
+      channel.close();
+
+      counterGroup.incrementAndGet("events.success");
+    } catch (IOException e) {
+      counterGroup.incrementAndGet("events.failed");
+
+      throw new EventDeliveryException("Unable to process event due to "
+          + e.getMessage(), e);
+    }
+
+    return event;
+  }
+
+  @Override
+  public void close(Context context) {
+    if (serverSocket != null) {
+      try {
+        serverSocket.close();
+      } catch (IOException e) {
+        logger.error("Unable to close socket. Exception follows.", e);
+      }
+    }
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public void setPort(int port) {
+    this.port = port;
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1156921&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Fri Aug 12 00:49:00 2011
@@ -0,0 +1,82 @@
+package org.apache.flume.source;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.net.InetSocketAddress;
+import java.nio.channels.Channels;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.EventSource;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNetcatSource {
+
+  private EventSource source;
+
+  @Before
+  public void setUp() {
+    source = new NetcatSource();
+  }
+
+  @Test(timeout = 5000)
+  public void testLifecycle() throws InterruptedException, LifecycleException,
+      EventDeliveryException {
+
+    ExecutorService executor = Executors.newFixedThreadPool(3);
+    Context context = new Context();
+
+    /* FIXME: Use a random port for testing. */
+    ((NetcatSource) source).setPort(41414);
+
+    source.open(context);
+
+    Runnable clientRequestRunnable = new Runnable() {
+
+      @Override
+      public void run() {
+        try {
+          SocketChannel clientChannel = SocketChannel
+              .open(new InetSocketAddress(41414));
+
+          Writer writer = Channels.newWriter(clientChannel, "utf-8");
+
+          writer.write("Test message");
+
+          writer.flush();
+          clientChannel.close();
+        } catch (IOException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }
+      }
+
+    };
+
+    for (int i = 0; i < 100; i++) {
+      executor.submit(clientRequestRunnable);
+
+      Event<?> event = source.next(context);
+
+      Assert.assertNotNull(event);
+      Assert.assertEquals("Test message", event.getBody());
+    }
+
+    executor.shutdown();
+
+    while (!executor.isTerminated()) {
+      executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+    }
+
+    source.close(context);
+  }
+
+}