You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:53:25 UTC
svn commit: r1077769 - in
/hadoop/common/branches/branch-0.20-security-patches: ./
src/core/org/apache/hadoop/util/
src/hdfs/org/apache/hadoop/hdfs/server/datanode/
src/hdfs/org/apache/hadoop/hdfs/server/namenode/
src/test/org/apache/hadoop/util/
Author: omalley
Date: Fri Mar 4 04:53:25 2011
New Revision: 1077769
URL: http://svn.apache.org/viewvc?rev=1077769&view=rev
Log:
commit e0e776951c6e639727eefea408d873b0d87d2961
Author: Matt Foley <ma...@yahoo-inc.com>
Date: Thu Dec 16 16:55:49 2010 -0800
HDFS: BZ-4182948. Add statistics logging to Fred for better visibility into
startup time costs. (Matt Foley)
Added:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/QueueProcessingStatistics.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestQueueProcessingStatistics.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
Modified: hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt?rev=1077769&r1=1077768&r2=1077769&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security-patches/CHANGES.txt Fri Mar 4 04:53:25 2011
@@ -4,6 +4,9 @@ Release 0.20.3 - Unreleased
IMPROVEMENTS
+ BZ-4182948. Add statistics logging to Fred for better visibility into
+ startup time costs. (Matt Foley)
+
BUG FIXES
HDFS-955. New implementation of saveNamespace() to avoid loss of edits
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/QueueProcessingStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/QueueProcessingStatistics.java?rev=1077769&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/QueueProcessingStatistics.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/QueueProcessingStatistics.java Fri Mar 4 04:53:25 2011
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.commons.logging.Log;
+
+/**
+ * Hadoop has several work queues, such as
+ * {@link org.apache.hadoop.hdfs.server.namenode.FSNamesystem#neededReplications}
+ * With a properly throttled queue, a worker thread cycles repeatedly,
+ * doing a chunk of work each cycle then resting a bit, until the queue is
+ * empty. This class is intended to collect statistics about the behavior of
+ * such queues and consumers. It reports the amount of work done and
+ * how long it took, for the first cycle after collection starts, and for
+ * the total number of cycles needed to flush the queue. We use a state
+ * machine to detect when the queue has been flushed and then we log the
+ * stats; see {@link State} for enumeration of the states and their meanings.
+ */
+public abstract class QueueProcessingStatistics {
+ //All member variables and methods that would normally be access "private"
+ //are instead package-private so we can subclass for unit testing.
+ State state = State.BEGIN_COLLECTING; //current position in the state machine
+ long startTimeCurrentCycle; //now() at the most recent startCycle()
+ long startTime; //now() at the first startCycle() of this collection
+ long processDuration; //summed (end - start) of every cycle, rests excluded
+ long clockDuration; //latest cycle end minus startTime, rests included
+ long workItemCount; //summed workFound values passed to endCycle()
+ int cycleCount; //number of cycles accumulated by endCycle()
+ //Naming and logging context, fixed by the constructor.
+ String queueName;
+ String workItemsName;
+ Log LOG;
+
+ /**
+ * This enum provides the "states" of a state machine for
+ * {@link QueueProcessingStatistics}.
+ * The meanings of the states are: <ul>
+ * <li> BEGIN_COLLECTING - Ready to begin.
+ * <li> IN_FIRST_CYCLE - Started the first cycle.
+ * <li> IN_SOLE_CYCLE - Still in first cycle, but already know there will be
+ * no further cycles because this one will complete all
+ * needed work. When done, will go straight to
+ * "DONE_COLLECTING".
+ * <li> DONE_FIRST_CYCLE - Done first cycle, doing subsequent cycles.
+ * <li> IN_LAST_CYCLE - Started the last cycle.
+ * <li> DONE_COLLECTING - Done with last cycle, finishing up.
+ * </ul>
+ */
+ public enum State {
+ BEGIN_COLLECTING,
+ IN_FIRST_CYCLE,
+ IN_SOLE_CYCLE,
+ DONE_FIRST_CYCLE,
+ IN_LAST_CYCLE,
+ DONE_COLLECTING,
+ }
+
+ /**
+ * Creates a statistics collector that starts in the BEGIN_COLLECTING state.
+ *
+ * @param queueName - Human-readable name of the queue being monitored,
+ * used as first word in the log messages.
+ * @param workItemsName - what kind of work items are being managed
+ * on the queue? A plural word is best here, for logging.
+ * @param logObject - What log do you want the log messages to be sent to?
+ */
+ public QueueProcessingStatistics(String queueName, String workItemsName,
+ Log logObject) {
+ this.queueName = queueName;
+ this.workItemsName = workItemsName;
+ this.LOG = logObject;
+ }
+ /**
+ * Marks the start of one work cycle. Does nothing once collection has
+ * reached DONE_COLLECTING. Otherwise records the cycle start time and
+ * consults {@link #preCheckIsLastCycle}: from BEGIN_COLLECTING the state
+ * becomes IN_SOLE_CYCLE or IN_FIRST_CYCLE, and from any other state it
+ * becomes IN_LAST_CYCLE when the pre-check reports a last cycle (and is
+ * left unchanged when it does not).
+ *
+ * @param maxWorkToProcess - passed through to {@link #preCheckIsLastCycle}
+ */
+ public void startCycle(int maxWorkToProcess) {
+ //only collect stats for one complete flush of the queue
+ if (state == State.DONE_COLLECTING) return;
+
+ //regardless of state, record the start of this cycle
+ startTimeCurrentCycle = now();
+ boolean preDetectLastCycle = preCheckIsLastCycle(maxWorkToProcess);
+
+ switch (state) {
+ case BEGIN_COLLECTING:
+ startTime = startTimeCurrentCycle;
+ state = preDetectLastCycle ? State.IN_SOLE_CYCLE : State.IN_FIRST_CYCLE;
+ break;
+ default: //every state other than BEGIN_COLLECTING
+ if (preDetectLastCycle)
+ state = State.IN_LAST_CYCLE;
+ break;
+ }
+ }
+ /**
+ * Marks the end of one work cycle. Does nothing once collection has
+ * reached DONE_COLLECTING. Otherwise the cycle's time and work are added
+ * to the running totals before the state is examined, then
+ * {@link #postCheckIsLastCycle} is consulted and the state machine advances,
+ * logging the first-cycle and/or queue-flush stats when those points are
+ * reached. Calling this while still in BEGIN_COLLECTING (no startCycle()
+ * yet) logs an error and ends collection.
+ *
+ * @param workFound - amount of work done in this cycle; added to
+ * workItemCount and passed to {@link #postCheckIsLastCycle}
+ */
+ public void endCycle(int workFound) {
+ //only collect stats for first pass through the queue
+ if (state == State.DONE_COLLECTING) return;
+
+ //regardless of state, record the end of this cycle
+ //and accumulate the cycle's stats
+ long endTimeCurrentCycle = now();
+ processDuration += endTimeCurrentCycle - startTimeCurrentCycle;
+ clockDuration = endTimeCurrentCycle - startTime;
+ workItemCount += workFound;
+ cycleCount++;
+ boolean postDetectLastCycle = postCheckIsLastCycle(workFound);
+
+ switch (state) {
+ case BEGIN_COLLECTING:
+ logError("endCycle() called before startCycle(), "
+ + "exiting stats collection");
+ state = State.DONE_COLLECTING;
+ break;
+ case IN_FIRST_CYCLE:
+ if (postDetectLastCycle) {
+ state = State.IN_SOLE_CYCLE;
+ //and fall through
+ } else {
+ logEndFirstCycle();
+ state = State.DONE_FIRST_CYCLE;
+ break;
+ }
+ case IN_SOLE_CYCLE: //also entered by fall-through from IN_FIRST_CYCLE
+ logEndFirstCycle();
+ logEndLastCycle();
+ state = State.DONE_COLLECTING;
+ break;
+ case DONE_FIRST_CYCLE:
+ if (postDetectLastCycle) {
+ state = State.IN_LAST_CYCLE;
+ //and fall through
+ } else {
+ break; //a middle cycle: totals were updated above, nothing to log
+ }
+ case IN_LAST_CYCLE: //also entered by fall-through from DONE_FIRST_CYCLE
+ logEndLastCycle();
+ state = State.DONE_COLLECTING;
+ break;
+ default:
+ logError("unallowed state reached, exiting stats collection");
+ state = State.DONE_COLLECTING;
+ break;
+ }
+ }
+ /**
+ * Makes the collector ready to measure a new queue flush. A collector
+ * still in BEGIN_COLLECTING is left untouched. In any other state an error
+ * is logged (including the stats gathered so far if collection was still
+ * in progress) and all fields are reset via {@link #initialize()}.
+ */
+ public void checkRestart() {
+ switch (state) {
+ case BEGIN_COLLECTING:
+ //situation normal
+ return;
+ case DONE_COLLECTING:
+ logError("Restarted stats collection after completion of first "
+ + "queue flush.");
+ initialize();
+ break;
+ default:
+ //for all other cases, we are in the middle of stats collection,
+ //so output the stats collected so far before re-initializing
+ logErrorWithStats("Restarted stats collection before completion of "
+ + "first queue flush.");
+ initialize();
+ break;
+ }
+ }
+ /** Resets the state to BEGIN_COLLECTING and zeroes every timestamp and counter. */
+ void initialize() {
+ state = State.BEGIN_COLLECTING;
+ startTimeCurrentCycle = 0;
+ startTime = 0;
+ processDuration = 0;
+ clockDuration = 0;
+ workItemCount = 0;
+ cycleCount = 0;
+ }
+
+ /**
+ * The termination condition is to identify the last cycle that will
+ * empty the queue. Two abstract APIs are called: {@code preCheckIsLastCycle}
+ * is called at the beginning of each cycle, and
+ * {@link #postCheckIsLastCycle} is called at the end of each cycle.
+ * At least one of them must correctly provide the termination
+ * condition. The other may always return 'false'. If either of them
+ * returns 'true' in a given cycle, then at the end of that cycle the
+ * stats will be output to log, and stats collection will end.
+ *
+ * @param maxWorkToProcess - if this number is greater than the amount
+ * of work remaining at the start of a cycle, then it will
+ * be the last cycle.
+ * @return - true if last cycle detected, else false
+ */
+ public abstract boolean preCheckIsLastCycle(int maxWorkToProcess);
+
+ /**
+ * See {@link #preCheckIsLastCycle}.
+ * @param workFound - may not be useful
+ * @return - true if remaining work is zero at end of cycle,
+ * else false
+ */
+ public abstract boolean postCheckIsLastCycle(int workFound);
+ /** Builds the message reported when the first cycle has ended. */
+ String msgEndFirstCycle() {
+ return queueName + " QueueProcessingStatistics: First cycle completed "
+ + workItemCount + " " + workItemsName + " in " + processDuration
+ + " msec";
+ }
+ /** Logs the first-cycle message at info level. */
+ void logEndFirstCycle() {
+ LOG.info(msgEndFirstCycle());
+ }
+ /** Builds the message reported when the whole queue flush has ended. */
+ String msgEndLastCycle() {
+ return queueName
+ + " QueueProcessingStatistics: Queue flush completed "
+ + workItemCount + " " + workItemsName + " in "
+ + processDuration + " msec processing time, "
+ + clockDuration + " msec clock time, "
+ + cycleCount + " cycles";
+ }
+ /** Logs the queue-flush message at info level. */
+ void logEndLastCycle() {
+ LOG.info(msgEndLastCycle());
+ }
+ /** Builds an error message that carries only the queue name and msg. */
+ String msgError(String msg) {
+ return queueName
+ + " QueueProcessingStatistics - Error: " + msg;
+ }
+ /** Logs the plain error message at error level. */
+ void logError(String msg) {
+ LOG.error(msgError(msg));
+ }
+ /** Builds an error message that also reports the stats gathered so far. */
+ String msgErrorWithStats(String msg) {
+ return queueName
+ + " QueueProcessingStatistics - Error: " + msg
+ + " Completed " + workItemCount + " " + workItemsName + " in "
+ + processDuration + " msec processing time, "
+ + clockDuration + " msec clock time, "
+ + cycleCount + " cycles";
+ }
+ /** Logs the error-with-stats message at error level. */
+ void logErrorWithStats(String msg) {
+ LOG.error(msgErrorWithStats(msg));
+ }
+
+ /**
+ * Current system time.
+ * @return current time in msec.
+ */
+ static long now() {
+ return System.currentTimeMillis();
+ }
+
+}
+
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1077769&r1=1077768&r2=1077769&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Mar 4 04:53:25 2011
@@ -893,21 +893,26 @@ public class DataNode extends Configured
}
}
- // send block report
+ // Send latest blockinfo report if timer has expired.
if (startTime - lastBlockReport > blockReportInterval) {
- //
- // Send latest blockinfo report if timer has expired.
- // Get back a list of local block(s) that are obsolete
- // and can be safely GC'ed.
- //
- long brStartTime = now();
+
+ // Create block report
+ long brCreateStartTime = now();
Block[] bReport = data.getBlockReport();
+
+ // Send block report
+ long brSendStartTime = now();
DatanodeCommand cmd = namenode.blockReport(dnRegistration,
BlockListAsLongs.convertToArrayLongs(bReport));
- long brTime = now() - brStartTime;
- myMetrics.addBlockReport(brTime);
- LOG.info("BlockReport of " + bReport.length +
- " blocks got processed in " + brTime + " msecs");
+
+ // Log the block report processing stats from Datanode perspective
+ long brSendCost = now() - brSendStartTime;
+ long brCreateCost = brSendStartTime - brCreateStartTime;
+ myMetrics.addBlockReport(brSendCost);
+ LOG.info("BlockReport of " + bReport.length
+ + " blocks took " + brCreateCost + " msec to generate and "
+ + brSendCost + " msecs for RPC and NN processing");
+
//
// If we have sent the first block report, then wait a random
// time before we start the periodic block reports.
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1077769&r1=1077768&r2=1077769&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Mar 4 04:53:25 2011
@@ -115,6 +115,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.util.QueueProcessingStatistics;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
@@ -264,12 +265,12 @@ public class FSNamesystem implements FSC
*/
ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
- //
- // Store set of Blocks that need to be replicated 1 or more times.
- // We also store pending replication-orders.
- // Set of: Block
- //
+/**
+ * Store set of Blocks that need to be replicated 1 or more times.
+ * Set of: Block
+ */
private UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
+ // We also store pending replication-orders.
private PendingReplicationBlocks pendingReplications;
public LeaseManager leaseManager = new LeaseManager(this);
@@ -282,6 +283,7 @@ public class FSNamesystem implements FSC
public Daemon lmthread = null; // LeaseMonitor thread
Daemon smmthread = null; // SafeModeMonitor thread
public Daemon replthread = null; // Replication thread
+ private ReplicationMonitor replmon = null; // Replication metrics
private volatile boolean fsRunning = true;
long systemStart = 0;
@@ -390,7 +392,8 @@ public class FSNamesystem implements FSC
}
this.hbthread = new Daemon(new HeartbeatMonitor());
this.lmthread = new Daemon(leaseManager.new Monitor());
- this.replthread = new Daemon(new ReplicationMonitor());
+ this.replmon = new ReplicationMonitor();
+ this.replthread = new Daemon(replmon);
hbthread.start();
lmthread.start();
replthread.start();
@@ -561,6 +564,7 @@ public class FSNamesystem implements FSC
if (pendingReplications != null) pendingReplications.stop();
if (hbthread != null) hbthread.interrupt();
if (replthread != null) replthread.interrupt();
+ if (replmon != null) replmon = null;
if (dnthread != null) dnthread.interrupt();
if (smmthread != null) smmthread.interrupt();
if (dtSecretManager != null) dtSecretManager.stopThreads();
@@ -2499,6 +2503,11 @@ public class FSNamesystem implements FSC
class ReplicationMonitor implements Runnable {
static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
+ ReplicateQueueProcessingStats replicateQueueStats =
+ new ReplicateQueueProcessingStats();
+ InvalidateQueueProcessingStats invalidateQueueStats =
+ new InvalidateQueueProcessingStats();
+
public void run() {
while (fsRunning) {
try {
@@ -2516,8 +2525,61 @@ public class FSNamesystem implements FSC
}
}
}
- }
+ /**
+ * Just before exiting safe mode, {@link SafeModeInfo#leave} scans all
+ * blocks and identifies under-replicated and invalid blocks. These are
+ * placed in work queues. Immediately after leaving safe mode,
+ * {@link ReplicationMonitor} starts processing these queues via calls to
+ * {@link #computeDatanodeWork()}. Each call does a chunk of work, then
+ * waits 3 seconds before doing the next chunk of work, to avoid monopolizing
+ * the CPUs and the global lock. It may take several cycles before the
+ * queue is completely flushed.
+ *
+ * Here we use two concrete subclasses of {@link QueueProcessingStatistics}
+ * to collect stats about these processes.
+ */
+ private class ReplicateQueueProcessingStats
+ extends QueueProcessingStatistics {
+ //The last cycle is detected up front by the pre-check; the post-check never fires.
+ ReplicateQueueProcessingStats() {
+ super("ReplicateQueue", "blocks", FSNamesystem.LOG);
+ }
+
+ // @see org.apache.hadoop.util.QueueProcessingStatistics#preCheckIsLastCycle(int)
+ @Override
+ public boolean preCheckIsLastCycle(int maxWorkToProcess) {
+ return (maxWorkToProcess >= neededReplications.size());
+ }
+
+ // @see org.apache.hadoop.util.QueueProcessingStatistics#postCheckIsLastCycle(int)
+ @Override
+ public boolean postCheckIsLastCycle(int workFound) {
+ return false;
+ }
+ }
+
+ private class InvalidateQueueProcessingStats
+ extends QueueProcessingStatistics {
+ //The last cycle is detected afterwards by the post-check; the pre-check never fires.
+ InvalidateQueueProcessingStats() {
+ super("InvalidateQueue", "blocks", FSNamesystem.LOG);
+ }
+
+ // @see org.apache.hadoop.util.QueueProcessingStatistics#preCheckIsLastCycle(int)
+ @Override
+ public boolean preCheckIsLastCycle(int maxWorkToProcess) {
+ return false;
+ }
+
+ // @see org.apache.hadoop.util.QueueProcessingStatistics#postCheckIsLastCycle(int)
+ @Override
+ public boolean postCheckIsLastCycle(int workFound) {
+ return recentInvalidateSets.isEmpty();
+ }
+ }
+ }
+
/////////////////////////////////////////////////////////
//
// These methods are called by the Namenode system, to see
@@ -2532,12 +2594,17 @@ public class FSNamesystem implements FSC
* @return number of blocks scheduled for replication or removal.
*/
public int computeDatanodeWork() throws IOException {
- int workFound = 0;
+ int replicationWorkFound = 0;
+ int invalidationWorkFound = 0;
int blocksToProcess = 0;
int nodesToProcess = 0;
+
// blocks should not be replicated or removed if safe mode is on
- if (isInSafeMode())
- return workFound;
+ if (isInSafeMode()) {
+ replmon.replicateQueueStats.checkRestart();
+ replmon.invalidateQueueStats.checkRestart();
+ return 0;
+ }
synchronized(heartbeats) {
blocksToProcess = (int)(heartbeats.size()
* ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
@@ -2545,18 +2612,23 @@ public class FSNamesystem implements FSC
* ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
}
- workFound = computeReplicationWork(blocksToProcess);
+ replmon.replicateQueueStats.startCycle(blocksToProcess);
+ replicationWorkFound = computeReplicationWork(blocksToProcess);
+ replmon.replicateQueueStats.endCycle(replicationWorkFound);
// Update FSNamesystemMetrics counters
synchronized (this) {
pendingReplicationBlocksCount = pendingReplications.size();
underReplicatedBlocksCount = neededReplications.size();
- scheduledReplicationBlocksCount = workFound;
+ scheduledReplicationBlocksCount = replicationWorkFound;
corruptReplicaBlocksCount = corruptReplicas.size();
}
- workFound += computeInvalidateWork(nodesToProcess);
- return workFound;
+ replmon.invalidateQueueStats.startCycle(nodesToProcess);
+ invalidationWorkFound = computeInvalidateWork(nodesToProcess);
+ replmon.invalidateQueueStats.endCycle(invalidationWorkFound);
+
+ return replicationWorkFound + invalidationWorkFound;
}
private int computeInvalidateWork(int nodesToProcess) {
@@ -3102,7 +3174,11 @@ public class FSNamesystem implements FSC
+ " does not belong to any file.");
addToInvalidates(b, node);
}
- NameNode.getNameNodeMetrics().addBlockReport(now() - startTime);
+ long endTime = now();
+ NameNode.getNameNodeMetrics().addBlockReport(endTime - startTime);
+ NameNode.stateChangeLog.info("*BLOCK* NameSystem.processReport: from "
+ + nodeID.getName() + ", blocks: " + newReport.getNumberOfBlocks()
+ + ", processing time: " + (endTime - startTime) + " msecs");
}
/**
@@ -4301,7 +4377,13 @@ public class FSNamesystem implements FSC
}
}
// verify blocks replications
+ long startTimeMisReplicatedScan = now();
processMisReplicatedBlocks();
+ NameNode.stateChangeLog.info("STATE* Safe mode termination "
+ + "scan for invalid, over- and under-replicated blocks "
+ + "completed in " + (now() - startTimeMisReplicatedScan)
+ + " msec");
+
long timeInSafemode = now() - systemStart;
NameNode.stateChangeLog.info("STATE* Leaving safe mode after "
+ timeInSafemode/1000 + " secs.");
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestQueueProcessingStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestQueueProcessingStatistics.java?rev=1077769&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestQueueProcessingStatistics.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestQueueProcessingStatistics.java Fri Mar 4 04:53:25 2011
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.QueueProcessingStatistics.State;
+
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Standalone unit tests for QueueProcessingStatistics
+ */
+public class TestQueueProcessingStatistics extends junit.framework.TestCase {
+
+ public static final Log testLog = LogFactory.getLog(
+ TestQueueProcessingStatistics.class.getName() + ".testLog");
+
+ private UnitQueueProcessingStats qStats = new UnitQueueProcessingStats();
+
+ public enum ExpectedLogResult {NONE, EXPECT_END_FIRST_CYCLE,
+ EXPECT_END_SOLE_CYCLE, EXPECT_END_LAST_CYCLE,
+ EXPECT_ERROR, EXPECT_ERROR_WITH_STATUS,}
+
+ private static final long DEFAULT_CYCLE_DURATION = 25;
+ private static final long DEFAULT_CYCLE_DELAY = 10;
+
+ //minimum expected duration of each cycle must be >= 0 msec,
+ //ignored if zero.
+ private long cycleDuration = DEFAULT_CYCLE_DURATION;
+ //minimum expected rest time after each cycle must be >= 0 msec,
+ //ignored if zero.
+ private long cycleDelay = DEFAULT_CYCLE_DELAY;
+
+ @Before
+ public void initialize() {
+ qStats.initialize();
+ cycleDuration = DEFAULT_CYCLE_DURATION;
+ cycleDelay = DEFAULT_CYCLE_DELAY;
+ assertExpectedValues(false, State.BEGIN_COLLECTING, 0, 0);
+ }
+
+ /**
+ * Does a set of asserts, appropriate to the current state of qStats.
+ *
+ * Note that any or all of cycleDuration, cycleDelay, and workItemCount
+ * may be zero, which restricts the assertions we can make about the growth
+ * of the qStats values as cycleCount increases.
+ *
+ * @param inCycle - true if believed to be currently in the middle of a cycle,
+ * false if believed to have ended the most recent cycle.
+ * @param state - expected state (ignored if null)
+ * @param workItemCount - expected count (ignored if null)
+ * @param cycleCount - expected count (ignored if null)
+ */
+ public void assertExpectedValues(
+ boolean inCycle, QueueProcessingStatistics.State state,
+ Integer workItemCount, Integer cycleCount) {
+ //Check implicit arguments
+ assertTrue(cycleDuration >= 0L);
+ assertTrue(cycleDelay >= 0L);
+
+ //Asserts based on expected values
+ if (state != null)
+ assertEquals(failMsg(), state, qStats.state);
+ if (workItemCount != null)
+ assertEquals(failMsg(), (long) workItemCount, qStats.workItemCount);
+ if (cycleCount != null)
+ assertEquals(failMsg(), (int) cycleCount, qStats.cycleCount);
+
+ //Asserts based on general principles
+ assertTrue(failMsg(), qStats.startTime >= 0);
+ if (qStats.state != State.BEGIN_COLLECTING) {
+ assertAlmostEquals(failMsg() + " inCycle=" + inCycle,
+ qStats.startTimeCurrentCycle,
+ qStats.startTime
+ + (cycleDuration + cycleDelay)
+ * (qStats.cycleCount - (inCycle ? 0 : 1)));
+ assertAlmostEquals(failMsg(),
+ qStats.processDuration, cycleDuration * qStats.cycleCount);
+ assertAlmostEquals(failMsg(),
+ qStats.clockDuration,
+ qStats.processDuration
+ + cycleDelay * (qStats.cycleCount - (qStats.cycleCount > 0 ? 1 : 0)));
+ }
+ assertTrue(failMsg(), qStats.workItemCount >= 0);
+ assertTrue(failMsg(), qStats.cycleCount >= 0);
+
+ //Asserts based on state machine State.
+ switch (qStats.state) {
+ case BEGIN_COLLECTING:
+ assertFalse(failMsg(), inCycle);
+ assertEquals(failMsg(), 0, qStats.startTime);
+ assertEquals(failMsg(), 0, qStats.startTimeCurrentCycle);
+ assertEquals(failMsg(), 0, qStats.processDuration);
+ assertEquals(failMsg(), 0, qStats.clockDuration);
+ assertEquals(failMsg(), 0, qStats.workItemCount);
+ assertEquals(failMsg(), 0, qStats.cycleCount);
+ break;
+ case IN_FIRST_CYCLE:
+ case IN_SOLE_CYCLE:
+ assertTrue(failMsg(), inCycle);
+ assertTrue(failMsg(), qStats.startTime > 0);
+ assertEquals(failMsg(), qStats.startTime, qStats.startTimeCurrentCycle);
+ assertEquals(failMsg(), 0, qStats.processDuration);
+ assertEquals(failMsg(), 0, qStats.clockDuration);
+ assertEquals(failMsg(), 0, qStats.workItemCount);
+ assertEquals(failMsg(), 0, qStats.cycleCount);
+ break;
+ case DONE_FIRST_CYCLE:
+ //Can't make any assertions about "inCycle".
+ //For other qStats values, the general principles are the strongest
+ //assertions that can be made.
+ assertTrue(failMsg(), qStats.startTime > 0);
+ assertTrue(failMsg(), qStats.cycleCount > 0);
+ break;
+ case IN_LAST_CYCLE:
+ assertTrue(failMsg(), inCycle);
+ assertTrue(failMsg(), qStats.startTime > 0);
+ assertTrue(failMsg(), qStats.cycleCount > 0);
+ break;
+ case DONE_COLLECTING:
+ assertFalse(failMsg(), inCycle);
+ assertTrue(failMsg(), qStats.startTime > 0);
+ assertTrue(failMsg(), qStats.cycleCount > 0);
+ break;
+ default:
+ fail(failMsg() + " Reached unallowed state");
+ break;
+ }
+ }
+
+ private String failMsg() {
+ return "State=" + qStats.state + " cycleCount=" + qStats.cycleCount;
+ }
+
+ /**
+ * The cycleDuration and cycleDelay are simulated by sleeping for those
+ * numbers of milliseconds. Since sleep() is inexact, we'll assume it may
+ * vary by +/- 1 msec per call. Since there are two calls per
+ * virtual cycle, the potential error is twice that. And we add 1 to
+ * account for sheer paranoia and the possibility of two consecutive
+ * calls to "now()" occurring across a tick boundary.
+ * We'll use values > 10 for cycleDuration and cycleDelay,
+ * so the error isn't huge, percentage-wise.
+ *
+ * @return - whether the difference between inputs a and b is within
+ * that expected error
+ */
+ private boolean almostEquals(long a, long b) {
+ long diff = a - b;
+ if (diff < 0) diff = -diff;
+ return diff < 2 * (qStats.cycleCount + 1);
+ }
+
+ private void assertAlmostEquals(long a, long b) {
+ if (!almostEquals(a, b))
+ fail("Failed almostEquals test: " + a + ", " + b);
+ }
+
+ private void assertAlmostEquals(String msg, long a, long b) {
+ if (!almostEquals(a, b))
+ fail(msg + "; Failed almostEquals test: " + a + ", " + b);
+ }
+
+ /*
+ * Concrete subclasses of {@link QueueProcessingStatistics} for unit test
+ *
+ * The two last-cycle checks simply return the trigger flags, which a test
+ * sets by hand. Each log override first asserts that the test announced,
+ * through expectedLogResult, that this kind of log output is expected, and
+ * only then delegates to the superclass.
+ */
+ private class UnitQueueProcessingStats
+ extends QueueProcessingStatistics {
+ public boolean triggerPreDetectLastCycle = false;
+ public boolean triggerPostDetectLastCycle = false;
+ public ExpectedLogResult expectedLogResult = ExpectedLogResult.NONE;
+
+ UnitQueueProcessingStats() {
+ super("UnitTestQueue", "blocks", testLog);
+ }
+
+ @Override
+ void initialize() {
+ super.initialize();
+ triggerPreDetectLastCycle = false;
+ triggerPostDetectLastCycle = false;
+ expectedLogResult = ExpectedLogResult.NONE;
+ }
+
+ // @see org.apache.hadoop.util.QueueProcessingStatistics#preCheckIsLastCycle(int)
+ @Override
+ public boolean preCheckIsLastCycle(int maxWorkToProcess) {
+ return triggerPreDetectLastCycle;
+ }
+
+ // @see org.apache.hadoop.util.QueueProcessingStatistics#postCheckIsLastCycle(int)
+ @Override
+ public boolean postCheckIsLastCycle(int workFound) {
+ return triggerPostDetectLastCycle;
+ }
+
+ // @see org.apache.hadoop.util.QueueProcessingStatistics#logEndFirstCycle()
+ @Override
+ void logEndFirstCycle() {
+ assertTrue(expectedLogResult == ExpectedLogResult.EXPECT_END_FIRST_CYCLE
+ || expectedLogResult == ExpectedLogResult.EXPECT_END_SOLE_CYCLE);
+ super.logEndFirstCycle();
+ }
+
+ // @see org.apache.hadoop.util.QueueProcessingStatistics#logEndLastCycle()
+ @Override
+ void logEndLastCycle() {
+ assertTrue(expectedLogResult == ExpectedLogResult.EXPECT_END_LAST_CYCLE
+ || expectedLogResult == ExpectedLogResult.EXPECT_END_SOLE_CYCLE);
+ super.logEndLastCycle();
+ }
+
+ // @see org.apache.hadoop.util.QueueProcessingStatistics#logError(String)
+ @Override
+ void logError(String msg) {
+ assertEquals(ExpectedLogResult.EXPECT_ERROR, expectedLogResult);
+ super.logError(msg);
+ }
+
+ // @see org.apache.hadoop.util.QueueProcessingStatistics#logErrorWithStats(String)
+ @Override
+ void logErrorWithStats(String msg) {
+ assertEquals(ExpectedLogResult.EXPECT_ERROR_WITH_STATUS, expectedLogResult);
+ super.logErrorWithStats(msg);
+ }
+ }
+
+ /*
+ * Simulate doing a cycle of work by sleeping for the currently configured
+ * cycleDuration. assertExpectedValues() derives its expected timings from
+ * that same field, so sleeping for the field (rather than for the
+ * DEFAULT_CYCLE_DURATION constant) keeps the simulation and the assertions
+ * in step if a test ever changes the duration.
+ * @param work - amount of work to pretend was done
+ * @return - whatever was passed in
+ * @throws InterruptedException if the sleep is interrupted
+ */
+ private int simulateWork(int work) throws InterruptedException {
+ Thread.sleep(cycleDuration);
+ return work;
+ }
+
+ /*
+ * Simulate the inter-cycle delay by ... delaying! Sleeps for the currently
+ * configured cycleDelay, the same field assertExpectedValues() uses to
+ * compute the expected clock time, instead of the DEFAULT_CYCLE_DELAY
+ * constant.
+ * @throws InterruptedException if the sleep is interrupted
+ */
+ private void simulateIntercycleDelay() throws InterruptedException {
+ Thread.sleep(cycleDelay);
+ }
+
+ /*
+ * A queue that is flushed in a single cycle, where the last cycle is
+ * recognized by the pre-check at the start of that cycle.
+ */
+ @Test
+ public void testSingleCyclePreDetect() throws InterruptedException {
+ int workToDo = 8;
+ int maxWorkPerCycle = 10;
+ int workDone = 0;
+
+ qStats.checkRestart();
+ assertExpectedValues(false, State.BEGIN_COLLECTING, 0, 0);
+
+ //The pre-check fires inside startCycle(), so the collector goes straight
+ //to IN_SOLE_CYCLE. As in the other tests, startCycle() is given the
+ //per-cycle maximum rather than the amount of work outstanding.
+ qStats.triggerPreDetectLastCycle = true;
+ qStats.startCycle(maxWorkPerCycle);
+ assertExpectedValues(true, State.IN_SOLE_CYCLE, 0, 0);
+
+ //A sole cycle reports both the first-cycle and the queue-flush stats.
+ workDone = simulateWork(workToDo);
+ qStats.expectedLogResult = ExpectedLogResult.EXPECT_END_SOLE_CYCLE;
+ qStats.endCycle(workDone);
+ assertExpectedValues(false, State.DONE_COLLECTING, 8, 1);
+ }
+
+ /*
+ * Single-cycle run in which the cycle starts as an ordinary first cycle
+ * and triggerPostDetectLastCycle is only set after the work has been
+ * simulated, just before endCycle() is called.
+ * Also reused by testRestartAfter() to reach DONE_COLLECTING.
+ */
+ @Test
+ public void testSingleCyclePostDetect() throws InterruptedException {
+   final int cycleCapacity = 10;
+   final int availableWork = 8;
+
+   qStats.checkRestart();
+   assertExpectedValues(false, State.BEGIN_COLLECTING, 0, 0);
+
+   // started with the full per-cycle capacity, not the smaller actual work
+   qStats.startCycle(cycleCapacity);
+   assertExpectedValues(true, State.IN_FIRST_CYCLE, 0, 0);
+
+   final int processed = simulateWork(availableWork);
+   qStats.expectedLogResult = ExpectedLogResult.EXPECT_END_SOLE_CYCLE;
+   qStats.triggerPostDetectLastCycle = true;
+   qStats.endCycle(processed);
+   assertExpectedValues(false, State.DONE_COLLECTING, 8, 1);
+ }
+
+ /*
+ * Three-cycle run (10 + 10 + 8 units of work) in which
+ * triggerPreDetectLastCycle is set before the third startCycle(), so the
+ * third cycle is expected to be in IN_LAST_CYCLE from its start.
+ */
+ @Test
+ public void testMultiCyclePreDetect() throws InterruptedException {
+   final int cycleCapacity = 10;
+   int remaining = 28;
+   int processed = 0;
+
+   qStats.checkRestart();
+   assertExpectedValues(false, State.BEGIN_COLLECTING, 0, 0);
+
+   // cycle 1: full cycle; the end-of-first-cycle log is the expected outcome
+   qStats.startCycle(cycleCapacity);
+   assertExpectedValues(true, State.IN_FIRST_CYCLE, processed, 0);
+   int found = simulateWork(cycleCapacity);
+   remaining -= found;
+   processed += found;
+   qStats.expectedLogResult = ExpectedLogResult.EXPECT_END_FIRST_CYCLE;
+   qStats.endCycle(found);
+   assertExpectedValues(false, State.DONE_FIRST_CYCLE, 10, 1);
+   qStats.expectedLogResult = ExpectedLogResult.NONE;
+   simulateIntercycleDelay();
+
+   // cycle 2: another full cycle, run while the expectation is NONE
+   qStats.startCycle(cycleCapacity);
+   assertExpectedValues(true, State.DONE_FIRST_CYCLE, processed, 1);
+   found = simulateWork(cycleCapacity);
+   remaining -= found;
+   processed += found;
+   qStats.endCycle(found);
+   assertExpectedValues(false, State.DONE_FIRST_CYCLE, 20, 2);
+   simulateIntercycleDelay();
+
+   // cycle 3: flagged as last before it starts; only the leftover work is done
+   qStats.triggerPreDetectLastCycle = true;
+   qStats.startCycle(cycleCapacity);
+   assertExpectedValues(true, State.IN_LAST_CYCLE, processed, 2);
+   found = simulateWork(remaining);
+   remaining -= found;
+   processed += found;
+   qStats.expectedLogResult = ExpectedLogResult.EXPECT_END_LAST_CYCLE;
+   qStats.endCycle(found);
+   assertExpectedValues(false, State.DONE_COLLECTING, 28, 3);
+ }
+
+ /*
+ * Three-cycle run (10 + 10 + 8 units of work) in which the third cycle
+ * starts like any other and triggerPostDetectLastCycle is only set after
+ * its work has been simulated, just before endCycle() is called.
+ */
+ @Test
+ public void testMultiCyclePostDetect() throws InterruptedException {
+   final int cycleCapacity = 10;
+   int remaining = 28;
+   int processed = 0;
+
+   qStats.checkRestart();
+   assertExpectedValues(false, State.BEGIN_COLLECTING, 0, 0);
+
+   // cycle 1: full cycle; the end-of-first-cycle log is the expected outcome
+   qStats.startCycle(cycleCapacity);
+   assertExpectedValues(true, State.IN_FIRST_CYCLE, processed, 0);
+   int found = simulateWork(cycleCapacity);
+   remaining -= found;
+   processed += found;
+   qStats.expectedLogResult = ExpectedLogResult.EXPECT_END_FIRST_CYCLE;
+   qStats.endCycle(found);
+   assertExpectedValues(false, State.DONE_FIRST_CYCLE, 10, 1);
+   qStats.expectedLogResult = ExpectedLogResult.NONE;
+   simulateIntercycleDelay();
+
+   // cycle 2: another full cycle, run while the expectation is NONE
+   qStats.startCycle(cycleCapacity);
+   assertExpectedValues(true, State.DONE_FIRST_CYCLE, processed, 1);
+   found = simulateWork(cycleCapacity);
+   remaining -= found;
+   processed += found;
+   qStats.endCycle(found);
+   assertExpectedValues(false, State.DONE_FIRST_CYCLE, 20, 2);
+   simulateIntercycleDelay();
+
+   // cycle 3: starts unflagged; the last-cycle trigger is set before endCycle()
+   qStats.startCycle(cycleCapacity);
+   assertExpectedValues(true, State.DONE_FIRST_CYCLE, processed, 2);
+   found = simulateWork(remaining);
+   remaining -= found;
+   processed += found;
+   qStats.expectedLogResult = ExpectedLogResult.EXPECT_END_LAST_CYCLE;
+   qStats.triggerPostDetectLastCycle = true;
+   qStats.endCycle(found);
+   assertExpectedValues(false, State.DONE_COLLECTING, 28, 3);
+ }
+
+ /*
+ * Calls checkRestart() after the first cycle has ended but before
+ * collection has completed (state DONE_FIRST_CYCLE). The restart is
+ * expected to go through logErrorWithStats() and to put the statistics
+ * back into BEGIN_COLLECTING with zeroed values.
+ */
+ @Test
+ public void testRestartIncycle() throws InterruptedException {
+   int maxWorkPerCycle = 10;
+
+   qStats.checkRestart();
+   assertExpectedValues(false, State.BEGIN_COLLECTING, 0, 0);
+
+   // run one complete first cycle
+   qStats.startCycle(maxWorkPerCycle);
+   assertExpectedValues(true, State.IN_FIRST_CYCLE, 0, 0);
+   int workFound = simulateWork(maxWorkPerCycle);
+   qStats.expectedLogResult = ExpectedLogResult.EXPECT_END_FIRST_CYCLE;
+   qStats.endCycle(workFound);
+   assertExpectedValues(false, State.DONE_FIRST_CYCLE, 10, 1);
+
+   // restart while collection is still unfinished
+   qStats.expectedLogResult = ExpectedLogResult.EXPECT_ERROR_WITH_STATUS;
+   qStats.checkRestart();
+   assertExpectedValues(false, State.BEGIN_COLLECTING, 0, 0);
+ }
+
+ /*
+ * Calls checkRestart() after a complete single-cycle collection. The
+ * restart is expected to go through logError() and to put the statistics
+ * back into BEGIN_COLLECTING with zeroed values.
+ */
+ @Test
+ public void testRestartAfter() throws InterruptedException {
+   // reuse the single-cycle scenario to reach DONE_COLLECTING
+   testSingleCyclePostDetect();
+
+   qStats.expectedLogResult = ExpectedLogResult.EXPECT_ERROR;
+   qStats.checkRestart();
+   assertExpectedValues(false, State.BEGIN_COLLECTING, 0, 0);
+ }
+}