You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/25 20:50:15 UTC

svn commit: r1161691 - /incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java

Author: esammer
Date: Thu Aug 25 18:50:14 2011
New Revision: 1161691

URL: http://svn.apache.org/viewvc?rev=1161691&view=rev
Log:
- Added / improved tests of NetcatSource including configuration via Configurable and
  the new WAL support. This is a work in progress.

Modified:
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1161691&r1=1161690&r2=1161691&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Thu Aug 25 18:50:14 2011
@@ -1,5 +1,6 @@
 package org.apache.flume.source;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.Writer;
 import java.net.InetSocketAddress;
@@ -9,10 +10,13 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.EventSource;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.durability.file.FileBasedWALManager;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.junit.Assert;
 import org.junit.Before;
@@ -35,8 +39,75 @@ public class TestNetcatSource {
     Context context = new Context();
 
     /* FIXME: Use a random port for testing. */
+    context.put("logicalNode.name", "test");
+    context.put("source.port", 41414);
+
+    Configurables.configure(source, context);
+
+    source.open(context);
+
+    Runnable clientRequestRunnable = new Runnable() {
+
+      @Override
+      public void run() {
+        try {
+          SocketChannel clientChannel = SocketChannel
+              .open(new InetSocketAddress(41414));
+
+          Writer writer = Channels.newWriter(clientChannel, "utf-8");
+
+          writer.write("Test message");
+
+          writer.flush();
+          clientChannel.close();
+        } catch (IOException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }
+      }
+
+    };
+
+    for (int i = 0; i < 100; i++) {
+      executor.submit(clientRequestRunnable);
+
+      Event event = source.next(context);
+
+      Assert.assertNotNull(event);
+      Assert.assertArrayEquals("Test message".getBytes(), event.getBody());
+    }
+
+    executor.shutdown();
+
+    while (!executor.isTerminated()) {
+      executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+    }
+
+    source.close(context);
+  }
+
+  @Test
+  public void testDurability() throws InterruptedException, LifecycleException,
+      EventDeliveryException, IOException {
+
+    /* FIXME: Use a random port for testing. */
     ((NetcatSource) source).setPort(41414);
 
+    FileBasedWALManager walManager = new FileBasedWALManager();
+
+    walManager.setDirectory(new File("/tmp/flume-ncs-tests", "wal-test"));
+    walManager.getDirectory().mkdirs();
+
+    ((NetcatSource) source).setWALManager(walManager);
+
+    ExecutorService executor = Executors.newFixedThreadPool(3);
+    Context context = new Context();
+
+    context.put("logicalNode.name", "test");
+    context.put("source.port", 41414);
+
+    Configurables.configure(source, context);
+
     source.open(context);
 
     Runnable clientRequestRunnable = new Runnable() {
@@ -77,6 +148,11 @@ public class TestNetcatSource {
     }
 
     source.close(context);
+
+    FileUtils.deleteDirectory(walManager.getDirectory());
+
+    /* Only delete the parent if it's empty. */
+    walManager.getDirectory().getParentFile().delete();
   }
 
 }