You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2011/10/29 00:07:16 UTC

svn commit: r1190622 - in /incubator/flume/branches/flume-728/flume-ng-core/src: main/java/org/apache/flume/source/ExecSource.java test/java/org/apache/flume/source/TestExecSource.java

Author: arvind
Date: Fri Oct 28 22:07:16 2011
New Revision: 1190622

URL: http://svn.apache.org/viewvc?rev=1190622&view=rev
Log:
FLUME-773. ExecSource does not rollback transactions on errors.

(Prasad Mujumdar via Arvind Prabhakar)

Added:
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java?rev=1190622&r1=1190621&r2=1190622&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java Fri Oct 28 22:07:16 2011
@@ -9,6 +9,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
@@ -171,29 +172,39 @@ public class ExecSource extends Abstract
 
     @Override
     public void run() {
+      
+      Transaction transaction = null;
       try {
         String[] commandArgs = command.split("\\s+");
         Process process = new ProcessBuilder(commandArgs).start();
         BufferedReader reader = new BufferedReader(new InputStreamReader(
             process.getInputStream()));
+        transaction = channel.getTransaction();
 
         String line = null;
 
         while ((line = reader.readLine()) != null) {
           counterGroup.incrementAndGet("exec.lines.read");
 
-          Transaction transaction = channel.getTransaction();
-
-          transaction.begin();
-          Event event = EventBuilder.withBody(line.getBytes());
-          channel.put(event);
-          transaction.commit();
+          try {
+            transaction.begin();
+            Event event = EventBuilder.withBody(line.getBytes());
+            channel.put(event);
+            transaction.commit();
+          } catch (ChannelException e) {
+            transaction.rollback();
+            throw e;
+          } 
         }
 
         reader.close();
       } catch (IOException e) {
         logger.error("Failed while running command:{} - Exception follows.",
             command, e);
+      } finally {
+        if (transaction != null) {
+          transaction.close();
+        }
       }
     }
 

Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java?rev=1190622&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java Fri Oct 28 22:07:16 2011
@@ -0,0 +1,68 @@
+package org.apache.flume.source;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.hadoop.fs.FileUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestExecSource {
+
+  private AbstractSource source;
+
+  @Before
+  public void setUp() {
+    source = new ExecSource();
+  }
+
+  @Test
+  public void testProcess() throws InterruptedException, LifecycleException,
+      EventDeliveryException, IOException {
+
+    Channel channel = new MemoryChannel();
+    Context context = new Context();
+
+    context.put("command", "cat /etc/passwd");
+    Configurables.configure(source, context);
+    Configurables.configure(channel, context);
+
+    source.setChannel(channel);
+    source.start();
+    Transaction transaction = channel.getTransaction();
+    
+    transaction.begin();
+    Event event;
+    int numEvents = 0;
+    
+    FileOutputStream outputStream = new FileOutputStream("/tmp/flume-execsource." + Thread.currentThread().getId());
+    while ((event = channel.take()) != null) {
+      outputStream.write(event.getBody());
+      outputStream.write('\n');
+      numEvents ++;
+    }
+    outputStream.close();
+    transaction.commit();
+    transaction.close();
+    
+    source.stop();
+    File file1 = new File("/tmp/flume-execsource." + Thread.currentThread().getId());
+    File file2 = new File("/etc/passwd");
+    Assert.assertEquals(FileUtils.checksumCRC32(file1), FileUtils.checksumCRC32(file2));
+    FileUtils.forceDelete(file1);
+  }
+
+}