Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2008/11/25 19:05:12 UTC
svn commit: r720555 - in /hadoop/core/branches/branch-0.18: ./ conf/
src/core/org/apache/hadoop/util/ src/hdfs/org/apache/hadoop/dfs/
src/test/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/util/
Author: szetszwo
Date: Tue Nov 25 10:05:11 2008
New Revision: 720555
URL: http://svn.apache.org/viewvc?rev=720555&view=rev
Log:
HADOOP-4061. Throttle Datanode decommission monitoring in Namenode. (szetszwo)
Added:
hadoop/core/branches/branch-0.18/src/core/org/apache/hadoop/util/CyclicIteration.java
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DecommissionManager.java
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/util/TestCyclicIteration.java
Modified:
hadoop/core/branches/branch-0.18/CHANGES.txt
hadoop/core/branches/branch-0.18/conf/hadoop-default.xml
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/LeaseManager.java
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestDecommission.java
Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=720555&r1=720554&r2=720555&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Tue Nov 25 10:05:11 2008
@@ -37,6 +37,9 @@
HADOOP-4616. Fuse-dfs can handle bad values from FileSystem.read call.
(Pete Wyckoff via dhruba)
+ HADOOP-4061. Throttle Datanode decommission monitoring in Namenode.
+ (szetszwo)
+
Release 0.18.2 - 2008-11-03
BUG FIXES
Modified: hadoop/core/branches/branch-0.18/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/conf/hadoop-default.xml?rev=720555&r1=720554&r2=720555&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/conf/hadoop-default.xml (original)
+++ hadoop/core/branches/branch-0.18/conf/hadoop-default.xml Tue Nov 25 10:05:11 2008
@@ -544,11 +544,18 @@
<property>
<name>dfs.namenode.decommission.interval</name>
- <value>300</value>
+ <value>30</value>
<description>How often, in seconds, the namenode checks whether decommission is complete.</description>
</property>
<property>
+ <name>dfs.namenode.decommission.nodes.per.interval</name>
+ <value>5</value>
+ <description>The number of nodes the namenode checks for decommission completion
+ in each dfs.namenode.decommission.interval.</description>
+</property>
+
+<property>
<name>dfs.replication.interval</name>
<value>3</value>
<description>The periodicity in seconds with which the namenode computes replication work for datanodes. </description>
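For a sense of scale (an illustrative calculation, not taken from the commit): with these defaults the monitor wakes every 30 seconds and checks at most 5 decommission-in-progress nodes per wakeup, so if 1,000 nodes were decommissioning at once, a complete pass would take about (1000 / 5) x 30 s = 6,000 s, i.e. 100 minutes. The previous behavior checked every datanode in the cluster on each 300-second interval.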
Added: hadoop/core/branches/branch-0.18/src/core/org/apache/hadoop/util/CyclicIteration.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/core/org/apache/hadoop/util/CyclicIteration.java?rev=720555&view=auto
==============================================================================
--- hadoop/core/branches/branch-0.18/src/core/org/apache/hadoop/util/CyclicIteration.java (added)
+++ hadoop/core/branches/branch-0.18/src/core/org/apache/hadoop/util/CyclicIteration.java Tue Nov 25 10:05:11 2008
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NoSuchElementException;
+
+/** Provide a cyclic {@link Iterator} for a {@link NavigableMap}.
+ * The {@link Iterator} navigates the entries of the map
+ * according to the map's ordering.
+ * If the {@link Iterator} hits the last entry of the map,
+ * it will then continue from the first entry.
+ */
+public class CyclicIteration<K, V> implements Iterable<Map.Entry<K, V>> {
+ private final NavigableMap<K, V> navigablemap;
+ private final NavigableMap<K, V> tailmap;
+
+ /** Construct an {@link Iterable} object,
+ * so that an {@link Iterator} can be created
+ * for iterating the given {@link NavigableMap}.
+ * The iteration starts just after the given starting key; the key itself is excluded.
+ */
+ public CyclicIteration(NavigableMap<K, V> navigablemap, K startingkey) {
+ if (navigablemap == null || navigablemap.isEmpty()) {
+ this.navigablemap = null;
+ this.tailmap = null;
+ }
+ else {
+ this.navigablemap = navigablemap;
+ this.tailmap = navigablemap.tailMap(startingkey, false);
+ }
+ }
+
+ /** {@inheritDoc} */
+ public Iterator<Map.Entry<K, V>> iterator() {
+ return new CyclicIterator();
+ }
+
+ /** An {@link Iterator} for {@link CyclicIteration}. */
+ private class CyclicIterator implements Iterator<Map.Entry<K, V>> {
+ private boolean hasnext;
+ private Iterator<Map.Entry<K, V>> i;
+ /** The first entry of the iteration. */
+ private final Map.Entry<K, V> first;
+ /** The next entry. */
+ private Map.Entry<K, V> next;
+
+ private CyclicIterator() {
+ hasnext = navigablemap != null;
+ if (hasnext) {
+ i = tailmap.entrySet().iterator();
+ first = nextEntry();
+ next = first;
+ }
+ else {
+ i = null;
+ first = null;
+ next = null;
+ }
+ }
+
+ private Map.Entry<K, V> nextEntry() {
+ if (!i.hasNext()) {
+ i = navigablemap.entrySet().iterator();
+ }
+ return i.next();
+ }
+
+ /** {@inheritDoc} */
+ public boolean hasNext() {
+ return hasnext;
+ }
+
+ /** {@inheritDoc} */
+ public Map.Entry<K, V> next() {
+ if (!hasnext) {
+ throw new NoSuchElementException();
+ }
+
+ final Map.Entry<K, V> curr = next;
+ next = nextEntry();
+ hasnext = !next.equals(first);
+ return curr;
+ }
+
+ /** Not supported */
+ public void remove() {
+ throw new UnsupportedOperationException("Not supported");
+ }
+ }
+}
\ No newline at end of file
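For readers new to the class, a minimal usage sketch follows (hypothetical example code, not part of the commit; it assumes the CyclicIteration class above is on the classpath):

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

import org.apache.hadoop.util.CyclicIteration;

public class CyclicIterationExample {
  public static void main(String[] args) {
    // A TreeMap keeps its keys in sorted order: a, b, c.
    final NavigableMap<String, Integer> map = new TreeMap<String, Integer>();
    map.put("a", 1);
    map.put("b", 2);
    map.put("c", 3);

    // Start just after key "b": the loop visits c, wraps around to a and b,
    // and stops once it is back at its first entry, printing
    //   c -> 3
    //   a -> 1
    //   b -> 2
    for(Map.Entry<String, Integer> e
        : new CyclicIteration<String, Integer>(map, "b")) {
      System.out.println(e.getKey() + " -> " + e.getValue());
    }
  }
}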
Added: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DecommissionManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DecommissionManager.java?rev=720555&view=auto
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DecommissionManager.java (added)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DecommissionManager.java Tue Nov 25 10:05:11 2008
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.CyclicIteration;
+
+/**
+ * Manage node decommissioning.
+ */
+class DecommissionManager {
+ static final Log LOG = LogFactory.getLog(DecommissionManager.class);
+
+ private final FSNamesystem fsnamesystem;
+
+ DecommissionManager(FSNamesystem namesystem) {
+ this.fsnamesystem = namesystem;
+ }
+
+ /** Periodically check decommission status. */
+ class Monitor implements Runnable {
+ /** recheckInterval is how often the namenode checks
+ * whether a node has finished decommissioning
+ */
+ private final long recheckInterval;
+ /** The number of decommissioning nodes to check in each interval */
+ private final int numNodesPerCheck;
+ /** firstkey can be initialized to anything. */
+ private String firstkey = "";
+
+ Monitor(int recheckIntervalInSecond, int numNodesPerCheck) {
+ this.recheckInterval = recheckIntervalInSecond * 1000L;
+ this.numNodesPerCheck = numNodesPerCheck;
+ }
+
+ /**
+ * Check the decommission status of numNodesPerCheck nodes
+ * every recheckInterval milliseconds.
+ */
+ public void run() {
+ for(; fsnamesystem.isRunning(); ) {
+ synchronized(fsnamesystem) {
+ check();
+ }
+
+ try {
+ Thread.sleep(recheckInterval);
+ } catch (InterruptedException ie) {
+ LOG.info("Interrupted " + this.getClass().getSimpleName(), ie);
+ }
+ }
+ }
+
+ private void check() {
+ int count = 0;
+ for(Map.Entry<String, DatanodeDescriptor> entry
+ : new CyclicIteration<String, DatanodeDescriptor>(
+ fsnamesystem.datanodeMap, firstkey)) {
+ final DatanodeDescriptor d = entry.getValue();
+ firstkey = entry.getKey();
+
+ if (d.isDecommissionInProgress()) {
+ try {
+ fsnamesystem.checkDecommissionStateInternal(d);
+ } catch(Exception e) {
+ LOG.warn("entry=" + entry, e);
+ }
+ if (++count == numNodesPerCheck) {
+ return;
+ }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
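Note how the throttling works: firstkey is updated on every entry visited, so each call to check() resumes the cyclic iteration just past the node where the previous call stopped. Successive intervals therefore cover successive slices of datanodeMap, every node is eventually revisited, and at most numNodesPerCheck calls to the potentially expensive checkDecommissionStateInternal() are made per interval.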
Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java?rev=720555&r1=720554&r2=720555&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java Tue Nov 25 10:05:11 2008
@@ -151,7 +151,7 @@
* <p>
* Mapping: StorageID -> DatanodeDescriptor
*/
- Map<String, DatanodeDescriptor> datanodeMap =
+ NavigableMap<String, DatanodeDescriptor> datanodeMap =
new TreeMap<String, DatanodeDescriptor>();
//
@@ -211,7 +211,7 @@
Daemon replthread = null; // Replication thread
Daemon resthread = null; //ResolutionMonitor thread
- volatile boolean fsRunning = true;
+ private volatile boolean fsRunning = true;
long systemStart = 0;
// The maximum number of replicates we should allow for a single block
@@ -229,8 +229,6 @@
private long heartbeatExpireInterval;
//replicationRecheckInterval is how often namenode checks for new replication work
private long replicationRecheckInterval;
- //decommissionRecheckInterval is how often namenode checks if a node has finished decommission
- private long decommissionRecheckInterval;
// default block size of a file
private long defaultBlockSize = 0;
@@ -314,7 +312,9 @@
this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
conf.get("dfs.hosts.exclude",""));
- this.dnthread = new Daemon(new DecommissionedMonitor());
+ this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
+ conf.getInt("dfs.namenode.decommission.interval", 30),
+ conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5)));
dnthread.start();
this.dnsToSwitchMapping = (DNSToSwitchMapping)ReflectionUtils.newInstance(
@@ -431,8 +431,6 @@
10 * heartbeatInterval;
this.replicationRecheckInterval =
conf.getInt("dfs.replication.interval", 3) * 1000L;
- this.decommissionRecheckInterval =
- conf.getInt("dfs.namenode.decommission.interval", 5 * 60) * 1000L;
this.defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
this.maxFsObjects = conf.getLong("dfs.max.objects", 0);
this.blockInvalidateLimit = Math.max(this.blockInvalidateLimit,
@@ -492,6 +490,11 @@
}
}
+ /** Is this name system running? */
+ boolean isRunning() {
+ return fsRunning;
+ }
+
/**
* Dump all metadata into specified file
*/
@@ -3470,9 +3473,8 @@
*/
private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
boolean status = false;
- Iterator<Block> decommissionBlocks = srcNode.getBlockIterator();
- while(decommissionBlocks.hasNext()) {
- Block block = decommissionBlocks.next();
+ for(final Iterator<Block> i = srcNode.getBlockIterator(); i.hasNext(); ) {
+ final Block block = i.next();
INode fileINode = blocksMap.getINode(block);
if (fileINode != null) {
@@ -3504,9 +3506,9 @@
* Change, if appropriate, the admin state of a datanode to
* decommission completed. Return true if decommission is complete.
*/
- private boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
+ boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
//
- // Check to see if all blocks in this decommisioned
+ // Check to see if all blocks in this decommissioned
// node have reached their target replication factor.
//
if (node.isDecommissionInProgress()) {
@@ -3618,39 +3620,6 @@
private boolean shouldNodeShutdown(DatanodeDescriptor node) {
return (node.isDecommissioned());
}
-
- /**
- * Check if any of the nodes being decommissioned has finished
- * moving all its datablocks to another replica. This is a loose
- * heuristic to determine when a decommission is really over.
- */
- public synchronized void decommissionedDatanodeCheck() {
- for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
- it.hasNext();) {
- DatanodeDescriptor node = it.next();
- checkDecommissionStateInternal(node);
- }
- }
-
- /**
- * Periodically calls decommissionedDatanodeCheck().
- */
- class DecommissionedMonitor implements Runnable {
-
- public void run() {
- while (fsRunning) {
- try {
- decommissionedDatanodeCheck();
- } catch (Exception e) {
- FSNamesystem.LOG.info(StringUtils.stringifyException(e));
- }
- try {
- Thread.sleep(decommissionRecheckInterval);
- } catch (InterruptedException ie) {
- }
- }
- }
- }
/**
* Get data node by storage ID.
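The supporting changes above fit together as follows: datanodeMap is now declared as a NavigableMap because CyclicIteration depends on NavigableMap.tailMap(key, false) to resume iteration from the middle of the map; fsRunning becomes private behind the new isRunning() accessor, which DecommissionManager (and LeaseManager, below) poll; and the old DecommissionedMonitor, which examined every datanode on each wakeup, is replaced by the throttled DecommissionManager.Monitor.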
Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/LeaseManager.java?rev=720555&r1=720554&r2=720555&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/LeaseManager.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/LeaseManager.java Tue Nov 25 10:05:11 2008
@@ -340,7 +340,7 @@
class Monitor implements Runnable {
public void run() {
try {
- while (fsnamesystem.fsRunning) {
+ while (fsnamesystem.isRunning()) {
synchronized (fsnamesystem) {
synchronized (LeaseManager.this) {
Lease top;
Modified: hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestDecommission.java?rev=720555&r1=720554&r2=720555&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestDecommission.java (original)
+++ hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestDecommission.java Tue Nov 25 10:05:11 2008
@@ -17,20 +17,21 @@
*/
package org.apache.hadoop.dfs;
-import junit.framework.TestCase;
-import java.io.*;
-import java.util.Collection;
-import java.util.Random;
+import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
-import java.net.*;
-import java.lang.InterruptedException;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.FSConstants.DatanodeReportType;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.BlockLocation;
/**
* This class tests the decommissioning of nodes.
@@ -198,18 +199,6 @@
}
/*
- * put node back in action
- */
- private void commissionNode(FileSystem filesys, FileSystem localFileSys,
- String node) throws IOException {
- DistributedFileSystem dfs = (DistributedFileSystem) filesys;
-
- System.out.println("Commissioning nodes.");
- writeConfigFile(localFileSys, excludeFile, null);
- dfs.refreshNodes();
- }
-
- /*
* Check if node is in the requested state.
*/
private boolean checkNodeState(FileSystem filesys,
@@ -245,7 +234,6 @@
private void waitNodeState(FileSystem filesys,
String node,
NodeState state) throws IOException {
- DistributedFileSystem dfs = (DistributedFileSystem) filesys;
boolean done = checkNodeState(filesys, node, state);
while (!done) {
System.out.println("Waiting for node " + node +
@@ -287,7 +275,6 @@
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
assertEquals("Number of Datanodes ", numDatanodes, info.length);
FileSystem fileSys = cluster.getFileSystem();
- DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
try {
for (int iteration = 0; iteration < numDatanodes - 1; iteration++) {
Added: hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/util/TestCyclicIteration.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/util/TestCyclicIteration.java?rev=720555&view=auto
==============================================================================
--- hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/util/TestCyclicIteration.java (added)
+++ hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/util/TestCyclicIteration.java Tue Nov 25 10:05:11 2008
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+public class TestCyclicIteration extends junit.framework.TestCase {
+ public void testCyclicIteration() throws Exception {
+ for(int n = 0; n < 5; n++) {
+ checkCyclicIteration(n);
+ }
+ }
+
+ private static void checkCyclicIteration(int numOfElements) {
+ //create a tree map
+ final NavigableMap<Integer, Integer> map = new TreeMap<Integer, Integer>();
+ final Integer[] integers = new Integer[numOfElements];
+ for(int i = 0; i < integers.length; i++) {
+ integers[i] = 2*i;
+ map.put(integers[i], integers[i]);
+ }
+ System.out.println("\n\nintegers=" + Arrays.asList(integers));
+ System.out.println("map=" + map);
+
+ //try starting everywhere
+ for(int start = -1; start <= 2*integers.length - 1; start++) {
+ //get a cyclic iteration
+ final List<Integer> iteration = new ArrayList<Integer>();
+ for(Map.Entry<Integer, Integer> e : new CyclicIteration<Integer, Integer>(map, start)) {
+ iteration.add(e.getKey());
+ }
+ System.out.println("start=" + start + ", iteration=" + iteration);
+
+ //verify results
+ for(int i = 0; i < integers.length; i++) {
+ final int j = ((start+2)/2 + i)%integers.length;
+ assertEquals("i=" + i + ", j=" + j, iteration.get(i), integers[j]);
+ }
+ }
+ }
+}
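(A note on the expected-index arithmetic in checkCyclicIteration: the keys are the even integers 0, 2, ..., 2*(numOfElements-1), and the starting key is excluded, so the first key visited is the smallest even integer strictly greater than start. Its array index is (start+2)/2 under integer division, hence the key expected at step i is integers[((start+2)/2 + i) % numOfElements].)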