You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by jm...@apache.org on 2011/08/15 15:39:51 UTC
svn commit: r1157842 - in /incubator/flume/trunk: conf/
flume-core/src/main/java/com/cloudera/flume/conf/
flume-core/src/main/java/com/cloudera/flume/handlers/exec/
flume-core/src/test/java/com/cloudera/flume/handlers/exec/
Author: jmhsieh
Date: Mon Aug 15 13:39:51 2011
New Revision: 1157842
URL: http://svn.apache.org/viewvc?rev=1157842&view=rev
Log:
FLUME-699: Add exec source queue size parameter
Modified:
incubator/flume/trunk/conf/flume-conf.xml
incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java
incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/exec/ExecNioSource.java
incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/exec/TestExecNioSource.java
Modified: incubator/flume/trunk/conf/flume-conf.xml
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/conf/flume-conf.xml?rev=1157842&r1=1157841&r2=1157842&view=diff
==============================================================================
--- incubator/flume/trunk/conf/flume-conf.xml (original)
+++ incubator/flume/trunk/conf/flume-conf.xml Mon Aug 15 13:39:51 2011
@@ -326,5 +326,17 @@ configuration values placed in flume-sit
client times out a connection</description>
</property>
-
+ <!-- ================================================== -->
+ <!-- Exec Source settings ============================= -->
+ <!-- ================================================== -->
+ <property>
+ <name>flume.exec.queuesize</name>
+ <value>1000</value>
+ <description>Max number of events queued up in execsource. This
+ gives the user some control over the amount of memory used by
+ the source. If the max size is reached, the exec source will
+ block.
+ </description>
+ </property>
+
</configuration>
Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java?rev=1157842&r1=1157841&r2=1157842&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java Mon Aug 15 13:39:51 2011
@@ -168,6 +168,7 @@ public class FlumeConfiguration extends
// Sink/source default options
public static final String POLLER_QUEUESIZE = "flume.poller.queuesize";
public static final String THRIFT_QUEUESIZE = "flume.thrift.queuesize";
+ public static final String EXEC_QUEUESIZE = "flume.exec.queuesize";
public static final String THRIFT_CLOSE_MAX_SLEEP = "flume.thrift.close.maxsleep";
public static final String THRIFT_SOCKET_TIMEOUT_MS = "flume.thrift.socket.timeout.ms";
public static final String INSISTENTOPEN_INIT_BACKOFF = "flume.inisistentOpen.init.backoff";
@@ -557,6 +558,13 @@ public class FlumeConfiguration extends
return getInt(THRIFT_QUEUESIZE, 1000);
}
+ /**
+ * Max queue size for an exec source.
+ */
+ public int getExecQueueSize() {
+ return getInt(EXEC_QUEUESIZE, 1000);
+ }
+
public int getThriftSocketTimeoutMs() {
return getInt(THRIFT_SOCKET_TIMEOUT_MS, 10000);
}
Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/exec/ExecNioSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/exec/ExecNioSource.java?rev=1157842&r1=1157841&r2=1157842&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/exec/ExecNioSource.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/handlers/exec/ExecNioSource.java Mon Aug 15 13:39:51 2011
@@ -66,8 +66,7 @@ public class ExecNioSource extends Event
private final AtomicBoolean errFinished = new AtomicBoolean(false);
private final AtomicBoolean outFinished = new AtomicBoolean(false);
- private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<Event>(
- 1000);
+ private final BlockingQueue<Event> eventQueue;
private static Logger LOG = LoggerFactory.getLogger(ExecNioSource.class);
@@ -101,7 +100,9 @@ public class ExecNioSource extends Event
* milliseconds to wait after exec exits before restarting if
* restart is true.
*/
- ExecNioSource(String command, boolean aggregate, boolean restart, int period) {
+ ExecNioSource(String command, int qSz, boolean aggregate, boolean restart,
+ int period) {
+ eventQueue = new LinkedBlockingQueue<Event>(qSz);
this.command = command;
this.inAggregateMode = aggregate;
this.restart = restart;
@@ -531,6 +532,7 @@ public class ExecNioSource extends Event
boolean aggregate = false;
boolean restart = false;
int period = 0;
+ int qSz = FlumeConfiguration.get().getExecQueueSize();
if (argv.length >= 2) {
aggregate = Boolean.parseBoolean(argv[1]);
}
@@ -540,7 +542,7 @@ public class ExecNioSource extends Event
if (argv.length >= 4) {
period = Integer.parseInt(argv[3]);
}
- return new ExecNioSource(command, aggregate, restart, period);
+ return new ExecNioSource(command, qSz, aggregate, restart, period);
}
}
@@ -560,7 +562,8 @@ public class ExecNioSource extends Event
boolean aggregate = true;
boolean restart = true;
int period = Integer.parseInt(argv[1]);
- return new ExecNioSource(command, aggregate, restart, period);
+ int qSz = FlumeConfiguration.get().getExecQueueSize();
+ return new ExecNioSource(command, qSz, aggregate, restart, period);
}
};
}
@@ -581,7 +584,9 @@ public class ExecNioSource extends Event
boolean aggregate = false;
boolean restart = false;
int period = 0;
- return new ExecNioSource(command, aggregate, restart, period);
+ int qSz = FlumeConfiguration.get().getExecQueueSize();
+
+ return new ExecNioSource(command, qSz, aggregate, restart, period);
}
};
}
@@ -589,4 +594,12 @@ public class ExecNioSource extends Event
public static SourceBuilder builder() {
return new Builder();
}
+
+ /**
+ * Exposed for testing only
+ * @return the internal event queue of this source
+ */
+ BlockingQueue<Event> getEventQueue() {
+ return eventQueue;
+ }
}
Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/exec/TestExecNioSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/exec/TestExecNioSource.java?rev=1157842&r1=1157841&r2=1157842&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/exec/TestExecNioSource.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/handlers/exec/TestExecNioSource.java Mon Aug 15 13:39:51 2011
@@ -42,6 +42,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.cloudera.flume.conf.Context;
+import com.cloudera.flume.conf.FlumeBuilder;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.core.Event;
@@ -601,4 +603,12 @@ public class TestExecNioSource {
source.close();
}
+ @Test
+ public void testExecQueueSize() throws FlumeSpecException {
+ FlumeConfiguration.get().setInt(FlumeConfiguration.EXEC_QUEUESIZE, 1);
+ ExecNioSource src = (ExecNioSource) FlumeBuilder
+ .buildSource(new Context(), "exec(\"foo\")");
+ assertEquals(1, src.getEventQueue().remainingCapacity());
+ }
+
}