Posted to commits@flume.apache.org by es...@apache.org on 2011/10/20 03:47:19 UTC

svn commit: r1186600 - in /incubator/flume/branches/flume-728: flume-ng-core/src/test/java/org/apache/flume/sink/ flume-ng-core/src/test/java/org/apache/flume/source/ flume-ng-node/src/test/java/org/apache/flume/conf/file/ flume-ng-node/src/test/java/o...

Author: esammer
Date: Thu Oct 20 01:47:18 2011
New Revision: 1186600

URL: http://svn.apache.org/viewvc?rev=1186600&view=rev
Log:
FLUME-807: Fix tests broken by FLUME-802 changes

- Minor reformatting occurred.
- Updated a test to not reference any JUnit 3 APIs.
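
For context, the FLUME-802 change that broke these tests made Context property values string-typed, requires each source and sink to be wired to a Channel explicitly, and replaces the old PollableSourceRunner/PollableSinkRunner classes with runner factory methods. Below is a minimal sketch of the resulting test wiring, using only the classes and calls that appear in the diff that follows; the class name PostFlume802WiringSketch is illustrative and not part of this commit.

    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Sink;
    import org.apache.flume.SinkRunner;
    import org.apache.flume.Source;
    import org.apache.flume.SourceRunner;
    import org.apache.flume.channel.MemoryChannel;
    import org.apache.flume.conf.Configurables;
    import org.apache.flume.sink.NullSink;
    import org.apache.flume.source.SequenceGeneratorSource;

    public class PostFlume802WiringSketch {
      public static void main(String[] args) {
        // Context properties are string-valued after FLUME-802, so numeric
        // settings such as the channel capacity are passed as string literals.
        Context channelContext = new Context();
        channelContext.put("capacity", "50");

        Channel channel = new MemoryChannel();
        Configurables.configure(channel, channelContext);

        // Sources and sinks are wired to a channel explicitly before use.
        Source source = new SequenceGeneratorSource();
        source.setChannel(channel);

        Sink sink = new NullSink();
        sink.setChannel(channel);

        // The removed PollableSourceRunner/PollableSinkRunner classes give way
        // to the runner factory methods; the resulting runners are what the
        // updated tests hand to the logical node manager.
        SourceRunner sourceRunner = SourceRunner.forSource(source);
        SinkRunner sinkRunner = SinkRunner.forSink(sink);
      }
    }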

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/file/TestJsonFileConfigurationProvider.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.json
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java?rev=1186600&r1=1186599&r2=1186600&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java Thu Oct 20 01:47:18 2011
@@ -46,8 +46,8 @@ public class TestAvroSink {
     Context context = new Context();
 
     context.put("hostname", "localhost");
-    context.put("port", 41414);
-    context.put("batch-size", 2);
+    context.put("port", "41414");
+    context.put("batch-size", "2");
 
     sink.setChannel(channel);
 

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java?rev=1186600&r1=1186599&r2=1186600&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java Thu Oct 20 01:47:18 2011
@@ -9,6 +9,7 @@ import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
+import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.PseudoTxnMemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.SimpleEvent;
@@ -34,6 +35,8 @@ public class TestRollingFileSink {
 
     sink = new RollingFileSink();
 
+    sink.setChannel(new MemoryChannel());
+
     tmpDir.mkdirs();
   }
 

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java?rev=1186600&r1=1186599&r2=1186600&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java Thu Oct 20 01:47:18 2011
@@ -52,7 +52,7 @@ public class TestAvroSource {
       try {
         Context context = new Context();
 
-        context.put("port", selectedPort = 41414 + i);
+        context.put("port", String.valueOf(selectedPort = 41414 + i));
         context.put("bind", "0.0.0.0");
 
         Configurables.configure(source, context);
@@ -88,7 +88,7 @@ public class TestAvroSource {
       try {
         Context context = new Context();
 
-        context.put("port", selectedPort = 41414 + i);
+        context.put("port", String.valueOf(selectedPort = 41414 + i));
         context.put("bind", "0.0.0.0");
 
         Configurables.configure(source, context);
@@ -121,7 +121,7 @@ public class TestAvroSource {
     Status status = client.append(avroEvent);
 
     Assert.assertEquals(Status.OK, status);
- 
+
     Transaction transaction = channel.getTransaction();
     transaction.begin();
 
@@ -132,7 +132,6 @@ public class TestAvroSource {
     transaction.commit();
     transaction.close();
 
-
     logger.debug("Round trip event:{}", event);
 
     source.stop();

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/file/TestJsonFileConfigurationProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/file/TestJsonFileConfigurationProvider.java?rev=1186600&r1=1186599&r2=1186600&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/file/TestJsonFileConfigurationProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/file/TestJsonFileConfigurationProvider.java Thu Oct 20 01:47:18 2011
@@ -53,6 +53,7 @@ public class TestJsonFileConfigurationPr
 
     provider = new JsonFileConfigurationProvider();
 
+    provider.setNodeName("localhost");
     provider.setChannelFactory(channelFactory);
     provider.setSourceFactory(sourceFactory);
     provider.setSinkFactory(sinkFactory);

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java?rev=1186600&r1=1186599&r2=1186600&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java Thu Oct 20 01:47:18 2011
@@ -1,14 +1,19 @@
 package org.apache.flume.node;
 
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Sink;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.Source;
+import org.apache.flume.SourceRunner;
 import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
 import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleController;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.node.nodemanager.AbstractLogicalNodeManager;
 import org.apache.flume.sink.NullSink;
-import org.apache.flume.sink.PollableSinkRunner;
-import org.apache.flume.source.PollableSourceRunner;
 import org.apache.flume.source.SequenceGeneratorSource;
 import org.junit.Assert;
 import org.junit.Before;
@@ -116,14 +121,17 @@ public class TestAbstractLogicalNodeMana
   @Test
   public void testLifecycle() throws LifecycleException, InterruptedException {
 
-    PollableSourceRunner sourceRunner = new PollableSourceRunner();
-    sourceRunner.setSource(new SequenceGeneratorSource());
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
 
-    PollableSinkRunner sinkRunner = new PollableSinkRunner();
-    sinkRunner.setSink(new NullSink());
+    Source generatorSource = new SequenceGeneratorSource();
+    generatorSource.setChannel(channel);
 
-    nodeManager.add(sourceRunner);
-    nodeManager.add(sinkRunner);
+    Sink nullSink = new NullSink();
+    nullSink.setChannel(channel);
+
+    nodeManager.add(SourceRunner.forSource(generatorSource));
+    nodeManager.add(SinkRunner.forSink(nullSink));
 
     nodeManager.start();
     boolean reached = LifecycleController.waitForOneOf(nodeManager,
@@ -144,17 +152,17 @@ public class TestAbstractLogicalNodeMana
   public void testRapidLifecycleFlapping() throws LifecycleException,
       InterruptedException {
 
-    SequenceGeneratorSource source = new SequenceGeneratorSource();
-    source.setChannel(new MemoryChannel());
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
 
-    PollableSourceRunner sourceRunner = new PollableSourceRunner();
-    sourceRunner.setSource(source);
+    Source source = new SequenceGeneratorSource();
+    source.setChannel(channel);
 
-    PollableSinkRunner sinkRunner = new PollableSinkRunner();
-    sinkRunner.setSink(new NullSink());
+    Sink sink = new NullSink();
+    sink.setChannel(channel);
 
-    nodeManager.add(sourceRunner);
-    nodeManager.add(sinkRunner);
+    nodeManager.add(SourceRunner.forSource(source));
+    nodeManager.add(SinkRunner.forSink(sink));
 
     for (int i = 0; i < 10; i++) {
       nodeManager.start();

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1186600&r1=1186599&r2=1186600&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Thu Oct 20 01:47:18 2011
@@ -33,7 +33,7 @@ public class TestNetcatSource {
     source = new NetcatSource();
 
     Context context = new Context();
-    context.put("capacity", 50);
+    context.put("capacity", "50");
 
     Configurables.configure(channel, context);
 
@@ -49,7 +49,7 @@ public class TestNetcatSource {
 
     /* FIXME: Use a random port for testing. */
     context.put("name", "test");
-    context.put("port", 41414);
+    context.put("port", "41414");
 
     Configurables.configure(source, context);
 

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.json
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.json?rev=1186600&r1=1186599&r2=1186600&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.json (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.json Thu Oct 20 01:47:18 2011
@@ -31,12 +31,12 @@
       {
         "name": "ch1",
         "type": "memory",
-        "capacity": 100
+        "capacity": "100"
       },
       {
         "name": "ch2",
         "type": "memory",
-        "capacity": 10
+        "capacity": "10"
       }
     ]
 
@@ -48,7 +48,7 @@
       {
         "name": "netcat 1",
         "type": "netcat",
-        "port": 41414,
+        "port": "41414",
         "channel": "ch1"
       }
     ],

Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java?rev=1186600&r1=1186599&r2=1186600&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java Thu Oct 20 01:47:18 2011
@@ -22,8 +22,6 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.Calendar;
 
-import junit.framework.Assert;
-
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -33,7 +31,6 @@ import org.apache.flume.channel.MemoryCh
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.SimpleEvent;
 import org.apache.flume.lifecycle.LifecycleException;
-import org.apache.flume.sink.hdfs.HDFSEventSink;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -43,20 +40,19 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.flume.sink.hdfs.HDFSBadWriterFactory;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import junit.framework.TestCase;
-import junit.framework.TestResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TestHDFSEventSink extends TestCase {
+public class TestHDFSEventSink {
 
   private HDFSEventSink sink;
   private String testPath;
-  private static final Logger LOG = LoggerFactory.getLogger(HDFSEventSink.class);
+  private static final Logger LOG = LoggerFactory
+      .getLogger(HDFSEventSink.class);
 
   private void dirCleanup() {
     Configuration conf = new Configuration();
@@ -78,8 +74,9 @@ public class TestHDFSEventSink extends T
      * Hadoop config points at file:/// rather than hdfs://. We need to find a
      * better way of testing HDFS related functionality.
      */
-    testPath = "file:///tmp/fluem-test." + Calendar.getInstance().getTimeInMillis() 
-        + "." + Thread.currentThread().getId();
+    testPath = "file:///tmp/fluem-test."
+        + Calendar.getInstance().getTimeInMillis() + "."
+        + Thread.currentThread().getId();
 
     sink = new HDFSEventSink();
     dirCleanup();
@@ -87,7 +84,7 @@ public class TestHDFSEventSink extends T
 
   @After
   public void tearDown() {
-    if( System.getenv("hdfs_keepFiles") == null)
+    if (System.getenv("hdfs_keepFiles") == null)
       dirCleanup();
   }
 
@@ -103,6 +100,8 @@ public class TestHDFSEventSink extends T
      */
     Configurables.configure(sink, context);
 
+    sink.setChannel(new MemoryChannel());
+
     sink.start();
     sink.stop();
   }
@@ -117,7 +116,7 @@ public class TestHDFSEventSink extends T
     final String fileName = "FlumeData";
     String newPath = testPath + "/singleTextBucket";
     int totalEvents = 0;
-    int i=1,j=1;
+    int i = 1, j = 1;
 
     // clear the test directory
     Configuration conf = new Configuration();
@@ -128,13 +127,13 @@ public class TestHDFSEventSink extends T
 
     Context context = new Context();
 
-//    context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
+    // context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
     context.put("hdfs.path", newPath);
     context.put("hdfs.filePrefix", fileName);
     context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
-    context.put("hdfs.writeFormat","Text");
+    context.put("hdfs.writeFormat", "Text");
     context.put("hdfs.fileType", "DataStream");
 
     Configurables.configure(sink, context);
@@ -178,35 +177,35 @@ public class TestHDFSEventSink extends T
 
     // check that the roll happened correctly for the given data
     // Note that we'll end up with one last file with only header
-    Assert.assertEquals((totalEvents/rollCount) + 1, fList.length);
+    Assert.assertEquals((totalEvents / rollCount) + 1, fList.length);
 
     try {
       i = j = 1;
       for (int cnt = 0; cnt < fList.length - 1; cnt++) {
-        Path filePath = new Path(newPath + "/" + fileName +"." + cnt);
+        Path filePath = new Path(newPath + "/" + fileName + "." + cnt);
         FSDataInputStream input = fs.open(filePath);
         BufferedReader d = new BufferedReader(new InputStreamReader(input));
         String line;
 
-        while ((line =  d.readLine()) != null) {
+        while ((line = d.readLine()) != null) {
           Assert.assertEquals(line, ("Test." + i + "." + j));
-          if ( ++j > txnMax) {
+          if (++j > txnMax) {
             j = 1;
             i++;
           }
         }
-        input.close(); 
-       }
+        input.close();
+      }
     } catch (IOException ioe) {
       System.err.println("IOException during operation: " + ioe.toString());
-      return; 
-    } 
-    Assert.assertEquals(i, 4);    
+      return;
+    }
+    Assert.assertEquals(i, 4);
   }
 
   @Test
-  public void testSimpleAppend() throws InterruptedException, LifecycleException,
-      EventDeliveryException, IOException {
+  public void testSimpleAppend() throws InterruptedException,
+      LifecycleException, EventDeliveryException, IOException {
 
     final long txnMax = 25;
     final String fileName = "FlumeData";
@@ -215,7 +214,7 @@ public class TestHDFSEventSink extends T
     final int numBatches = 4;
     String newPath = testPath + "/singleBucket";
     int totalEvents = 0;
-    int i=1,j=1;
+    int i = 1, j = 1;
 
     // clear the test directory
     Configuration conf = new Configuration();
@@ -255,7 +254,7 @@ public class TestHDFSEventSink extends T
 
         event.setBody(("Test." + i + "." + j).getBytes());
         channel.put(event);
-        totalEvents ++;
+        totalEvents++;
       }
       txn.commit();
       txn.close();
@@ -266,40 +265,39 @@ public class TestHDFSEventSink extends T
 
     sink.stop();
 
-
     // loop through all the files generated and check their contains
     FileStatus[] dirStat = fs.listStatus(dirPath);
     Path fList[] = FileUtil.stat2Paths(dirStat);
 
     // check that the roll happened correctly for the given data
     // Note that we'll end up with one last file with only header
-    Assert.assertEquals((totalEvents/rollCount) + 1, fList.length);
+    Assert.assertEquals((totalEvents / rollCount) + 1, fList.length);
 
     try {
       i = j = 1;
       for (int cnt = 0; cnt < fList.length - 1; cnt++) {
-        Path filePath = new Path(newPath + "/" + fileName +"." + cnt);
+        Path filePath = new Path(newPath + "/" + fileName + "." + cnt);
         SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
-        LongWritable key = new LongWritable(); 
+        LongWritable key = new LongWritable();
         BytesWritable value = new BytesWritable();
         BytesWritable expValue;
 
         while (reader.next(key, value)) {
-          expValue =  new BytesWritable(("Test." + i + "." + j).getBytes());
+          expValue = new BytesWritable(("Test." + i + "." + j).getBytes());
           Assert.assertEquals(expValue, value);
-          if ( ++j > txnMax) {
+          if (++j > txnMax) {
             j = 1;
             i++;
           }
         }
-        reader.close(); 
-       } 
+        reader.close();
+      }
     } catch (IOException ioe) {
       System.err.println("IOException during operation: " + ioe.toString());
-      System.exit(1); 
-    } 
-    Assert.assertEquals(i, 4);    
-    
+      System.exit(1);
+    }
+    Assert.assertEquals(i, 4);
+
   }
 
   @Test
@@ -377,11 +375,10 @@ public class TestHDFSEventSink extends T
      */
   }
 
-
   // inject fault and make sure that the txn is rolled back and retried
   @Test
-  public void testBadSimpleAppend() throws InterruptedException, LifecycleException,
-      EventDeliveryException, IOException {
+  public void testBadSimpleAppend() throws InterruptedException,
+      LifecycleException, EventDeliveryException, IOException {
 
     final long txnMax = 25;
     final String fileName = "FlumeData";
@@ -390,7 +387,7 @@ public class TestHDFSEventSink extends T
     final int numBatches = 4;
     String newPath = testPath + "/singleBucket";
     int totalEvents = 0;
-    int i=1,j=1;
+    int i = 1, j = 1;
 
     HDFSBadWriterFactory badWriterFactory = new HDFSBadWriterFactory();
     sink = new HDFSEventSink(badWriterFactory);
@@ -438,7 +435,7 @@ public class TestHDFSEventSink extends T
           event.getHeaders().put("fault-once", "");
         }
         channel.put(event);
-        totalEvents ++;
+        totalEvents++;
       }
       txn.commit();
       txn.close();