You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/10/20 03:47:19 UTC
svn commit: r1186600 - in /incubator/flume/branches/flume-728:
flume-ng-core/src/test/java/org/apache/flume/sink/
flume-ng-core/src/test/java/org/apache/flume/source/
flume-ng-node/src/test/java/org/apache/flume/conf/file/
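flume-ng-node/src/test/java/org/apache/flume/node/
flume-ng-node/src/test/java/org/apache/flume/source/
flume-ng-node/src/test/resources/
flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/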
Author: esammer
Date: Thu Oct 20 01:47:18 2011
New Revision: 1186600
URL: http://svn.apache.org/viewvc?rev=1186600&view=rev
Log:
FLUME-807: Fix tests broken by FLUME-802 changes
- Minor reformatting occurred.
- Updated a test to not reference any JUnit 3 APIs.
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/file/TestJsonFileConfigurationProvider.java
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.json
incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java?rev=1186600&r1=1186599&r2=1186600&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java Thu Oct 20 01:47:18 2011
@@ -46,8 +46,8 @@ public class TestAvroSink {
Context context = new Context();
context.put("hostname", "localhost");
- context.put("port", 41414);
- context.put("batch-size", 2);
+ context.put("port", "41414");
+ context.put("batch-size", "2");
sink.setChannel(channel);
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java?rev=1186600&r1=1186599&r2=1186600&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java Thu Oct 20 01:47:18 2011
@@ -9,6 +9,7 @@ import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
+import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.channel.PseudoTxnMemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.SimpleEvent;
@@ -34,6 +35,8 @@ public class TestRollingFileSink {
sink = new RollingFileSink();
+ sink.setChannel(new MemoryChannel());
+
tmpDir.mkdirs();
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java?rev=1186600&r1=1186599&r2=1186600&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java Thu Oct 20 01:47:18 2011
@@ -52,7 +52,7 @@ public class TestAvroSource {
try {
Context context = new Context();
- context.put("port", selectedPort = 41414 + i);
+ context.put("port", String.valueOf(selectedPort = 41414 + i));
context.put("bind", "0.0.0.0");
Configurables.configure(source, context);
@@ -88,7 +88,7 @@ public class TestAvroSource {
try {
Context context = new Context();
- context.put("port", selectedPort = 41414 + i);
+ context.put("port", String.valueOf(selectedPort = 41414 + i));
context.put("bind", "0.0.0.0");
Configurables.configure(source, context);
@@ -121,7 +121,7 @@ public class TestAvroSource {
Status status = client.append(avroEvent);
Assert.assertEquals(Status.OK, status);
-
+
Transaction transaction = channel.getTransaction();
transaction.begin();
@@ -132,7 +132,6 @@ public class TestAvroSource {
transaction.commit();
transaction.close();
-
logger.debug("Round trip event:{}", event);
source.stop();
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/file/TestJsonFileConfigurationProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/file/TestJsonFileConfigurationProvider.java?rev=1186600&r1=1186599&r2=1186600&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/file/TestJsonFileConfigurationProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/file/TestJsonFileConfigurationProvider.java Thu Oct 20 01:47:18 2011
@@ -53,6 +53,7 @@ public class TestJsonFileConfigurationPr
provider = new JsonFileConfigurationProvider();
+ provider.setNodeName("localhost");
provider.setChannelFactory(channelFactory);
provider.setSourceFactory(sourceFactory);
provider.setSinkFactory(sinkFactory);
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java?rev=1186600&r1=1186599&r2=1186600&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java Thu Oct 20 01:47:18 2011
@@ -1,14 +1,19 @@
package org.apache.flume.node;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Sink;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.Source;
+import org.apache.flume.SourceRunner;
import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleController;
import org.apache.flume.lifecycle.LifecycleException;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.node.nodemanager.AbstractLogicalNodeManager;
import org.apache.flume.sink.NullSink;
-import org.apache.flume.sink.PollableSinkRunner;
-import org.apache.flume.source.PollableSourceRunner;
import org.apache.flume.source.SequenceGeneratorSource;
import org.junit.Assert;
import org.junit.Before;
@@ -116,14 +121,17 @@ public class TestAbstractLogicalNodeMana
@Test
public void testLifecycle() throws LifecycleException, InterruptedException {
- PollableSourceRunner sourceRunner = new PollableSourceRunner();
- sourceRunner.setSource(new SequenceGeneratorSource());
+ Channel channel = new MemoryChannel();
+ Configurables.configure(channel, new Context());
- PollableSinkRunner sinkRunner = new PollableSinkRunner();
- sinkRunner.setSink(new NullSink());
+ Source generatorSource = new SequenceGeneratorSource();
+ generatorSource.setChannel(channel);
- nodeManager.add(sourceRunner);
- nodeManager.add(sinkRunner);
+ Sink nullSink = new NullSink();
+ nullSink.setChannel(channel);
+
+ nodeManager.add(SourceRunner.forSource(generatorSource));
+ nodeManager.add(SinkRunner.forSink(nullSink));
nodeManager.start();
boolean reached = LifecycleController.waitForOneOf(nodeManager,
@@ -144,17 +152,17 @@ public class TestAbstractLogicalNodeMana
public void testRapidLifecycleFlapping() throws LifecycleException,
InterruptedException {
- SequenceGeneratorSource source = new SequenceGeneratorSource();
- source.setChannel(new MemoryChannel());
+ Channel channel = new MemoryChannel();
+ Configurables.configure(channel, new Context());
- PollableSourceRunner sourceRunner = new PollableSourceRunner();
- sourceRunner.setSource(source);
+ Source source = new SequenceGeneratorSource();
+ source.setChannel(channel);
- PollableSinkRunner sinkRunner = new PollableSinkRunner();
- sinkRunner.setSink(new NullSink());
+ Sink sink = new NullSink();
+ sink.setChannel(channel);
- nodeManager.add(sourceRunner);
- nodeManager.add(sinkRunner);
+ nodeManager.add(SourceRunner.forSource(source));
+ nodeManager.add(SinkRunner.forSink(sink));
for (int i = 0; i < 10; i++) {
nodeManager.start();
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1186600&r1=1186599&r2=1186600&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Thu Oct 20 01:47:18 2011
@@ -33,7 +33,7 @@ public class TestNetcatSource {
source = new NetcatSource();
Context context = new Context();
- context.put("capacity", 50);
+ context.put("capacity", "50");
Configurables.configure(channel, context);
@@ -49,7 +49,7 @@ public class TestNetcatSource {
/* FIXME: Use a random port for testing. */
context.put("name", "test");
- context.put("port", 41414);
+ context.put("port", "41414");
Configurables.configure(source, context);
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.json
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.json?rev=1186600&r1=1186599&r2=1186600&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.json (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.json Thu Oct 20 01:47:18 2011
@@ -31,12 +31,12 @@
{
"name": "ch1",
"type": "memory",
- "capacity": 100
+ "capacity": "100"
},
{
"name": "ch2",
"type": "memory",
- "capacity": 10
+ "capacity": "10"
}
]
@@ -48,7 +48,7 @@
{
"name": "netcat 1",
"type": "netcat",
- "port": 41414,
+ "port": "41414",
"channel": "ch1"
}
],
Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java?rev=1186600&r1=1186599&r2=1186600&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java Thu Oct 20 01:47:18 2011
@@ -22,8 +22,6 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Calendar;
-import junit.framework.Assert;
-
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
@@ -33,7 +31,6 @@ import org.apache.flume.channel.MemoryCh
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.lifecycle.LifecycleException;
-import org.apache.flume.sink.hdfs.HDFSEventSink;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -43,20 +40,19 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.flume.sink.hdfs.HDFSBadWriterFactory;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import junit.framework.TestCase;
-import junit.framework.TestResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TestHDFSEventSink extends TestCase {
+public class TestHDFSEventSink {
private HDFSEventSink sink;
private String testPath;
- private static final Logger LOG = LoggerFactory.getLogger(HDFSEventSink.class);
+ private static final Logger LOG = LoggerFactory
+ .getLogger(HDFSEventSink.class);
private void dirCleanup() {
Configuration conf = new Configuration();
@@ -78,8 +74,9 @@ public class TestHDFSEventSink extends T
* Hadoop config points at file:/// rather than hdfs://. We need to find a
* better way of testing HDFS related functionality.
*/
- testPath = "file:///tmp/fluem-test." + Calendar.getInstance().getTimeInMillis()
- + "." + Thread.currentThread().getId();
+ testPath = "file:///tmp/fluem-test."
+ + Calendar.getInstance().getTimeInMillis() + "."
+ + Thread.currentThread().getId();
sink = new HDFSEventSink();
dirCleanup();
@@ -87,7 +84,7 @@ public class TestHDFSEventSink extends T
@After
public void tearDown() {
- if( System.getenv("hdfs_keepFiles") == null)
+ if (System.getenv("hdfs_keepFiles") == null)
dirCleanup();
}
@@ -103,6 +100,8 @@ public class TestHDFSEventSink extends T
*/
Configurables.configure(sink, context);
+ sink.setChannel(new MemoryChannel());
+
sink.start();
sink.stop();
}
@@ -117,7 +116,7 @@ public class TestHDFSEventSink extends T
final String fileName = "FlumeData";
String newPath = testPath + "/singleTextBucket";
int totalEvents = 0;
- int i=1,j=1;
+ int i = 1, j = 1;
// clear the test directory
Configuration conf = new Configuration();
@@ -128,13 +127,13 @@ public class TestHDFSEventSink extends T
Context context = new Context();
-// context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
+ // context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
context.put("hdfs.path", newPath);
context.put("hdfs.filePrefix", fileName);
context.put("hdfs.txnEventMax", String.valueOf(txnMax));
context.put("hdfs.rollCount", String.valueOf(rollCount));
context.put("hdfs.batchSize", String.valueOf(batchSize));
- context.put("hdfs.writeFormat","Text");
+ context.put("hdfs.writeFormat", "Text");
context.put("hdfs.fileType", "DataStream");
Configurables.configure(sink, context);
@@ -178,35 +177,35 @@ public class TestHDFSEventSink extends T
// check that the roll happened correctly for the given data
// Note that we'll end up with one last file with only header
- Assert.assertEquals((totalEvents/rollCount) + 1, fList.length);
+ Assert.assertEquals((totalEvents / rollCount) + 1, fList.length);
try {
i = j = 1;
for (int cnt = 0; cnt < fList.length - 1; cnt++) {
- Path filePath = new Path(newPath + "/" + fileName +"." + cnt);
+ Path filePath = new Path(newPath + "/" + fileName + "." + cnt);
FSDataInputStream input = fs.open(filePath);
BufferedReader d = new BufferedReader(new InputStreamReader(input));
String line;
- while ((line = d.readLine()) != null) {
+ while ((line = d.readLine()) != null) {
Assert.assertEquals(line, ("Test." + i + "." + j));
- if ( ++j > txnMax) {
+ if (++j > txnMax) {
j = 1;
i++;
}
}
- input.close();
- }
+ input.close();
+ }
} catch (IOException ioe) {
System.err.println("IOException during operation: " + ioe.toString());
- return;
- }
- Assert.assertEquals(i, 4);
+ return;
+ }
+ Assert.assertEquals(i, 4);
}
@Test
- public void testSimpleAppend() throws InterruptedException, LifecycleException,
- EventDeliveryException, IOException {
+ public void testSimpleAppend() throws InterruptedException,
+ LifecycleException, EventDeliveryException, IOException {
final long txnMax = 25;
final String fileName = "FlumeData";
@@ -215,7 +214,7 @@ public class TestHDFSEventSink extends T
final int numBatches = 4;
String newPath = testPath + "/singleBucket";
int totalEvents = 0;
- int i=1,j=1;
+ int i = 1, j = 1;
// clear the test directory
Configuration conf = new Configuration();
@@ -255,7 +254,7 @@ public class TestHDFSEventSink extends T
event.setBody(("Test." + i + "." + j).getBytes());
channel.put(event);
- totalEvents ++;
+ totalEvents++;
}
txn.commit();
txn.close();
@@ -266,40 +265,39 @@ public class TestHDFSEventSink extends T
sink.stop();
-
// loop through all the files generated and check their contains
FileStatus[] dirStat = fs.listStatus(dirPath);
Path fList[] = FileUtil.stat2Paths(dirStat);
// check that the roll happened correctly for the given data
// Note that we'll end up with one last file with only header
- Assert.assertEquals((totalEvents/rollCount) + 1, fList.length);
+ Assert.assertEquals((totalEvents / rollCount) + 1, fList.length);
try {
i = j = 1;
for (int cnt = 0; cnt < fList.length - 1; cnt++) {
- Path filePath = new Path(newPath + "/" + fileName +"." + cnt);
+ Path filePath = new Path(newPath + "/" + fileName + "." + cnt);
SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
- LongWritable key = new LongWritable();
+ LongWritable key = new LongWritable();
BytesWritable value = new BytesWritable();
BytesWritable expValue;
while (reader.next(key, value)) {
- expValue = new BytesWritable(("Test." + i + "." + j).getBytes());
+ expValue = new BytesWritable(("Test." + i + "." + j).getBytes());
Assert.assertEquals(expValue, value);
- if ( ++j > txnMax) {
+ if (++j > txnMax) {
j = 1;
i++;
}
}
- reader.close();
- }
+ reader.close();
+ }
} catch (IOException ioe) {
System.err.println("IOException during operation: " + ioe.toString());
- System.exit(1);
- }
- Assert.assertEquals(i, 4);
-
+ System.exit(1);
+ }
+ Assert.assertEquals(i, 4);
+
}
@Test
@@ -377,11 +375,10 @@ public class TestHDFSEventSink extends T
*/
}
-
// inject fault and make sure that the txn is rolled back and retried
@Test
- public void testBadSimpleAppend() throws InterruptedException, LifecycleException,
- EventDeliveryException, IOException {
+ public void testBadSimpleAppend() throws InterruptedException,
+ LifecycleException, EventDeliveryException, IOException {
final long txnMax = 25;
final String fileName = "FlumeData";
@@ -390,7 +387,7 @@ public class TestHDFSEventSink extends T
final int numBatches = 4;
String newPath = testPath + "/singleBucket";
int totalEvents = 0;
- int i=1,j=1;
+ int i = 1, j = 1;
HDFSBadWriterFactory badWriterFactory = new HDFSBadWriterFactory();
sink = new HDFSEventSink(badWriterFactory);
@@ -438,7 +435,7 @@ public class TestHDFSEventSink extends T
event.getHeaders().put("fault-once", "");
}
channel.put(event);
- totalEvents ++;
+ totalEvents++;
}
txn.commit();
txn.close();