You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2011/10/29 00:07:16 UTC
svn commit: r1190622 - in
/incubator/flume/branches/flume-728/flume-ng-core/src:
main/java/org/apache/flume/source/ExecSource.java
test/java/org/apache/flume/source/TestExecSource.java
Author: arvind
Date: Fri Oct 28 22:07:16 2011
New Revision: 1190622
URL: http://svn.apache.org/viewvc?rev=1190622&view=rev
Log:
FLUME-773. ExecSource does not rollback transactions on errors.
(Prasad Mujumdar via Arvind Prabhakar)
Added:
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java?rev=1190622&r1=1190621&r2=1190622&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java Fri Oct 28 22:07:16 2011
@@ -9,6 +9,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
@@ -171,29 +172,39 @@ public class ExecSource extends Abstract
@Override
public void run() {
+
+ Transaction transaction = null;
try {
String[] commandArgs = command.split("\\s+");
Process process = new ProcessBuilder(commandArgs).start();
BufferedReader reader = new BufferedReader(new InputStreamReader(
process.getInputStream()));
+ transaction = channel.getTransaction();
String line = null;
while ((line = reader.readLine()) != null) {
counterGroup.incrementAndGet("exec.lines.read");
- Transaction transaction = channel.getTransaction();
-
- transaction.begin();
- Event event = EventBuilder.withBody(line.getBytes());
- channel.put(event);
- transaction.commit();
+ try {
+ transaction.begin();
+ Event event = EventBuilder.withBody(line.getBytes());
+ channel.put(event);
+ transaction.commit();
+ } catch (ChannelException e) {
+ transaction.rollback();
+ throw e;
+ }
}
reader.close();
} catch (IOException e) {
logger.error("Failed while running command:{} - Exception follows.",
command, e);
+ } finally {
+ if (transaction != null) {
+ transaction.close();
+ }
}
}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java?rev=1190622&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java Fri Oct 28 22:07:16 2011
@@ -0,0 +1,68 @@
+package org.apache.flume.source;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.hadoop.fs.FileUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestExecSource {
+
+ private AbstractSource source;
+
+ @Before
+ public void setUp() {
+ source = new ExecSource();
+ }
+
+ @Test
+ public void testProcess() throws InterruptedException, LifecycleException,
+ EventDeliveryException, IOException {
+
+ Channel channel = new MemoryChannel();
+ Context context = new Context();
+
+ context.put("command", "cat /etc/passwd");
+ Configurables.configure(source, context);
+ Configurables.configure(channel, context);
+
+ source.setChannel(channel);
+ source.start();
+ Transaction transaction = channel.getTransaction();
+
+ transaction.begin();
+ Event event;
+ int numEvents = 0;
+
+ FileOutputStream outputStream = new FileOutputStream("/tmp/flume-execsource." + Thread.currentThread().getId());
+ while ((event = channel.take()) != null) {
+ outputStream.write(event.getBody());
+ outputStream.write('\n');
+ numEvents ++;
+ }
+ outputStream.close();
+ transaction.commit();
+ transaction.close();
+
+ source.stop();
+ File file1 = new File("/tmp/flume-execsource." + Thread.currentThread().getId());
+ File file2 = new File("/etc/passwd");
+ Assert.assertEquals(FileUtils.checksumCRC32(file1), FileUtils.checksumCRC32(file2));
+ FileUtils.forceDelete(file1);
+ }
+
+}