You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by jm...@apache.org on 2011/08/15 15:39:51 UTC

svn commit: r1157842 - in /incubator/flume/trunk: conf/ flume-core/src/main/java/com/cloudera/flume/conf/ flume-core/src/main/java/com/cloudera/flume/handlers/exec/ flume-core/src/test/java/com/cloudera/flume/handlers/exec/

Author: jmhsieh
Date: Mon Aug 15 13:39:51 2011
New Revision: 1157842

URL: http://svn.apache.org/viewvc?rev=1157842&view=rev
Log:
FLUME-699: Add exec source queue size parameter

Modified:
    incubator/flume/trunk/conf/flume-conf.xml
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/exec/ExecNioSource.java
    incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/exec/TestExecNioSource.java

Modified: incubator/flume/trunk/conf/flume-conf.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/conf/flume-conf.xml?rev=1157842&r1=1157841&r2=1157842&view=diff
==============================================================================
--- incubator/flume/trunk/conf/flume-conf.xml (original)
+++ incubator/flume/trunk/conf/flume-conf.xml Mon Aug 15 13:39:51 2011
@@ -326,5 +326,17 @@ configuration values placed in flume-sit
     client times out a connection</description>
   </property>
 
-  
+  <!-- ================================================== -->
+  <!-- Exec Source settings ============================= -->
+  <!-- ================================================== -->
+  <property>
+    <name>flume.exec.queuesize</name>
+    <value>1000</value>
+    <description>Max number of events queued up in execsource.  This
+    gives the user some control of over the amount of memory used by
+    the source.  If the max size is reached, the exec source wil
+    block.
+    </description>
+  </property>
+
 </configuration>

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java?rev=1157842&r1=1157841&r2=1157842&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java Mon Aug 15 13:39:51 2011
@@ -168,6 +168,7 @@ public class FlumeConfiguration extends 
   // Sink/source default options
   public static final String POLLER_QUEUESIZE = "flume.poller.queuesize";
   public static final String THRIFT_QUEUESIZE = "flume.thrift.queuesize";
+  public static final String EXEC_QUEUESIZE = "flume.exec.queuesize";
   public static final String THRIFT_CLOSE_MAX_SLEEP = "flume.thrift.close.maxsleep";
   public static final String THRIFT_SOCKET_TIMEOUT_MS = "flume.thrift.socket.timeout.ms";
   public static final String INSISTENTOPEN_INIT_BACKOFF = "flume.inisistentOpen.init.backoff";
@@ -557,6 +558,13 @@ public class FlumeConfiguration extends 
     return getInt(THRIFT_QUEUESIZE, 1000);
   }
 
+  /**
+   * Max queue size for an exec source.
+   */
+  public int getExecQueueSize() {
+    return getInt(EXEC_QUEUESIZE, 1000);
+  }
+
   public int getThriftSocketTimeoutMs() {
     return getInt(THRIFT_SOCKET_TIMEOUT_MS, 10000);
   }

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/exec/ExecNioSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/exec/ExecNioSource.java?rev=1157842&r1=1157841&r2=1157842&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/exec/ExecNioSource.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/exec/ExecNioSource.java Mon Aug 15 13:39:51 2011
@@ -66,8 +66,7 @@ public class ExecNioSource extends Event
   private final AtomicBoolean errFinished = new AtomicBoolean(false);
   private final AtomicBoolean outFinished = new AtomicBoolean(false);
 
-  private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<Event>(
-      1000);
+  private final BlockingQueue<Event> eventQueue;
 
   private static Logger LOG = LoggerFactory.getLogger(ExecNioSource.class);
 
@@ -101,7 +100,9 @@ public class ExecNioSource extends Event
    *          milliseconds to wait after exec exists before restarting if
    *          restart is true.
    */
-  ExecNioSource(String command, boolean aggregate, boolean restart, int period) {
+  ExecNioSource(String command, int qSz, boolean aggregate, boolean restart,
+      int period) {
+    eventQueue = new LinkedBlockingQueue<Event>(qSz);
     this.command = command;
     this.inAggregateMode = aggregate;
     this.restart = restart;
@@ -531,6 +532,7 @@ public class ExecNioSource extends Event
       boolean aggregate = false;
       boolean restart = false;
       int period = 0;
+      int qSz = FlumeConfiguration.get().getExecQueueSize();
       if (argv.length >= 2) {
         aggregate = Boolean.parseBoolean(argv[1]);
       }
@@ -540,7 +542,7 @@ public class ExecNioSource extends Event
       if (argv.length >= 4) {
         period = Integer.parseInt(argv[3]);
       }
-      return new ExecNioSource(command, aggregate, restart, period);
+      return new ExecNioSource(command, qSz, aggregate, restart, period);
     }
   }
 
@@ -560,7 +562,8 @@ public class ExecNioSource extends Event
         boolean aggregate = true;
         boolean restart = true;
         int period = Integer.parseInt(argv[1]);
-        return new ExecNioSource(command, aggregate, restart, period);
+        int qSz = FlumeConfiguration.get().getExecQueueSize();
+        return new ExecNioSource(command, qSz, aggregate, restart, period);
       }
     };
   }
@@ -581,7 +584,9 @@ public class ExecNioSource extends Event
         boolean aggregate = false;
         boolean restart = false;
         int period = 0;
-        return new ExecNioSource(command, aggregate, restart, period);
+        int qSz = FlumeConfiguration.get().getExecQueueSize();
+
+        return new ExecNioSource(command, qSz, aggregate, restart, period);
       }
     };
   }
@@ -589,4 +594,12 @@ public class ExecNioSource extends Event
   public static SourceBuilder builder() {
     return new Builder();
   }
+
+  /**
+   * Exposed for testing only
+   * @return
+   */
+  BlockingQueue<Event> getEventQueue() {
+    return eventQueue;
+  }
 }

Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/exec/TestExecNioSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/exec/TestExecNioSource.java?rev=1157842&r1=1157841&r2=1157842&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/exec/TestExecNioSource.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/exec/TestExecNioSource.java Mon Aug 15 13:39:51 2011
@@ -42,6 +42,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.cloudera.flume.conf.Context;
+import com.cloudera.flume.conf.FlumeBuilder;
 import com.cloudera.flume.conf.FlumeConfiguration;
 import com.cloudera.flume.conf.FlumeSpecException;
 import com.cloudera.flume.core.Event;
@@ -601,4 +603,12 @@ public class TestExecNioSource {
     source.close();
   }
 
+  @Test
+  public void testExecQueueSize() throws FlumeSpecException {
+    FlumeConfiguration.get().setInt(FlumeConfiguration.EXEC_QUEUESIZE, 1);
+    ExecNioSource src = (ExecNioSource) FlumeBuilder
+        .buildSource(new Context(), "exec(\"foo\")");
+    assertEquals(1, src.getEventQueue().remainingCapacity());
+  }
+
 }