You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by bi...@apache.org on 2010/10/20 22:21:51 UTC
svn commit: r1025708 - in /incubator/chukwa/trunk: conf/
src/java/org/apache/hadoop/chukwa/extraction/
src/java/org/apache/hadoop/chukwa/extraction/archive/
src/java/org/apache/hadoop/chukwa/extraction/demux/
Author: billgraham
Date: Wed Oct 20 20:21:51 2010
New Revision: 1025708
URL: http://svn.apache.org/viewvc?rev=1025708&view=rev
Log:
CHUKWA-534. Better fault-tolerance for data processors
Modified:
incubator/chukwa/trunk/conf/chukwa-demux-conf.xml.template
incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
Modified: incubator/chukwa/trunk/conf/chukwa-demux-conf.xml.template
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/conf/chukwa-demux-conf.xml.template?rev=1025708&r1=1025707&r2=1025708&view=diff
==============================================================================
--- incubator/chukwa/trunk/conf/chukwa-demux-conf.xml.template (original)
+++ incubator/chukwa/trunk/conf/chukwa-demux-conf.xml.template Wed Oct 20 20:21:51 2010
@@ -77,6 +77,14 @@
that will be triggered upon a successful Demux/PostProcessor job</description>
</property>
+ <property>
+ <name>post.process.max.error.count.before.shutdown</name>
+ <value>4</value>
+ <description>The number of consecutive errors to be encountered by the
+ PostProcessorManager before the process will be shut down. Set to -1 to continue
+ running despite error count.</description>
+ </property>
+
<!-- -->
<!-- ArchiveManager config -->
@@ -87,8 +95,27 @@
<description>Reduce count </description>
</property>
+ <property>
+ <name>archive.max.error.count.before.shutdown</name>
+ <value>4</value>
+ <description>The number of consecutive errors to be encountered by the
+ ArchiveManager before the process will be shut down. Set to -1 to continue
+ running despite error count.</description>
+ </property>
+
<!-- -->
+<!-- Demux configs -->
+
+ <property>
+ <name>demux.max.error.count.before.shutdown</name>
+ <value>5</value>
+ <description>The number of consecutive errors to be encountered by the
+ DemuxManager before the process will be shut down. Set to -1 to continue
+ running despite error count.</description>
+ </property>
+
+<!-- -->
<!-- Demux aliases -->
Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java?rev=1025708&r1=1025707&r2=1025708&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java Wed Oct 20 20:21:51 2010
@@ -30,14 +30,18 @@ public interface CHUKWA_CONSTANT {
public static final String CHUKWA_POST_PROCESS_DIR_FIELD = "chukwaPostProcessDir";
public static final String CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD = "chukwaPostProcessInErrorDir";
public static final String CHUKWA_DATA_SINK_DIR_FIELD = "chukwaDataSinkDir";
-
+
public static final String CHUKWA_NAGIOS_HOST_FIELD = "demux.nagiosHost";
public static final String CHUKWA_NAGIOS_PORT_FIELD = "demux.nagiosPort";
public static final String CHUKWA_REPORTING_HOST_FIELD = "demux.reportingHost4Nagios";
+ public static final String CHUKWA_POSTPROCESS_MAX_ERROR_COUNT_FIELD = "post.process.max.error.count.before.shutdown";
+ public static final String CHUKWA_ARCHIVE_MAX_ERROR_COUNT_FIELD = "archive.max.error.count.before.shutdown";
+ public static final String CHUKWA_DEMUX_MAX_ERROR_COUNT_FIELD = "demux.max.error.count.before.shutdown";
+
public static final String CHUKWA_DEMUX_REDUCER_COUNT_FIELD = "demux.reducerCount";
-
+
public static final String DEFAULT_CHUKWA_ROOT_DIR_NAME = "/chukwa/";
public static final String DEFAULT_REPOS_DIR_NAME = "repos/";
public static final String DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME = "postProcess/";
Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java?rev=1025708&r1=1025707&r2=1025708&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java Wed Oct 20 20:21:51 2010
@@ -39,7 +39,9 @@ public class ChukwaArchiveManager implem
static final int ONE_HOUR = 60 * 60 * 1000;
static final int ONE_DAY = 24*ONE_HOUR;
static final int MAX_FILES = 500;
-
+
+ private static final int DEFAULT_MAX_ERROR_COUNT = 4;
+
protected ChukwaConfiguration conf = null;
protected FileSystem fs = null;
protected boolean isRunning = true;
@@ -92,7 +94,8 @@ public class ChukwaArchiveManager implem
String archivesMROutputDir = archivesRootProcessingDir+ ARCHIVES_MR_OUTPUT_DIR_NAME;
String finalArchiveOutput = chukwaRootDir + DEFAULT_FINAL_ARCHIVES;
-
+ int maxPermittedErrorCount = conf.getInt(CHUKWA_ARCHIVE_MAX_ERROR_COUNT_FIELD,
+ DEFAULT_MAX_ERROR_COUNT);
Path pDailyRawArchivesInput = new Path(archiveRootDir);
Path pArchivesMRInputDir = new Path(archivesMRInputDir);
@@ -114,9 +117,9 @@ public class ChukwaArchiveManager implem
while (isRunning) {
try {
- if (errorCount >= 4 ) {
- // it's better to exit, Watchdog will re-start it
- log.warn("Too many error - bail out!");
+ if (maxPermittedErrorCount != -1 && errorCount >= maxPermittedErrorCount) {
+ log.warn("==================\nToo many errors (" + errorCount +
+ "), Bail out!\n==================");
DaemonWatcher.bailout(-1);
}
// /chukwa/archives/<YYYYMMDD>/dataSinkDirXXX
@@ -129,7 +132,8 @@ public class ChukwaArchiveManager implem
log.info("reprocessing current Archive input" + days[0].getPath());
runArchive(archivesMRInputDir + days[0].getPath().getName() + "/",archivesMROutputDir,finalArchiveOutput);
- continue;
+ errorCount = 0;
+ continue;
}
}
Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java?rev=1025708&r1=1025707&r2=1025708&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java Wed Oct 20 20:21:51 2010
@@ -23,14 +23,12 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
+import java.util.Date;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
-import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.chukwa.util.NagiosHelper;
import org.apache.hadoop.chukwa.util.DaemonWatcher;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -42,14 +40,17 @@ public class DemuxManager implements CHU
static Logger log = Logger.getLogger(DemuxManager.class);
static int globalErrorcounter = 0;
-
+ static Date firstErrorTime = null;
+
protected int ERROR_SLEEP_TIME = 60;
protected int NO_DATASINK_SLEEP_TIME = 20;
+ protected int DEFAULT_MAX_ERROR_COUNT = 6;
protected int DEFAULT_MAX_FILES_PER_DEMUX = 500;
protected int DEFAULT_REDUCER_COUNT = 8;
- protected int demuxReducerCount = 0;
+ protected int maxPermittedErrorCount = DEFAULT_MAX_ERROR_COUNT;
+ protected int demuxReducerCount = 0;
protected ChukwaConfiguration conf = null;
protected FileSystem fs = null;
protected int reprocess = 0;
@@ -133,6 +134,8 @@ public class DemuxManager implements CHU
}
log.info("archiveRootDir:" + archiveRootDir);
+ maxPermittedErrorCount = conf.getInt(CHUKWA_DEMUX_MAX_ERROR_COUNT_FIELD,
+ DEFAULT_MAX_ERROR_COUNT);
demuxReducerCount = conf.getInt(CHUKWA_DEMUX_REDUCER_COUNT_FIELD, DEFAULT_REDUCER_COUNT);
log.info("demuxReducerCount:" + demuxReducerCount);
@@ -156,8 +159,9 @@ public class DemuxManager implements CHU
try {
demuxReady = false;
- if (globalErrorcounter > 5) {
- log.warn("==================\nToo many errors, Bail out!\n==================");
+ if (maxPermittedErrorCount != -1 && globalErrorcounter >= maxPermittedErrorCount) {
+ log.warn("==================\nToo many errors (" + globalErrorcounter +
+ "), Bail out!\n==================");
DaemonWatcher.bailout(-1);
}
@@ -200,14 +204,22 @@ public class DemuxManager implements CHU
boolean demuxStatus = processData(dataSinkDir, demuxInputDir, demuxOutputDir,
postProcessDir, archiveRootDir);
sendDemuxStatusToNagios(nagiosHost,nagiosPort,reportingHost,demuxErrorDir,demuxStatus,null);
-
+
+ // if demux succeeds, reset the consecutive-error counter and first-error timestamp.
+ if (demuxStatus) {
+ globalErrorcounter = 0;
+ firstErrorTime = null;
+ }
} else {
log.info("Demux not ready so going to sleep ...");
Thread.sleep(NO_DATASINK_SLEEP_TIME * 1000);
}
}catch(Throwable e) {
- log.warn(e);
globalErrorcounter ++;
+ if (firstErrorTime == null) firstErrorTime = new Date();
+
+ log.warn("Consecutive error number " + globalErrorcounter +
+ " encountered since " + firstErrorTime, e);
sendDemuxStatusToNagios(nagiosHost,nagiosPort,reportingHost,demuxErrorDir,false, e.getMessage());
try { Thread.sleep(ERROR_SLEEP_TIME * 1000); }
catch (InterruptedException e1) {/*do nothing*/ }
@@ -331,8 +343,10 @@ public class DemuxManager implements CHU
return ( 0 == ToolRunner.run(this.conf,new Demux(), demuxParams) );
} catch (Throwable e) {
e.printStackTrace();
- log.error("failed to run demux", e);
globalErrorcounter ++;
+ if (firstErrorTime == null) firstErrorTime = new Date();
+ log.error("Failed to run demux. Consecutive error number " +
+ globalErrorcounter + " encountered since " + firstErrorTime, e);
}
return false;
}
Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java?rev=1025708&r1=1025707&r2=1025708&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java Wed Oct 20 20:21:51 2010
@@ -50,6 +50,8 @@ public class PostProcessorManager implem
protected ChukwaConfiguration conf = null;
protected FileSystem fs = null;
protected volatile boolean isRunning = true;
+
+ private static final int DEFAULT_MAX_ERROR_COUNT = 4;
final private static PathFilter POST_PROCESS_DEMUX_DIR_FILTER = new PathFilter() {
public boolean accept(Path file) {
@@ -109,18 +111,21 @@ public class PostProcessorManager implem
if ( ! chukwaPostProcessInErrorDir.endsWith("/") ) {
chukwaPostProcessInErrorDir += "/";
}
-
+
+ int maxPermittedErrorCount = conf.getInt(CHUKWA_POSTPROCESS_MAX_ERROR_COUNT_FIELD,
+ DEFAULT_MAX_ERROR_COUNT);
+
dataSources = new HashMap<String, String>();
Path postProcessDirectory = new Path(postProcessDir);
while (isRunning) {
- if (errorCount >= 4 ) {
- // it's better to exit, Watchdog will re-strat it
- log.warn("Too many error - bail out!");
+ if (maxPermittedErrorCount != -1 && errorCount >= maxPermittedErrorCount) {
+ log.warn("==================\nToo many errors (" + errorCount +
+ "), Bail out!\n==================");
DaemonWatcher.bailout(-1);
}
-
+
try {
FileStatus[] demuxOutputDirs = fs.listStatus(postProcessDirectory,POST_PROCESS_DEMUX_DIR_FILTER);
List<String> directories = new ArrayList<String>();