You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/03/22 06:38:03 UTC
svn commit: r1303659 - in /incubator/flume/trunk/flume-ng-core/src:
main/java/org/apache/flume/source/ExecSource.java
main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
test/java/org/apache/flume/source/TestExecSource.java
Author: arvind
Date: Thu Mar 22 05:38:03 2012
New Revision: 1303659
URL: http://svn.apache.org/viewvc?rev=1303659&view=rev
Log:
FLUME-979. ExecSource should optionally restart the command when it exits.
(Brock Noland via Arvind Prabhakar)
Added:
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java (with props)
Modified:
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java?rev=1303659&r1=1303658&r2=1303659&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java Thu Mar 22 05:38:03 2012
@@ -47,10 +47,10 @@ import com.google.common.base.Preconditi
* </p>
* <p>
* This source runs a given Unix command on start up and expects that process to
- * continuously produce data on standard out (stderr is simply discarded). If
- * the process exits for any reason, the source also exits and will produce no
- * further data. This means configurations such as <tt>cat [named pipe]</tt> or
- * <tt>tail -F [file]</tt> are going to produce the desired results where as
+ * continuously produce data on standard out (stderr ignored by default). Unless
+ * told to restart, if the process exits for any reason, the source also exits and
+ * will produce no further data. This means configurations such as <tt>cat [named pipe]</tt>
+ * or <tt>tail -F [file]</tt> are going to produce the desired results where as
* <tt>date</tt> will probably not - the former two commands produce streams of
* data where as the latter produces a single event and exits.
* </p>
@@ -98,6 +98,24 @@ import com.google.common.base.Preconditi
* <td>String</td>
* <td>none (required)</td>
* </tr>
+ * <tr>
+ * <td><tt>restart</tt></td>
+ * <td>Whether to restart the command when it exits</td>
+ * <td>Boolean</td>
+ * <td>false</td>
+ * </tr>
+ * <tr>
+ * <td><tt>restartThrottle</tt></td>
+ * <td>How long in milliseconds to wait before restarting the command</td>
+ * <td>Long</td>
+ * <td>10000</td>
+ * </tr>
+ * <tr>
+ * <td><tt>logStderr</tt></td>
+ * <td>Whether to log or discard the standard error stream of the command</td>
+ * <td>Boolean</td>
+ * <td>false</td>
+ * </tr>
* </table>
* <p>
* <b>Metrics</b>
@@ -107,16 +125,20 @@ import com.google.common.base.Preconditi
* </p>
*/
public class ExecSource extends AbstractSource implements EventDrivenSource,
- Configurable {
+Configurable {
private static final Logger logger = LoggerFactory
.getLogger(ExecSource.class);
- private String command;
+ private String command;
private CounterGroup counterGroup;
private ExecutorService executor;
private Future<?> runnerFuture;
+ private long restartThrottle;
+ private boolean restart;
+ private boolean logStderr;
+ private ExecRunnable runner;
@Override
public void start() {
@@ -125,11 +147,8 @@ public class ExecSource extends Abstract
executor = Executors.newSingleThreadExecutor();
counterGroup = new CounterGroup();
- ExecRunnable runner = new ExecRunnable();
-
- runner.command = command;
- runner.channelProcessor = getChannelProcessor();
- runner.counterGroup = counterGroup;
+ runner = new ExecRunnable(command, getChannelProcessor(), counterGroup,
+ restart, restartThrottle, logStderr);
// FIXME: Use a callback-like executor / future to signal us upon failure.
runnerFuture = executor.submit(runner);
@@ -148,6 +167,9 @@ public class ExecSource extends Abstract
public void stop() {
logger.info("Stopping exec source with command:{}", command);
+ if(runner != null) {
+ runner.setRestart(false);
+ }
if (runnerFuture != null) {
logger.debug("Stopping exec runner");
runnerFuture.cancel(true);
@@ -179,43 +201,127 @@ public class ExecSource extends Abstract
Preconditions.checkState(command != null,
"The parameter command must be specified");
+
+ restartThrottle = context.getLong(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE,
+ ExecSourceConfigurationConstants.DEFAULT_RESTART_THROTTLE);
+
+ restart = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_RESTART,
+ ExecSourceConfigurationConstants.DEFAULT_RESTART);
+
+ logStderr = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_LOG_STDERR,
+ ExecSourceConfigurationConstants.DEFAULT_LOG_STDERR);
}
private static class ExecRunnable implements Runnable {
+ public ExecRunnable(String command, ChannelProcessor channelProcessor,
+ CounterGroup counterGroup, boolean restart, long restartThrottle,
+ boolean logStderr) {
+ this.command = command;
+ this.channelProcessor = channelProcessor;
+ this.counterGroup = counterGroup;
+ this.restartThrottle = restartThrottle;
+ this.restart = restart;
+ this.logStderr = logStderr;
+ }
+
private String command;
private ChannelProcessor channelProcessor;
private CounterGroup counterGroup;
+ private volatile boolean restart;
+ private long restartThrottle;
+ private boolean logStderr;
@Override
public void run() {
- BufferedReader reader = null;
+ do {
+ String exitCode = "unknown";
+ BufferedReader reader = null;
+ Process process = null;
+ try {
+ String[] commandArgs = command.split("\\s+");
+ process = new ProcessBuilder(commandArgs).start();
+ reader = new BufferedReader(
+ new InputStreamReader(process.getInputStream()));
+
+ // StderrLogger dies as soon as the input stream is invalid
+ StderrReader stderrReader = new StderrReader(new BufferedReader(
+ new InputStreamReader(process.getErrorStream())), logStderr);
+ stderrReader.setName("StderrReader-[" + command + "]");
+ stderrReader.setDaemon(true);
+ stderrReader.start();
+
+ String line = null;
+
+ while ((line = reader.readLine()) != null) {
+ counterGroup.incrementAndGet("exec.lines.read");
+ channelProcessor.processEvent(EventBuilder.withBody(line.getBytes()));
+ }
+ } catch (Exception e) {
+ logger.error("Failed while running command: " + command, e);
+ if(e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ } finally {
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (IOException ex) {
+ logger.error("Failed to close reader for exec source", ex);
+ }
+ }
+ if(process != null) {
+ process.destroy();
+ try {
+ exitCode = String.valueOf(process.waitFor());
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ if(restart) {
+ logger.info("Restarting in {}ms, exit code {}", restartThrottle, exitCode);
+ try {
+ Thread.sleep(restartThrottle);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } while(restart);
+ }
+ public void setRestart(boolean restart) {
+ this.restart = restart;
+ }
+ }
+ private static class StderrReader extends Thread {
+ private BufferedReader input;
+ private boolean logStderr;
+ protected StderrReader(BufferedReader input, boolean logStderr) {
+ this.input = input;
+ this.logStderr = logStderr;
+ }
+ @Override
+ public void run() {
try {
- String[] commandArgs = command.split("\\s+");
- Process process = new ProcessBuilder(commandArgs).start();
- reader = new BufferedReader(
- new InputStreamReader(process.getInputStream()));
-
+ int i = 0;
String line = null;
-
- while ((line = reader.readLine()) != null) {
- counterGroup.incrementAndGet("exec.lines.read");
- channelProcessor.processEvent(EventBuilder.withBody(line.getBytes()));
+ while((line = input.readLine()) != null) {
+ if(logStderr) {
+ logger.info("StderrLogger[{}] = '{}'", ++i, line);
+ }
}
-
- } catch (Exception e) {
- logger.error("Failed while running command:" + command
- + " - Exception follows.", e);
+ } catch (IOException e) {
+ logger.info("StderrLogger exiting", e);
} finally {
- if (reader != null) {
- try {
- reader.close();
- } catch (IOException ex) {
- logger.error("Failed to close reader for exec source", ex);
+ try {
+ if(input != null) {
+ input.close();
}
+ } catch (IOException ex) {
+ logger.error("Failed to close stderr reader for exec source", ex);
}
}
}
- }
+ }
}
Added: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java?rev=1303659&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java Thu Mar 22 05:38:03 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+public class ExecSourceConfigurationConstants {
+
+ /**
+ * Should the exec'ed command restarted if it dies: : default false
+ */
+ public static final String CONFIG_RESTART = "restart";
+ public static final boolean DEFAULT_RESTART = false;
+
+ /**
+ * Amount of time to wait before attempting a restart: : default 10000 ms
+ */
+ public static final String CONFIG_RESTART_THROTTLE = "restartThrottle";
+ public static final long DEFAULT_RESTART_THROTTLE = 10000L;
+
+ /**
+ * Should stderr from the command be logged: default false
+ */
+ public static final String CONFIG_LOG_STDERR = "logStdErr";
+ public static final boolean DEFAULT_LOG_STDERR = false;
+
+}
Propchange: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java?rev=1303659&r1=1303658&r2=1303659&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java Thu Mar 22 05:38:03 2012
@@ -19,11 +19,12 @@
package org.apache.flume.source;
+
+import static org.junit.Assert.*;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.flume.Channel;
@@ -41,6 +42,9 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+
public class TestExecSource {
private AbstractSource source;
@@ -52,7 +56,7 @@ public class TestExecSource {
@Test
public void testProcess() throws InterruptedException, LifecycleException,
- EventDeliveryException, IOException {
+ EventDeliveryException, IOException {
Channel channel = new MemoryChannel();
Context context = new Context();
@@ -64,11 +68,8 @@ public class TestExecSource {
Configurables.configure(source, context);
Configurables.configure(channel, context);
- List<Channel> channels = new ArrayList<Channel>();
- channels.add(channel);
-
ChannelSelector rcs = new ReplicatingChannelSelector();
- rcs.setChannels(channels);
+ rcs.setChannels(Lists.newArrayList(channel));
source.setChannelProcessor(new ChannelProcessor(rcs));
@@ -100,4 +101,46 @@ public class TestExecSource {
FileUtils.forceDelete(file1);
}
+
+ @Test
+ public void testRestart() throws InterruptedException, LifecycleException,
+ EventDeliveryException, IOException {
+
+ Channel channel = new MemoryChannel();
+ Context context = new Context();
+
+ context.put(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE, "10");
+ context.put(ExecSourceConfigurationConstants.CONFIG_RESTART, "true");
+
+ context.put("command", "echo flume");
+ Configurables.configure(source, context);
+ Configurables.configure(channel, context);
+
+ ChannelSelector rcs = new ReplicatingChannelSelector();
+ rcs.setChannels(Lists.newArrayList(channel));
+
+ source.setChannelProcessor(new ChannelProcessor(rcs));
+
+ source.start();
+ Transaction transaction = channel.getTransaction();
+
+ transaction.begin();
+
+ long start = System.currentTimeMillis();
+
+ for(int i = 0; i < 5; i++) {
+ Event event = channel.take();
+ assertNotNull(event);
+ assertNotNull(event.getBody());
+ assertEquals("flume", new String(event.getBody(), Charsets.UTF_8));
+ }
+
+ // ensure restartThrottle was turned down as expected
+ assertTrue(System.currentTimeMillis() - start < 10000L);
+
+ transaction.commit();
+ transaction.close();
+
+ source.stop();
+ }
}