You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:53:25 UTC

svn commit: r1077769 - in /hadoop/common/branches/branch-0.20-security-patches: ./ src/core/org/apache/hadoop/util/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/test/org/apache/hadoop/util/

Author: omalley
Date: Fri Mar  4 04:53:25 2011
New Revision: 1077769

URL: http://svn.apache.org/viewvc?rev=1077769&view=rev
Log:
commit e0e776951c6e639727eefea408d873b0d87d2961
Author: Matt Foley <ma...@yahoo-inc.com>
Date:   Thu Dec 16 16:55:49 2010 -0800

    HDFS: BZ-4182948. Add statistics logging to Fred for better visibility into
    startup time costs. (Matt Foley)

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/QueueProcessingStatistics.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestQueueProcessingStatistics.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

Modified: hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt?rev=1077769&r1=1077768&r2=1077769&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt Fri Mar  4 04:53:25 2011
@@ -4,6 +4,9 @@ Release 0.20.3 - Unreleased
 
   IMPROVEMENTS
 
+    BZ-4182948. Add statistics logging to Fred for better visibility into
+    startup time costs. (Matt Foley)
+
   BUG FIXES
 
     HDFS-955. New implementation of saveNamespace() to avoid loss of edits 

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/QueueProcessingStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/QueueProcessingStatistics.java?rev=1077769&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/QueueProcessingStatistics.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/QueueProcessingStatistics.java Fri Mar  4 04:53:25 2011
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.commons.logging.Log;
+
+/**
+ * Collects and logs statistics about the processing of a work queue.
+ * <p>
+ * Hadoop has several work queues, such as the namenode's queue of
+ * under-replicated blocks ({@code FSNamesystem.neededReplications}).
+ * With a properly throttled queue, a worker thread cycles repeatedly,
+ * doing a chunk of work each cycle then resting a bit, until the queue is
+ * empty.  This class collects statistics about the behavior of such queues
+ * and their consumers.  It reports the amount of work done and how long it
+ * took, for the first cycle after collection starts, and for the total
+ * number of cycles needed to flush the queue.  A state machine detects when
+ * the queue has been flushed, at which point the stats are logged; see
+ * {@link State} for the states and their meanings.
+ * <p>
+ * Usage: bracket each cycle of work with {@link #startCycle(int)} and
+ * {@link #endCycle(int)}.  Call {@link #checkRestart()} to discard any
+ * collection in progress and start over.  Subclasses supply the termination
+ * test via {@link #preCheckIsLastCycle(int)} and/or
+ * {@link #postCheckIsLastCycle(int)}.  Stats are collected for only one
+ * complete flush of the queue (unless restarted).
+ * <p>
+ * This class does no synchronization of its own.
+ */
+public abstract class QueueProcessingStatistics {
+  // All member variables and methods that would normally be "private"
+  // are instead package-private so we can subclass for unit testing.
+  State state = State.BEGIN_COLLECTING;
+  // Start time (msec) of the cycle in progress, or of the most recent one.
+  long startTimeCurrentCycle;
+  // Start time (msec) of the first cycle of this collection.
+  long startTime;
+  // Sum of the time (msec) spent inside cycles, excluding inter-cycle rests.
+  long processDuration;
+  // Elapsed time (msec) from startTime to the end of the latest cycle.
+  long clockDuration;
+  // Total work reported through endCycle() over all cycles.
+  long workItemCount;
+  // Number of completed cycles (calls to endCycle()).
+  int  cycleCount;
+
+  // Labels and destination for the log messages; fixed at construction.
+  final String queueName;
+  final String workItemsName;
+  final Log LOG;
+
+  /**
+   * This enum provides the "states" of a state machine for 
+   * {@link QueueProcessingStatistics}.
+   * The meanings of the states are: <ul>
+   * <li> BEGIN_COLLECTING - Ready to begin.
+   * <li> IN_FIRST_CYCLE - Started the first cycle.
+   * <li> IN_SOLE_CYCLE - Still in first cycle, but already know there will be 
+   *                      no further cycles because this one will complete all
+   *                      needed work. When done, will go straight to
+   *                      "DONE_COLLECTING".
+   * <li> DONE_FIRST_CYCLE - Done first cycle, doing subsequent cycles.
+   * <li> IN_LAST_CYCLE - Started the last cycle.
+   * <li> DONE_COLLECTING - Done with last cycle, finishing up.
+   * </ul>
+   */
+  public enum State {
+    BEGIN_COLLECTING,
+    IN_FIRST_CYCLE,
+    IN_SOLE_CYCLE,
+    DONE_FIRST_CYCLE,
+    IN_LAST_CYCLE,
+    DONE_COLLECTING,
+  }
+
+  /**
+   * @param queueName - Human-readable name of the queue being monitored, 
+   *                    used as first word in the log messages.
+   * @param workItemsName - what kind of work items are being managed
+   *                    on the queue?  A plural word is best here, for logging.
+   * @param logObject - What log do you want the log messages to be sent to?
+   */
+  public QueueProcessingStatistics(String queueName, String workItemsName,
+      Log logObject) {
+    this.queueName = queueName;
+    this.workItemsName = workItemsName;
+    this.LOG = logObject;
+  }
+
+  /**
+   * Records the start of a cycle of work.  On the first cycle this also
+   * records the start time of the whole collection.  Does nothing once
+   * collection is complete.
+   *
+   * @param maxWorkToProcess - the amount of work this cycle may do; passed
+   *            to {@link #preCheckIsLastCycle(int)} to detect, up front,
+   *            whether this cycle will empty the queue.
+   */
+  public void startCycle(int maxWorkToProcess) {
+    //only collect stats for one complete flush of the queue
+    if (state == State.DONE_COLLECTING) return;
+    
+    //regardless of state, record the start of this cycle
+    startTimeCurrentCycle = now();
+    boolean preDetectLastCycle = preCheckIsLastCycle(maxWorkToProcess);
+    
+    switch (state) {
+    case BEGIN_COLLECTING:
+      startTime = startTimeCurrentCycle;
+      state = preDetectLastCycle ? State.IN_SOLE_CYCLE : State.IN_FIRST_CYCLE;
+      break;
+    default:
+      if (preDetectLastCycle)
+        state = State.IN_LAST_CYCLE;
+      break;
+    }
+  }
+
+  /**
+   * Records the end of a cycle of work and accumulates its stats.  Logs the
+   * first-cycle stats when the first cycle ends, and the totals when the
+   * last cycle ends, after which collection stops.  If called while still
+   * in {@link State#BEGIN_COLLECTING} (no preceding
+   * {@link #startCycle(int)}), logs an error and stops collection.
+   *
+   * @param workFound - the amount of work done in this cycle; added to the
+   *            running total and passed to {@link #postCheckIsLastCycle(int)}.
+   */
+  public void endCycle(int workFound) {
+    //only collect stats for first pass through the queue
+    if (state == State.DONE_COLLECTING) return;
+    
+    //regardless of state, record the end of this cycle
+    //and accumulate the cycle's stats
+    long endTimeCurrentCycle = now();
+    processDuration += endTimeCurrentCycle - startTimeCurrentCycle;
+    clockDuration = endTimeCurrentCycle - startTime;
+    workItemCount += workFound;
+    cycleCount++;
+    boolean postDetectLastCycle = postCheckIsLastCycle(workFound);
+    
+    switch (state) {
+    case BEGIN_COLLECTING:
+      logError("endCycle() called before startCycle(), "
+          + "exiting stats collection");
+      state = State.DONE_COLLECTING;
+      break;
+    case IN_FIRST_CYCLE:
+      if (postDetectLastCycle) {
+        state = State.IN_SOLE_CYCLE;
+        //and fall through
+      } else {
+        logEndFirstCycle();
+        state = State.DONE_FIRST_CYCLE;
+        break;
+      }
+    case IN_SOLE_CYCLE:
+      logEndFirstCycle();
+      logEndLastCycle();
+      state = State.DONE_COLLECTING;
+      break;
+    case DONE_FIRST_CYCLE:
+      if (postDetectLastCycle) {
+        state = State.IN_LAST_CYCLE;
+        //and fall through
+      } else {
+        break;
+      }
+    case IN_LAST_CYCLE:
+      logEndLastCycle();
+      state = State.DONE_COLLECTING;
+      break;
+    default:
+      logError("unallowed state reached, exiting stats collection");
+      state = State.DONE_COLLECTING;
+      break;
+    }
+  }
+
+  /**
+   * Discards any stats collected so far and returns to
+   * {@link State#BEGIN_COLLECTING}, so the next cycle starts a fresh
+   * collection.  Does nothing if collection has not yet begun.  Otherwise
+   * it logs an error first, including the partial stats if collection was
+   * still in progress.
+   */
+  public void checkRestart() {
+    switch (state) {
+    case BEGIN_COLLECTING:
+      //situation normal
+      return;
+    case DONE_COLLECTING:
+      logError("Restarted stats collection after completion of first "
+          + "queue flush.");
+      initialize();
+      break;
+    default:
+      //for all other cases, we are in the middle of stats collection,
+      //so output the stats collected so far before re-initializing
+      logErrorWithStats("Restarted stats collection before completion of "
+          + "first queue flush.");
+      initialize();
+      break;        
+    }
+  }
+
+  /** Resets the state machine and all accumulated stats to initial values. */
+  void initialize() {
+    state = State.BEGIN_COLLECTING;
+    startTimeCurrentCycle = 0;
+    startTime = 0;
+    processDuration = 0;
+    clockDuration = 0;
+    workItemCount = 0;
+    cycleCount = 0;
+  }
+
+  /**
+   * The termination condition is to identify the last cycle that will 
+   * empty the queue.  Two abstract APIs are called:
+   * {@link #preCheckIsLastCycle} is called at the beginning of each cycle,
+   * and {@link #postCheckIsLastCycle} is called at the end of each cycle. 
+   * At least one of them must correctly provide the termination 
+   * condition. The other may always return 'false'.  If either of them 
+   * returns 'true' in a given cycle, then at the end of that cycle the 
+   * stats will be output to log, and stats collection will end.
+   * 
+   * @param maxWorkToProcess - if this number is greater than the amount
+   *            of work remaining at the start of a cycle, then it will
+   *            be the last cycle.
+   * @return - true if last cycle detected, else false
+   */
+  public abstract boolean preCheckIsLastCycle(int maxWorkToProcess);
+
+  /**
+   * See {@link #preCheckIsLastCycle}.
+   * @param workFound - may not be useful
+   * @return - true if remaining work is zero at end of cycle,
+   *           else false
+   */
+  public abstract boolean postCheckIsLastCycle(int workFound);
+
+  /** Message logged when the first cycle completes. */
+  String msgEndFirstCycle() {
+    return queueName + " QueueProcessingStatistics: First cycle completed " 
+    + workItemCount + " " + workItemsName + " in " + processDuration 
+    + " msec";
+  }
+  
+  void logEndFirstCycle() {
+    LOG.info(msgEndFirstCycle());
+  }
+
+  /** Message logged when the last cycle completes (queue flushed). */
+  String msgEndLastCycle() {
+    return queueName
+    + " QueueProcessingStatistics: Queue flush completed " 
+    + workItemCount + " " + workItemsName + " in "
+    + processDuration + " msec processing time, "
+    + clockDuration + " msec clock time, " 
+    + cycleCount + " cycles";
+  }
+  
+  void logEndLastCycle() {
+    LOG.info(msgEndLastCycle());
+  }
+
+  /** Error message without stats. */
+  String msgError(String msg) {
+    return queueName
+    + " QueueProcessingStatistics - Error: " + msg;
+  }
+  
+  void logError(String msg) {
+    LOG.error(msgError(msg));
+  }
+
+  /** Error message followed by the stats accumulated so far. */
+  String msgErrorWithStats(String msg) {
+    return queueName
+    + " QueueProcessingStatistics - Error: " + msg 
+    + " Completed " + workItemCount + " " + workItemsName + " in "
+    + processDuration + " msec processing time, "
+    + clockDuration + " msec clock time, " 
+    + cycleCount + " cycles";
+  }
+
+  void logErrorWithStats(String msg) {
+    LOG.error(msgErrorWithStats(msg));
+  }
+  
+  /**
+   * Current system time.
+   * @return current time in msec.
+   */
+  static long now() {
+    return System.currentTimeMillis();
+  }
+    
+}
+

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1077769&r1=1077768&r2=1077769&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Mar  4 04:53:25 2011
@@ -893,21 +893,26 @@ public class DataNode extends Configured
           }
         }
 
-        // send block report
+        // Send latest blockinfo report if timer has expired.
         if (startTime - lastBlockReport > blockReportInterval) {
-          //
-          // Send latest blockinfo report if timer has expired.
-          // Get back a list of local block(s) that are obsolete
-          // and can be safely GC'ed.
-          //
-          long brStartTime = now();
+          
+          // Create block report
+          long brCreateStartTime = now();
           Block[] bReport = data.getBlockReport();
+          
+          // Send block report
+          long brSendStartTime = now();
           DatanodeCommand cmd = namenode.blockReport(dnRegistration,
                   BlockListAsLongs.convertToArrayLongs(bReport));
-          long brTime = now() - brStartTime;
-          myMetrics.addBlockReport(brTime);
-          LOG.info("BlockReport of " + bReport.length +
-              " blocks got processed in " + brTime + " msecs");
+          
+          // Log the block report processing stats from Datanode perspective
+          long brSendCost = now() - brSendStartTime;
+          long brCreateCost = brSendStartTime - brCreateStartTime;
+          myMetrics.addBlockReport(brSendCost);
+          LOG.info("BlockReport of " + bReport.length
+              + " blocks took " + brCreateCost + " msec to generate and "
+              + brSendCost + " msecs for RPC and NN processing");
+
           //
           // If we have sent the first block report, then wait a random
           // time before we start the periodic block reports.

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1077769&r1=1077768&r2=1077769&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Mar  4 04:53:25 2011
@@ -115,6 +115,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.util.QueueProcessingStatistics;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
@@ -264,12 +265,12 @@ public class FSNamesystem implements FSC
    */
   ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
 
-  //
-  // Store set of Blocks that need to be replicated 1 or more times.
-  // We also store pending replication-orders.
-  // Set of: Block
-  //
+/**
+ * Store set of Blocks that need to be replicated 1 or more times.
+ * Set of: Block
+ */
   private UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
+  // We also store pending replication-orders.
   private PendingReplicationBlocks pendingReplications;
 
   public LeaseManager leaseManager = new LeaseManager(this); 
@@ -282,6 +283,7 @@ public class FSNamesystem implements FSC
   public Daemon lmthread = null;   // LeaseMonitor thread
   Daemon smmthread = null;  // SafeModeMonitor thread
   public Daemon replthread = null;  // Replication thread
+  private ReplicationMonitor replmon = null; // Replication metrics
   
   private volatile boolean fsRunning = true;
   long systemStart = 0;
@@ -390,7 +392,8 @@ public class FSNamesystem implements FSC
     }
     this.hbthread = new Daemon(new HeartbeatMonitor());
     this.lmthread = new Daemon(leaseManager.new Monitor());
-    this.replthread = new Daemon(new ReplicationMonitor());
+    this.replmon = new ReplicationMonitor();
+    this.replthread = new Daemon(replmon);
     hbthread.start();
     lmthread.start();
     replthread.start();
@@ -561,6 +564,7 @@ public class FSNamesystem implements FSC
       if (pendingReplications != null) pendingReplications.stop();
       if (hbthread != null) hbthread.interrupt();
       if (replthread != null) replthread.interrupt();
+      if (replmon != null) replmon = null;
       if (dnthread != null) dnthread.interrupt();
       if (smmthread != null) smmthread.interrupt();
       if (dtSecretManager != null) dtSecretManager.stopThreads();
@@ -2499,6 +2503,11 @@ public class FSNamesystem implements FSC
   class ReplicationMonitor implements Runnable {
     static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
     static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
+    ReplicateQueueProcessingStats replicateQueueStats = 
+        new ReplicateQueueProcessingStats();
+    InvalidateQueueProcessingStats invalidateQueueStats = 
+        new InvalidateQueueProcessingStats();
+    
     public void run() {
       while (fsRunning) {
         try {
@@ -2516,8 +2525,61 @@ public class FSNamesystem implements FSC
         }
       }
     }
-  }
 
+    /**
+     * Just before exiting safe mode, {@link SafeModeInfo#leave()} scans all 
+     * blocks and identifies under-replicated and invalid blocks.  These are 
+     * placed in work queues.  Immediately after leaving safe mode,
+     * {@link ReplicationMonitor} starts processing these queues via calls to
+     * {@link FSNamesystem#computeDatanodeWork()}.  Each call does a chunk of
+     * work, then waits 3 seconds before doing the next chunk of work, to
+     * avoid monopolizing the CPUs and the global lock.  It may take several
+     * cycles before the queue is completely flushed.
+     * <p>
+     * Here we use two concrete subclasses of
+     * {@link org.apache.hadoop.util.QueueProcessingStatistics} to collect
+     * stats about these processes.  This one tracks the replication queue
+     * ({@code neededReplications}).  Its last cycle is detected at the start
+     * of a cycle, when the cycle's work budget covers every block still queued.
+     */
+    private class ReplicateQueueProcessingStats 
+    extends QueueProcessingStatistics {
+      
+      ReplicateQueueProcessingStats() {
+        super("ReplicateQueue", "blocks", FSNamesystem.LOG);
+      }
+
+      /**
+       * @see org.apache.hadoop.util.QueueProcessingStatistics#preCheckIsLastCycle(int)
+       * @return true if this cycle's budget is at least the number of blocks
+       *         currently in {@code neededReplications}.
+       */
+      @Override
+      public boolean preCheckIsLastCycle(int maxWorkToProcess) {
+        return (maxWorkToProcess >= neededReplications.size());
+      }
+
+      /**
+       * Always false; the last cycle is detected by
+       * {@link #preCheckIsLastCycle(int)} instead.
+       * @see org.apache.hadoop.util.QueueProcessingStatistics#postCheckIsLastCycle(int)
+       */
+      @Override
+      public boolean postCheckIsLastCycle(int workFound) {
+        return false;
+      }
+    }
+  
+    /**
+     * Stats collector for the block-invalidation work queue
+     * ({@code recentInvalidateSets}).  Its last cycle is detected at the end
+     * of a cycle, once that queue is empty.
+     */
+    private class InvalidateQueueProcessingStats 
+    extends QueueProcessingStatistics {
+      
+      InvalidateQueueProcessingStats() {
+        super("InvalidateQueue", "blocks", FSNamesystem.LOG);
+      }
+
+      /**
+       * Always false; the last cycle is detected by
+       * {@link #postCheckIsLastCycle(int)} instead.
+       * @see org.apache.hadoop.util.QueueProcessingStatistics#preCheckIsLastCycle(int)
+       */
+      @Override
+      public boolean preCheckIsLastCycle(int maxWorkToProcess) {
+        return false;
+      }
+
+      /**
+       * @see org.apache.hadoop.util.QueueProcessingStatistics#postCheckIsLastCycle(int)
+       * @return true if {@code recentInvalidateSets} is empty after the cycle.
+       */
+      @Override
+      public boolean postCheckIsLastCycle(int workFound) {
+        return recentInvalidateSets.isEmpty();
+      }
+    }
+  }
+  
   /////////////////////////////////////////////////////////
   //
   // These methods are called by the Namenode system, to see
@@ -2532,12 +2594,17 @@ public class FSNamesystem implements FSC
    * @return number of blocks scheduled for replication or removal.
    */
   public int computeDatanodeWork() throws IOException {
-    int workFound = 0;
+    int replicationWorkFound = 0;
+    int invalidationWorkFound = 0;
     int blocksToProcess = 0;
     int nodesToProcess = 0;
+
     // blocks should not be replicated or removed if safe mode is on
-    if (isInSafeMode())
-      return workFound;
+    if (isInSafeMode()) {
+      replmon.replicateQueueStats.checkRestart();
+      replmon.invalidateQueueStats.checkRestart();
+      return 0;
+    }
     synchronized(heartbeats) {
       blocksToProcess = (int)(heartbeats.size() 
           * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
@@ -2545,18 +2612,23 @@ public class FSNamesystem implements FSC
           * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
     }
 
-    workFound = computeReplicationWork(blocksToProcess); 
+    replmon.replicateQueueStats.startCycle(blocksToProcess);
+    replicationWorkFound = computeReplicationWork(blocksToProcess);
+    replmon.replicateQueueStats.endCycle(replicationWorkFound);
     
     // Update FSNamesystemMetrics counters
     synchronized (this) {
       pendingReplicationBlocksCount = pendingReplications.size();
       underReplicatedBlocksCount = neededReplications.size();
-      scheduledReplicationBlocksCount = workFound;
+      scheduledReplicationBlocksCount = replicationWorkFound;
       corruptReplicaBlocksCount = corruptReplicas.size();
     }
     
-    workFound += computeInvalidateWork(nodesToProcess);
-    return workFound;
+    replmon.invalidateQueueStats.startCycle(nodesToProcess);
+    invalidationWorkFound = computeInvalidateWork(nodesToProcess);
+    replmon.invalidateQueueStats.endCycle(invalidationWorkFound);
+
+    return replicationWorkFound + invalidationWorkFound;
   }
 
   private int computeInvalidateWork(int nodesToProcess) {
@@ -3102,7 +3174,11 @@ public class FSNamesystem implements FSC
           + " does not belong to any file.");
       addToInvalidates(b, node);
     }
-    NameNode.getNameNodeMetrics().addBlockReport(now() - startTime);
+    long endTime = now();
+    NameNode.getNameNodeMetrics().addBlockReport(endTime - startTime);
+    NameNode.stateChangeLog.info("*BLOCK* NameSystem.processReport: from "
+        + nodeID.getName() + ", blocks: " + newReport.getNumberOfBlocks()
+        + ", processing time: " + (endTime - startTime) + " msecs");
   }
 
   /**
@@ -4301,7 +4377,13 @@ public class FSNamesystem implements FSC
         }
       }
       // verify blocks replications
+      long startTimeMisReplicatedScan = now();
       processMisReplicatedBlocks();
+      NameNode.stateChangeLog.info("STATE* Safe mode termination "
+          + "scan for invalid, over- and under-replicated blocks "
+          + "completed in " + (now() - startTimeMisReplicatedScan)
+          + " msec");
+
       long timeInSafemode = now() - systemStart;
       NameNode.stateChangeLog.info("STATE* Leaving safe mode after " 
                                     + timeInSafemode/1000 + " secs.");

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestQueueProcessingStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestQueueProcessingStatistics.java?rev=1077769&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestQueueProcessingStatistics.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestQueueProcessingStatistics.java Fri Mar  4 04:53:25 2011
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.QueueProcessingStatistics.State;
+
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Standalone unit tests for QueueProcessingStatistics
+ */
+public class TestQueueProcessingStatistics extends junit.framework.TestCase {
+  
+  public static final Log testLog = LogFactory.getLog(
+      TestQueueProcessingStatistics.class.getName() + ".testLog");
+  
+  private UnitQueueProcessingStats qStats = new UnitQueueProcessingStats();
+
+  public enum ExpectedLogResult {NONE, EXPECT_END_FIRST_CYCLE, 
+    EXPECT_END_SOLE_CYCLE, EXPECT_END_LAST_CYCLE, 
+    EXPECT_ERROR, EXPECT_ERROR_WITH_STATUS,}
+  
+  private static final long DEFAULT_CYCLE_DURATION = 25;
+  private static final long DEFAULT_CYCLE_DELAY = 10;
+  
+  //minimum expected duration of each cycle must be >= 0 msec, 
+  //ignored if zero.
+  private long cycleDuration = DEFAULT_CYCLE_DURATION; 
+  //minimum expected rest time after each cycle must be >= 0 msec, 
+  //ignored if zero.
+  private long cycleDelay = DEFAULT_CYCLE_DELAY;
+
+  /**
+   * Runs before each test: restores the default simulated timings, resets
+   * the stats object, and checks that it is back in its starting state.
+   */
+  @Before
+  public void initialize() {
+    cycleDelay = DEFAULT_CYCLE_DELAY;
+    cycleDuration = DEFAULT_CYCLE_DURATION;
+    qStats.initialize();
+    assertExpectedValues(false, State.BEGIN_COLLECTING, 0, 0);
+  }
+
+  /**
+   * Does a set of asserts, appropriate to the current state of qStats.
+   *
+   * The timing asserts model each simulated cycle as taking about
+   * cycleDuration msec, followed by a rest of about cycleDelay msec before
+   * the next cycle starts; they are compared with assertAlmostEquals to
+   * tolerate sleep() jitter.
+   *
+   * Note that any or all of cycleDuration, cycleDelay, and workItemCount 
+   * may be zero, which restricts the assertions we can make about the growth
+   * of the qStats values as cycleCount increases.
+   * 
+   * @param inCycle - true if believed to be currently in the middle of a cycle,
+   *                  false if believed to have ended the most recent cycle.
+   *                  Not checked in state DONE_FIRST_CYCLE.
+   * @param state - expected state (ignored if null)
+   * @param workItemCount - expected count (ignored if null)
+   * @param cycleCount - expected count (ignored if null)
+   */
+  public void assertExpectedValues(
+      boolean inCycle, QueueProcessingStatistics.State state, 
+      Integer workItemCount, Integer cycleCount) {
+    //Check implicit arguments
+    assertTrue(cycleDuration >= 0L);
+    assertTrue(cycleDelay >= 0L);
+    
+    //Asserts based on expected values
+    if (state != null) 
+      assertEquals(failMsg(), state, qStats.state);
+    if (workItemCount != null) 
+      assertEquals(failMsg(), (long) workItemCount, qStats.workItemCount);
+    if (cycleCount != null)
+      assertEquals(failMsg(), (int) cycleCount, qStats.cycleCount);
+    
+    //Asserts based on general principles
+    assertTrue(failMsg(), qStats.startTime >= 0);
+    if (qStats.state != State.BEGIN_COLLECTING) {
+      // The current (or most recent) cycle started after every earlier
+      // completed cycle and its rest period.  Mid-cycle, cycleCount cycles
+      // precede it; between cycles, the most recent one is itself counted
+      // in cycleCount, so one fewer precedes it.
+      assertAlmostEquals(failMsg() + " inCycle=" + inCycle, 
+          qStats.startTimeCurrentCycle, 
+          qStats.startTime 
+          + (cycleDuration + cycleDelay) 
+            * (qStats.cycleCount - (inCycle ? 0 : 1)));
+      // Processing time counts only the time spent inside completed cycles.
+      assertAlmostEquals(failMsg(), 
+          qStats.processDuration, cycleDuration * qStats.cycleCount);
+      // Clock time adds the rests between completed cycles: one fewer rest
+      // than cycles (and none when no cycle has completed).
+      assertAlmostEquals(failMsg(), 
+          qStats.clockDuration, 
+          qStats.processDuration 
+          + cycleDelay * (qStats.cycleCount - (qStats.cycleCount > 0 ? 1 : 0)));
+    }
+    assertTrue(failMsg(), qStats.workItemCount >= 0);
+    assertTrue(failMsg(), qStats.cycleCount >= 0);
+   
+    //Asserts based on state machine State.
+    switch (qStats.state) {
+    case BEGIN_COLLECTING:
+      // Pristine: nothing started, nothing accumulated.
+      assertFalse(failMsg(), inCycle);
+      assertEquals(failMsg(), 0, qStats.startTime);
+      assertEquals(failMsg(), 0, qStats.startTimeCurrentCycle);
+      assertEquals(failMsg(), 0, qStats.processDuration);
+      assertEquals(failMsg(), 0, qStats.clockDuration);
+      assertEquals(failMsg(), 0, qStats.workItemCount);
+      assertEquals(failMsg(), 0, qStats.cycleCount);
+      break;
+    case IN_FIRST_CYCLE:
+    case IN_SOLE_CYCLE:
+      // Inside the first cycle: it started at startTime, and no cycle
+      // has completed yet, so nothing has been accumulated.
+      assertTrue(failMsg(), inCycle);
+      assertTrue(failMsg(), qStats.startTime > 0);
+      assertEquals(failMsg(), qStats.startTime, qStats.startTimeCurrentCycle);
+      assertEquals(failMsg(), 0, qStats.processDuration);
+      assertEquals(failMsg(), 0, qStats.clockDuration);
+      assertEquals(failMsg(), 0, qStats.workItemCount);
+      assertEquals(failMsg(), 0, qStats.cycleCount);
+      break;
+    case DONE_FIRST_CYCLE:
+      //Can't make any assertions about "inCycle".
+      //For other qStats values, the general principles are the strongest
+      //assertions that can be made.
+      assertTrue(failMsg(), qStats.startTime > 0);
+      assertTrue(failMsg(), qStats.cycleCount > 0);
+      break;
+    case IN_LAST_CYCLE:
+      // Inside a later cycle, so at least one cycle has completed.
+      assertTrue(failMsg(), inCycle);
+      assertTrue(failMsg(), qStats.startTime > 0);
+      assertTrue(failMsg(), qStats.cycleCount > 0);
+     break;
+    case DONE_COLLECTING:
+      // Collection ended at the end of a cycle, so we cannot be mid-cycle.
+      assertFalse(failMsg(), inCycle);
+      assertTrue(failMsg(), qStats.startTime > 0);
+      assertTrue(failMsg(), qStats.cycleCount > 0);
+      break;
+    default:
+      fail(failMsg() + " Reached unallowed state");
+      break;
+    }
+  }
+  
+  /** @return a summary of qStats' state, used to annotate assertion failures. */
+  private String failMsg() {
+    StringBuilder summary = new StringBuilder("State=");
+    summary.append(qStats.state).append(" cycleCount=").append(qStats.cycleCount);
+    return summary.toString();
+  }
+  
+  /**
+   * Compares two msec timings, allowing for the imprecision of the sleep()
+   * calls used to simulate cycles.  Each simulated cycle involves two
+   * sleeps (work, then rest), each assumed accurate to within 1 msec.  On
+   * top of that, 1 extra msec is allowed in case two consecutive now()
+   * calls straddle a clock tick.  The simulated durations are kept above
+   * 10 msec so this slack is small relative to them.
+   *
+   * @return - true if |a - b| is less than 2 * (cycleCount + 1) msec
+   */
+  private boolean almostEquals(long a, long b) {
+    long tolerance = 2 * (qStats.cycleCount + 1);
+    return Math.abs(a - b) < tolerance;
+  }
+  
+  /** Fails the test unless {@code a} and {@code b} pass almostEquals(). */
+  private void assertAlmostEquals(long a, long b) {
+    if (almostEquals(a, b)) {
+      return;
+    }
+    fail("Failed almostEquals test: " + a + ", " + b);
+  }
+
+  /** As above, prefixing the failure message with {@code msg}. */
+  private void assertAlmostEquals(String msg, long a, long b) {
+    if (almostEquals(a, b)) {
+      return;
+    }
+    fail(msg + "; Failed almostEquals test: " + a + ", " + b);
+  }
+
+  /**
+   * Concrete subclass of {@link QueueProcessingStatistics} for unit tests.
+   * The termination checks return trigger flags set directly by the test.
+   * Each logging hook asserts that the test expected that particular log
+   * message, then delegates to the real implementation.
+   * Static, since it uses only static members of the enclosing test class.
+   */
+ private static class UnitQueueProcessingStats 
+ extends QueueProcessingStatistics {
+   public boolean triggerPreDetectLastCycle = false;
+   public boolean triggerPostDetectLastCycle = false;
+   public ExpectedLogResult expectedLogResult = ExpectedLogResult.NONE;
+   
+   UnitQueueProcessingStats() {
+     super("UnitTestQueue", "blocks", testLog);
+   }
+
+   /** Also resets the test triggers and the expected log result. */
+   @Override
+   void initialize() {
+     super.initialize();
+     triggerPreDetectLastCycle = false;
+     triggerPostDetectLastCycle = false;
+     expectedLogResult = ExpectedLogResult.NONE;
+   }
+
+   /**
+    * @see org.apache.hadoop.util.QueueProcessingStatistics#preCheckIsLastCycle(int)
+    * @return the test-controlled {@link #triggerPreDetectLastCycle} flag
+    */
+   @Override
+   public boolean preCheckIsLastCycle(int maxWorkToProcess) {
+     return triggerPreDetectLastCycle;
+   }
+
+   /**
+    * @see org.apache.hadoop.util.QueueProcessingStatistics#postCheckIsLastCycle(int)
+    * @return the test-controlled {@link #triggerPostDetectLastCycle} flag
+    */
+   @Override
+   public boolean postCheckIsLastCycle(int workFound) {
+     return triggerPostDetectLastCycle;
+   }
+
+   /** @see org.apache.hadoop.util.QueueProcessingStatistics#logEndFirstCycle() */
+   @Override
+   void logEndFirstCycle() {
+     assertTrue(expectedLogResult == ExpectedLogResult.EXPECT_END_FIRST_CYCLE
+         || expectedLogResult == ExpectedLogResult.EXPECT_END_SOLE_CYCLE);
+     super.logEndFirstCycle();
+   }
+
+   /** @see org.apache.hadoop.util.QueueProcessingStatistics#logEndLastCycle() */
+   @Override
+   void logEndLastCycle() {
+     assertTrue(expectedLogResult == ExpectedLogResult.EXPECT_END_LAST_CYCLE
+         || expectedLogResult == ExpectedLogResult.EXPECT_END_SOLE_CYCLE);
+     super.logEndLastCycle();
+   }
+
+   /** @see org.apache.hadoop.util.QueueProcessingStatistics#logError(String) */
+   @Override
+   void logError(String msg) {
+     assertEquals(ExpectedLogResult.EXPECT_ERROR, expectedLogResult);
+     super.logError(msg);
+   }
+
+   /** @see org.apache.hadoop.util.QueueProcessingStatistics#logErrorWithStats(String) */
+   @Override
+   void logErrorWithStats(String msg) {
+     assertEquals(ExpectedLogResult.EXPECT_ERROR_WITH_STATUS, expectedLogResult);
+     super.logErrorWithStats(msg);
+   }
+ }
+ 
+ /*
+  * Simulate doing a cycle of work
+  * @return - whatever was passed in
+  */
+ private int simulateWork(int work) throws InterruptedException {
+   Thread.sleep(DEFAULT_CYCLE_DURATION);
+   return work;
+ }
+ 
+ /**
+  * Simulates the pause between two cycles by sleeping for
+  * DEFAULT_CYCLE_DELAY msec.
+  */
+ private void simulateIntercycleDelay() throws InterruptedException {
+   Thread.sleep(DEFAULT_CYCLE_DELAY);
+ }
+
+ /**
+  * A single cycle flagged as the last one before it starts: the collector
+  * enters IN_SOLE_CYCLE and ends in DONE_COLLECTING after one cycle.
+  */
+ @Test
+ public void testSingleCyclePreDetect() throws InterruptedException {
+   final int workToDo = 8;
+
+   qStats.checkRestart();
+   assertExpectedValues(false, State.BEGIN_COLLECTING, 0, 0);
+
+   // Make preCheckIsLastCycle() report that this is the only cycle.
+   qStats.triggerPreDetectLastCycle = true;
+   qStats.startCycle(workToDo);
+   assertExpectedValues(true, State.IN_SOLE_CYCLE, 0, 0);
+
+   final int workDone = simulateWork(workToDo);
+   qStats.expectedLogResult = ExpectedLogResult.EXPECT_END_SOLE_CYCLE;
+   qStats.endCycle(workDone);
+   assertExpectedValues(false, State.DONE_COLLECTING, workToDo, 1);
+ }
+ 
+ /**
+  * A single cycle whose "last cycle" status is only detected after its
+  * work is done: the collector starts in IN_FIRST_CYCLE and ends in
+  * DONE_COLLECTING after one cycle.  Also used as the setup step of
+  * testRestartAfter().
+  */
+ @Test
+ public void testSingleCyclePostDetect() throws InterruptedException {
+   final int workToDo = 8;
+   final int maxWorkPerCycle = 10;
+
+   qStats.checkRestart();
+   assertExpectedValues(false, State.BEGIN_COLLECTING, 0, 0);
+
+   qStats.startCycle(maxWorkPerCycle);
+   assertExpectedValues(true, State.IN_FIRST_CYCLE, 0, 0);
+
+   final int workDone = simulateWork(workToDo);
+   // Only now make postCheckIsLastCycle() reveal this was the sole cycle.
+   qStats.triggerPostDetectLastCycle = true;
+   qStats.expectedLogResult = ExpectedLogResult.EXPECT_END_SOLE_CYCLE;
+   qStats.endCycle(workDone);
+   assertExpectedValues(false, State.DONE_COLLECTING, 8, 1);
+ }
+ 
+ /**
+  * Three cycles (10 + 10 + 8 units of work) where the final cycle is
+  * flagged as the last one before it starts.
+  */
+ @Test
+ public void testMultiCyclePreDetect() throws InterruptedException {
+   final int maxWorkPerCycle = 10;
+   int remaining = 28;
+   int done = 0;
+   int found;
+
+   qStats.checkRestart();
+   assertExpectedValues(false, State.BEGIN_COLLECTING, 0, 0);
+
+   // Cycle 1: a full cycle; the end-of-first-cycle log is expected.
+   qStats.startCycle(maxWorkPerCycle);
+   assertExpectedValues(true, State.IN_FIRST_CYCLE, done, 0);
+   found = simulateWork(maxWorkPerCycle);
+   done += found;
+   remaining -= found;
+   qStats.expectedLogResult = ExpectedLogResult.EXPECT_END_FIRST_CYCLE;
+   qStats.endCycle(found);
+   assertExpectedValues(false, State.DONE_FIRST_CYCLE, 10, 1);
+   qStats.expectedLogResult = ExpectedLogResult.NONE;
+   simulateIntercycleDelay();
+
+   // Cycle 2: an intermediate full cycle; no log output is permitted.
+   qStats.startCycle(maxWorkPerCycle);
+   assertExpectedValues(true, State.DONE_FIRST_CYCLE, done, 1);
+   found = simulateWork(maxWorkPerCycle);
+   done += found;
+   remaining -= found;
+   qStats.endCycle(found);
+   assertExpectedValues(false, State.DONE_FIRST_CYCLE, 20, 2);
+   simulateIntercycleDelay();
+
+   // Cycle 3: flagged as last before it starts; drains what is left.
+   qStats.triggerPreDetectLastCycle = true;
+   qStats.startCycle(maxWorkPerCycle);
+   assertExpectedValues(true, State.IN_LAST_CYCLE, done, 2);
+   found = simulateWork(remaining);
+   done += found;
+   qStats.expectedLogResult = ExpectedLogResult.EXPECT_END_LAST_CYCLE;
+   qStats.endCycle(found);
+   assertExpectedValues(false, State.DONE_COLLECTING, 28, 3);
+ }
+ 
+ /**
+  * Three cycles (10 + 10 + 8 units of work) where the final cycle is only
+  * recognized as the last one after its work is done.
+  */
+ @Test
+ public void testMultiCyclePostDetect() throws InterruptedException {
+   final int maxWorkPerCycle = 10;
+   int remaining = 28;
+   int done = 0;
+   int found;
+
+   qStats.checkRestart();
+   assertExpectedValues(false, State.BEGIN_COLLECTING, 0, 0);
+
+   // Cycle 1: a full cycle; the end-of-first-cycle log is expected.
+   qStats.startCycle(maxWorkPerCycle);
+   assertExpectedValues(true, State.IN_FIRST_CYCLE, done, 0);
+   found = simulateWork(maxWorkPerCycle);
+   done += found;
+   remaining -= found;
+   qStats.expectedLogResult = ExpectedLogResult.EXPECT_END_FIRST_CYCLE;
+   qStats.endCycle(found);
+   assertExpectedValues(false, State.DONE_FIRST_CYCLE, 10, 1);
+   qStats.expectedLogResult = ExpectedLogResult.NONE;
+   simulateIntercycleDelay();
+
+   // Cycle 2: an intermediate full cycle; no log output is permitted.
+   qStats.startCycle(maxWorkPerCycle);
+   assertExpectedValues(true, State.DONE_FIRST_CYCLE, done, 1);
+   found = simulateWork(maxWorkPerCycle);
+   done += found;
+   remaining -= found;
+   qStats.endCycle(found);
+   assertExpectedValues(false, State.DONE_FIRST_CYCLE, 20, 2);
+   simulateIntercycleDelay();
+
+   // Cycle 3: starts like a normal cycle; only after draining the rest is
+   // postCheckIsLastCycle() made to report that it was the last one.
+   qStats.startCycle(maxWorkPerCycle);
+   assertExpectedValues(true, State.DONE_FIRST_CYCLE, done, 2);
+   found = simulateWork(remaining);
+   done += found;
+   qStats.triggerPostDetectLastCycle = true;
+   qStats.expectedLogResult = ExpectedLogResult.EXPECT_END_LAST_CYCLE;
+   qStats.endCycle(found);
+   assertExpectedValues(false, State.DONE_COLLECTING, 28, 3);
+ }
+ 
+ /**
+  * Restarting after the first cycle, but before collection is complete,
+  * is expected to log an error with statistics and to reset the collector
+  * to BEGIN_COLLECTING with zeroed counters.
+  */
+ @Test
+ public void testRestartIncycle() throws InterruptedException {
+   final int maxWorkPerCycle = 10;
+   int workDone = 0;
+
+   qStats.checkRestart();
+   assertExpectedValues(false, State.BEGIN_COLLECTING, 0, 0);
+
+   qStats.startCycle(maxWorkPerCycle);
+   assertExpectedValues(true, State.IN_FIRST_CYCLE, workDone, 0);
+   final int workFound = simulateWork(maxWorkPerCycle);
+   workDone += workFound;
+   qStats.expectedLogResult = ExpectedLogResult.EXPECT_END_FIRST_CYCLE;
+   qStats.endCycle(workFound);
+   assertExpectedValues(false, State.DONE_FIRST_CYCLE, workDone, 1);
+
+   // Restart while collection is still in progress.
+   qStats.expectedLogResult = ExpectedLogResult.EXPECT_ERROR_WITH_STATUS;
+   qStats.checkRestart();
+   assertExpectedValues(false, State.BEGIN_COLLECTING, 0, 0);
+ }
+ 
+ /**
+  * Restarting after a completed collection is expected to log a plain
+  * error and to return the collector to BEGIN_COLLECTING.
+  */
+ @Test
+ public void testRestartAfter() throws InterruptedException {
+   // Set up: run one complete single-cycle collection.
+   testSingleCyclePostDetect();
+
+   qStats.expectedLogResult = ExpectedLogResult.EXPECT_ERROR;
+   qStats.checkRestart();
+   assertExpectedValues(false, State.BEGIN_COLLECTING, 0, 0);
+ }
+}