You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by bi...@apache.org on 2010/10/20 22:21:51 UTC

svn commit: r1025708 - in /incubator/chukwa/trunk: conf/ src/java/org/apache/hadoop/chukwa/extraction/ src/java/org/apache/hadoop/chukwa/extraction/archive/ src/java/org/apache/hadoop/chukwa/extraction/demux/

Author: billgraham
Date: Wed Oct 20 20:21:51 2010
New Revision: 1025708

URL: http://svn.apache.org/viewvc?rev=1025708&view=rev
Log:
CHUKWA-534. Better fault-tolerance for data processors

Modified:
    incubator/chukwa/trunk/conf/chukwa-demux-conf.xml.template
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java

Modified: incubator/chukwa/trunk/conf/chukwa-demux-conf.xml.template
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/conf/chukwa-demux-conf.xml.template?rev=1025708&r1=1025707&r2=1025708&view=diff
==============================================================================
--- incubator/chukwa/trunk/conf/chukwa-demux-conf.xml.template (original)
+++ incubator/chukwa/trunk/conf/chukwa-demux-conf.xml.template Wed Oct 20 20:21:51 2010
@@ -77,6 +77,14 @@
     that will be triggered upon a successful Demux/PostProcessor job</description>
   </property>
 
+  <property>
+    <name>post.process.max.error.count.before.shutdown</name>
+    <value>4</value>
+    <description>The number of consecutive errors to be encountered by the
+    PostProcessorManager before the process will be shut down. Set to -1 to continue
+    running despite error count.</description>
+  </property>
+
 <!-- -->
 
 <!-- ArchiveManager config -->
@@ -87,8 +95,27 @@
     <description>Reduce count </description>
   </property>
 
+  <property>
+    <name>archive.max.error.count.before.shutdown</name>
+    <value>4</value>
+    <description>The number of consecutive errors to be encountered by the
+    ArchiveManager before the process will be shut down. Set to -1 to continue
+    running despite error count.</description>
+  </property>
+
 <!-- -->
 
+<!-- Demux configs -->
+
+  <property>
+    <name>demux.max.error.count.before.shutdown</name>
+    <value>5</value>
+    <description>The number of consecutive errors to be encountered by the
+    DemuxManager before the process will be shut down. Set to -1 to continue
+    running despite error count.</description>
+  </property>
+
+<!-- -->  
 
 <!-- Demux aliases -->
 

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java?rev=1025708&r1=1025707&r2=1025708&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java Wed Oct 20 20:21:51 2010
@@ -30,14 +30,18 @@ public interface CHUKWA_CONSTANT {
   public static final String CHUKWA_POST_PROCESS_DIR_FIELD = "chukwaPostProcessDir";
   public static final String CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD = "chukwaPostProcessInErrorDir";
   public static final String CHUKWA_DATA_SINK_DIR_FIELD = "chukwaDataSinkDir";
-  
+
   public static final String CHUKWA_NAGIOS_HOST_FIELD = "demux.nagiosHost";
   public static final String CHUKWA_NAGIOS_PORT_FIELD = "demux.nagiosPort";
   public static final String CHUKWA_REPORTING_HOST_FIELD = "demux.reportingHost4Nagios";
   
   
+  public static final String CHUKWA_POSTPROCESS_MAX_ERROR_COUNT_FIELD = "post.process.max.error.count.before.shutdown";
+  public static final String CHUKWA_ARCHIVE_MAX_ERROR_COUNT_FIELD     = "archive.max.error.count.before.shutdown";
+  public static final String CHUKWA_DEMUX_MAX_ERROR_COUNT_FIELD       = "demux.max.error.count.before.shutdown";
+
   public static final String CHUKWA_DEMUX_REDUCER_COUNT_FIELD     = "demux.reducerCount";
-  
+
   public static final String DEFAULT_CHUKWA_ROOT_DIR_NAME          = "/chukwa/";
   public static final String DEFAULT_REPOS_DIR_NAME               = "repos/";
   public static final String DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME  = "postProcess/";

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java?rev=1025708&r1=1025707&r2=1025708&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java Wed Oct 20 20:21:51 2010
@@ -39,7 +39,9 @@ public class ChukwaArchiveManager implem
   static final  int ONE_HOUR = 60 * 60 * 1000;
   static final int ONE_DAY = 24*ONE_HOUR;
   static final int MAX_FILES = 500;
-  
+
+  private static final int DEFAULT_MAX_ERROR_COUNT = 4;
+
   protected ChukwaConfiguration conf = null;
   protected FileSystem fs = null;
   protected boolean isRunning = true;
@@ -92,7 +94,8 @@ public class ChukwaArchiveManager implem
     String archivesMROutputDir = archivesRootProcessingDir+ ARCHIVES_MR_OUTPUT_DIR_NAME;
     String finalArchiveOutput = chukwaRootDir + DEFAULT_FINAL_ARCHIVES;
 
-
+    int maxPermittedErrorCount = conf.getInt(CHUKWA_ARCHIVE_MAX_ERROR_COUNT_FIELD,
+                                             DEFAULT_MAX_ERROR_COUNT);
     
     Path pDailyRawArchivesInput = new Path(archiveRootDir);
     Path pArchivesMRInputDir = new Path(archivesMRInputDir);
@@ -114,9 +117,9 @@ public class ChukwaArchiveManager implem
     while (isRunning) {
       try {
         
-        if (errorCount >= 4 ) {
-          // it's better to exit, Watchdog will re-start it
-          log.warn("Too many error - bail out!");
+        if (maxPermittedErrorCount != -1 && errorCount >= maxPermittedErrorCount) {
+          log.warn("==================\nToo many errors (" + errorCount +
+                   "), Bail out!\n==================");
           DaemonWatcher.bailout(-1);
         }
         // /chukwa/archives/<YYYYMMDD>/dataSinkDirXXX
@@ -129,7 +132,8 @@ public class ChukwaArchiveManager implem
             log.info("reprocessing current Archive input" +  days[0].getPath());
             
             runArchive(archivesMRInputDir + days[0].getPath().getName() + "/",archivesMROutputDir,finalArchiveOutput);  
-            continue; 
+            errorCount = 0;
+            continue;
           }
         }
         

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java?rev=1025708&r1=1025707&r2=1025708&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java Wed Oct 20 20:21:51 2010
@@ -23,14 +23,12 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.text.SimpleDateFormat;
+import java.util.Date;
 
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
-import org.apache.hadoop.chukwa.util.ExceptionUtil;
 import org.apache.hadoop.chukwa.util.NagiosHelper;
 import org.apache.hadoop.chukwa.util.DaemonWatcher;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -42,14 +40,17 @@ public class DemuxManager implements CHU
   static Logger log = Logger.getLogger(DemuxManager.class);
 
   static int globalErrorcounter = 0;
-  
+  static Date firstErrorTime = null;
+
   protected int ERROR_SLEEP_TIME = 60;
   protected int NO_DATASINK_SLEEP_TIME = 20;
 
+  protected int DEFAULT_MAX_ERROR_COUNT = 6;
   protected int DEFAULT_MAX_FILES_PER_DEMUX = 500;
   protected int DEFAULT_REDUCER_COUNT = 8;
   
-  protected int demuxReducerCount = 0; 
+  protected int maxPermittedErrorCount = DEFAULT_MAX_ERROR_COUNT;
+  protected int demuxReducerCount = 0;
   protected ChukwaConfiguration conf = null;
   protected FileSystem fs = null;
   protected int reprocess = 0;
@@ -133,6 +134,8 @@ public class DemuxManager implements CHU
      }
      log.info("archiveRootDir:" + archiveRootDir);
      
+     maxPermittedErrorCount = conf.getInt(CHUKWA_DEMUX_MAX_ERROR_COUNT_FIELD,
+                                          DEFAULT_MAX_ERROR_COUNT);
      demuxReducerCount = conf.getInt(CHUKWA_DEMUX_REDUCER_COUNT_FIELD, DEFAULT_REDUCER_COUNT);
      log.info("demuxReducerCount:" + demuxReducerCount);
      
@@ -156,8 +159,9 @@ public class DemuxManager implements CHU
        try {
          demuxReady = false;
 
-         if (globalErrorcounter > 5) {
-           log.warn("==================\nToo many errors, Bail out!\n==================");
+         if (maxPermittedErrorCount != -1 && globalErrorcounter >= maxPermittedErrorCount) {
+           log.warn("==================\nToo many errors (" + globalErrorcounter +
+                    "), Bail out!\n==================");
            DaemonWatcher.bailout(-1);
          }
          
@@ -200,14 +204,22 @@ public class DemuxManager implements CHU
           boolean demuxStatus = processData(dataSinkDir, demuxInputDir, demuxOutputDir,
                postProcessDir, archiveRootDir);
           sendDemuxStatusToNagios(nagiosHost,nagiosPort,reportingHost,demuxErrorDir,demuxStatus,null);
-          
+
+          // If demux succeeds, reset the consecutive-error counter and first-error timestamp.
+          if (demuxStatus) {
+           globalErrorcounter = 0;
+           firstErrorTime = null;
+          }
          } else {
            log.info("Demux not ready so going to sleep ...");
            Thread.sleep(NO_DATASINK_SLEEP_TIME * 1000);
          }
        }catch(Throwable e) {
-         log.warn(e);
          globalErrorcounter ++;
+         if (firstErrorTime == null) firstErrorTime = new Date();
+
+         log.warn("Consecutive error number " + globalErrorcounter +
+                  " encountered since " + firstErrorTime, e);
          sendDemuxStatusToNagios(nagiosHost,nagiosPort,reportingHost,demuxErrorDir,false, e.getMessage());
          try { Thread.sleep(ERROR_SLEEP_TIME * 1000); } 
          catch (InterruptedException e1) {/*do nothing*/ }
@@ -331,8 +343,10 @@ public class DemuxManager implements CHU
        return ( 0 == ToolRunner.run(this.conf,new Demux(), demuxParams) );
      } catch (Throwable e) {
        e.printStackTrace();
-       log.error("failed to run demux", e);
        globalErrorcounter ++;
+       if (firstErrorTime == null) firstErrorTime = new Date();
+       log.error("Failed to run demux. Consecutive error number " +
+               globalErrorcounter + " encountered since " + firstErrorTime, e);
      }
      return false;
    }

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java?rev=1025708&r1=1025707&r2=1025708&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java Wed Oct 20 20:21:51 2010
@@ -50,6 +50,8 @@ public class PostProcessorManager implem
   protected ChukwaConfiguration conf = null;
   protected FileSystem fs = null;
   protected volatile boolean isRunning = true;
+
+  private static final int DEFAULT_MAX_ERROR_COUNT = 4;
   
   final private static PathFilter POST_PROCESS_DEMUX_DIR_FILTER = new PathFilter() {
     public boolean accept(Path file) {
@@ -109,18 +111,21 @@ public class PostProcessorManager implem
     if ( ! chukwaPostProcessInErrorDir.endsWith("/") ) {
       chukwaPostProcessInErrorDir += "/";
     }
- 
+
+    int maxPermittedErrorCount = conf.getInt(CHUKWA_POSTPROCESS_MAX_ERROR_COUNT_FIELD,
+                                             DEFAULT_MAX_ERROR_COUNT);
+
     
     dataSources = new HashMap<String, String>();
     Path postProcessDirectory = new Path(postProcessDir);
     while (isRunning) {
       
-      if (errorCount >= 4 ) {
-        // it's better to exit, Watchdog will re-strat it
-        log.warn("Too many error - bail out!");
+      if (maxPermittedErrorCount != -1 && errorCount >= maxPermittedErrorCount) {
+        log.warn("==================\nToo many errors (" + errorCount +
+                 "), Bail out!\n==================");
         DaemonWatcher.bailout(-1);
       }
-      
+
       try {
         FileStatus[] demuxOutputDirs = fs.listStatus(postProcessDirectory,POST_PROCESS_DEMUX_DIR_FILTER);
         List<String> directories = new ArrayList<String>();