Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2014/08/07 09:18:48 UTC
svn commit: r1616422 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/server/balancer/
Author: szetszwo
Date: Thu Aug 7 07:18:48 2014
New Revision: 1616422
URL: http://svn.apache.org/r1616422
Log:
HDFS-6809. Move Balancer's inner classes MovedBlocks and Matcher to standalone classes and separate KeyManager from NameNodeConnector.
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Matcher.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1616422&r1=1616421&r2=1616422&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Aug 7 07:18:48 2014
@@ -360,6 +360,10 @@ Release 2.6.0 - UNRELEASED
HDFS-6787. Remove duplicate code in FSDirectory#unprotectedConcat. (Yi Liu via umamahesh)
+ HDFS-6809. Move Balancer's inner classes MovedBlocks and Matcher to
+ standalone classes and separate KeyManager from NameNodeConnector.
+ (szetszwo)
+
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1616422&r1=1616421&r2=1616422&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Thu Aug 7 07:18:48 2014
@@ -58,6 +58,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -85,7 +86,6 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.StringUtils;
@@ -195,10 +195,12 @@ import com.google.common.base.Preconditi
@InterfaceAudience.Private
public class Balancer {
static final Log LOG = LogFactory.getLog(Balancer.class);
- final private static long GB = 1L << 30; //1GB
- final private static long MAX_SIZE_TO_MOVE = 10*GB;
- final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB;
- private static long WIN_WIDTH = 5400*1000L; // 1.5 hour
+
+ private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
+
+ private static final long GB = 1L << 30; //1GB
+ private static final long MAX_SIZE_TO_MOVE = 10*GB;
+ private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB;
/** The maximum number of concurrent block moves for
* balancing purposes at a datanode
@@ -219,6 +221,8 @@ public class Balancer {
+ "\tIncludes only the specified datanodes.";
private final NameNodeConnector nnc;
+ private final KeyManager keyManager;
+
private final BalancingPolicy policy;
private final SaslDataTransferClient saslClient;
private final double threshold;
@@ -241,7 +245,8 @@ public class Balancer {
private final Map<Block, BalancerBlock> globalBlockList
= new HashMap<Block, BalancerBlock>();
- private final MovedBlocks movedBlocks = new MovedBlocks();
+ private final MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks;
+
/** Map (datanodeUuid,storageType -> StorageGroup) */
private final StorageGroupMap storageGroupMap = new StorageGroupMap();
@@ -326,7 +331,7 @@ public class Balancer {
if (isGoodBlockCandidate(source, target, block)) {
this.block = block;
if ( chooseProxySource() ) {
- movedBlocks.add(block);
+ movedBlocks.put(block);
if (LOG.isDebugEnabled()) {
LOG.debug("Decided to move " + this);
}
@@ -399,10 +404,10 @@ public class Balancer {
OutputStream unbufOut = sock.getOutputStream();
InputStream unbufIn = sock.getInputStream();
- ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
- Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
+ ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), block.getBlock());
+ Token<BlockTokenIdentifier> accessToken = keyManager.getAccessToken(eb);
IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
- unbufIn, nnc, accessToken, target.getDatanode());
+ unbufIn, keyManager, accessToken, target.getDatanode());
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
@@ -483,47 +488,9 @@ public class Balancer {
}
/* A class for keeping track of blocks in the Balancer */
- static private class BalancerBlock {
- private final Block block; // the block
- /** The locations of the replicas of the block. */
- private final List<BalancerDatanode.StorageGroup> locations
- = new ArrayList<BalancerDatanode.StorageGroup>(3);
-
- /* Constructor */
- private BalancerBlock(Block block) {
- this.block = block;
- }
-
- /* clean block locations */
- private synchronized void clearLocations() {
- locations.clear();
- }
-
- /* add a location */
- private synchronized void addLocation(BalancerDatanode.StorageGroup g) {
- if (!locations.contains(g)) {
- locations.add(g);
- }
- }
-
- /** @return if the block is located on the given storage group. */
- private synchronized boolean isLocatedOn(BalancerDatanode.StorageGroup g) {
- return locations.contains(g);
- }
-
- /* Return its locations */
- private synchronized List<BalancerDatanode.StorageGroup> getLocations() {
- return locations;
- }
-
- /* Return the block */
- private Block getBlock() {
- return block;
- }
-
- /* Return the length of the block */
- private long getNumBytes() {
- return block.getNumBytes();
+ static class BalancerBlock extends MovedBlocks.Locations<BalancerDatanode.StorageGroup> {
+ BalancerBlock(Block block) {
+ super(block);
}
}
@@ -735,7 +702,7 @@ public class Balancer {
*/
private long getBlockList() throws IOException {
final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
- final BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(
+ final BlockWithLocations[] newBlocks = nnc.getNamenode().getBlocks(
getDatanode(), size).getBlocks();
long bytesReceived = 0;
@@ -819,7 +786,7 @@ public class Balancer {
private void filterMovedBlocks() {
for (Iterator<BalancerBlock> blocks=getBlockIterator();
blocks.hasNext();) {
- if (movedBlocks.contains(blocks.next())) {
+ if (movedBlocks.contains(blocks.next().getBlock())) {
blocks.remove();
}
}
@@ -925,6 +892,13 @@ public class Balancer {
this.nodesToBeExcluded = p.nodesToBeExcluded;
this.nodesToBeIncluded = p.nodesToBeIncluded;
this.nnc = theblockpool;
+ this.keyManager = nnc.getKeyManager();
+
+ final long movedWinWidth = conf.getLong(
+ DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
+ DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
+ movedBlocks = new MovedBlocks<BalancerDatanode.StorageGroup>(movedWinWidth);
+
cluster = NetworkTopology.getInstance(conf);
this.moverExecutor = Executors.newFixedThreadPool(
@@ -1094,36 +1068,6 @@ public class Balancer {
LOG.info(items.size() + " " + name + ": " + items);
}
- /** A matcher interface for matching nodes. */
- private interface Matcher {
- /** Given the cluster topology, does the left node match the right node? */
- boolean match(NetworkTopology cluster, Node left, Node right);
- }
-
- /** Match datanodes in the same node group. */
- static final Matcher SAME_NODE_GROUP = new Matcher() {
- @Override
- public boolean match(NetworkTopology cluster, Node left, Node right) {
- return cluster.isOnSameNodeGroup(left, right);
- }
- };
-
- /** Match datanodes in the same rack. */
- static final Matcher SAME_RACK = new Matcher() {
- @Override
- public boolean match(NetworkTopology cluster, Node left, Node right) {
- return cluster.isOnSameRack(left, right);
- }
- };
-
- /** Match any datanode with any other datanode. */
- static final Matcher ANY_OTHER = new Matcher() {
- @Override
- public boolean match(NetworkTopology cluster, Node left, Node right) {
- return left != right;
- }
- };
-
/**
* Decide all <source, target> pairs and
* the number of bytes to move from a source to a target
@@ -1134,13 +1078,13 @@ public class Balancer {
private long chooseStorageGroups() {
// First, match nodes on the same node group if cluster is node group aware
if (cluster.isNodeGroupAware()) {
- chooseStorageGroups(SAME_NODE_GROUP);
+ chooseStorageGroups(Matcher.SAME_NODE_GROUP);
}
// Then, match nodes on the same rack
- chooseStorageGroups(SAME_RACK);
+ chooseStorageGroups(Matcher.SAME_RACK);
// At last, match all remaining nodes
- chooseStorageGroups(ANY_OTHER);
+ chooseStorageGroups(Matcher.ANY_OTHER);
Preconditions.checkState(storageGroupMap.size() >= sources.size() + targets.size(),
"Mismatched number of datanodes (" + storageGroupMap.size() + " < "
@@ -1307,56 +1251,6 @@ public class Balancer {
} while (shouldWait);
}
- /** This window makes sure to keep blocks that have been moved within 1.5 hour.
- * Old window has blocks that are older;
- * Current window has blocks that are more recent;
- * Cleanup method triggers the check if blocks in the old window are
- * more than 1.5 hour old. If yes, purge the old window and then
- * move blocks in current window to old window.
- */
- private static class MovedBlocks {
- private long lastCleanupTime = Time.now();
- final private static int CUR_WIN = 0;
- final private static int OLD_WIN = 1;
- final private static int NUM_WINS = 2;
- final private List<HashMap<Block, BalancerBlock>> movedBlocks =
- new ArrayList<HashMap<Block, BalancerBlock>>(NUM_WINS);
-
- /* initialize the moved blocks collection */
- private MovedBlocks() {
- movedBlocks.add(new HashMap<Block,BalancerBlock>());
- movedBlocks.add(new HashMap<Block,BalancerBlock>());
- }
-
- /* add a block thus marking a block to be moved */
- synchronized private void add(BalancerBlock block) {
- movedBlocks.get(CUR_WIN).put(block.getBlock(), block);
- }
-
- /* check if a block is marked as moved */
- synchronized private boolean contains(BalancerBlock block) {
- return contains(block.getBlock());
- }
-
- /* check if a block is marked as moved */
- synchronized private boolean contains(Block block) {
- return movedBlocks.get(CUR_WIN).containsKey(block) ||
- movedBlocks.get(OLD_WIN).containsKey(block);
- }
-
- /* remove old blocks */
- synchronized private void cleanup() {
- long curTime = Time.now();
- // check if old win is older than winWidth
- if (lastCleanupTime + WIN_WIDTH <= curTime) {
- // purge the old window
- movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN));
- movedBlocks.set(CUR_WIN, new HashMap<Block, BalancerBlock>());
- lastCleanupTime = curTime;
- }
- }
- }
-
/* Decide if it is OK to move the given block from source to target
* A block is a good candidate if
* 1. the block is not in the process of being moved/has not been moved;
@@ -1369,7 +1263,7 @@ public class Balancer {
return false;
}
// check if the block is moved or not
- if (movedBlocks.contains(block)) {
+ if (movedBlocks.contains(block.getBlock())) {
return false;
}
if (block.isLocatedOn(target)) {
@@ -1387,7 +1281,7 @@ public class Balancer {
} else {
boolean notOnSameRack = true;
synchronized (block) {
- for (BalancerDatanode.StorageGroup loc : block.locations) {
+ for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
notOnSameRack = false;
break;
@@ -1399,7 +1293,7 @@ public class Balancer {
goodBlock = true;
} else {
// good if source is on the same rack as one of the replicas
- for (BalancerDatanode.StorageGroup loc : block.locations) {
+ for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
if (loc != source &&
cluster.isOnSameRack(loc.getDatanode(), source.getDatanode())) {
goodBlock = true;
@@ -1425,7 +1319,7 @@ public class Balancer {
private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode.StorageGroup target,
BalancerBlock block, Source source) {
final DatanodeInfo targetDn = target.getDatanode();
- for (BalancerDatanode.StorageGroup loc : block.locations) {
+ for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
if (loc != source &&
cluster.isOnSameNodeGroup(loc.getDatanode(), targetDn)) {
return true;
@@ -1489,7 +1383,7 @@ public class Balancer {
* decide the number of bytes need to be moved
*/
final long bytesLeftToMove = init(
- nnc.client.getDatanodeStorageReport(DatanodeReportType.LIVE));
+ nnc.getClient().getDatanodeStorageReport(DatanodeReportType.LIVE));
if (bytesLeftToMove == 0) {
System.out.println("The cluster is balanced. Exiting...");
return ReturnStatus.SUCCESS;
@@ -1558,8 +1452,8 @@ public class Balancer {
final long sleeptime = 2000*conf.getLong(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
- LOG.info("namenodes = " + namenodes);
- LOG.info("p = " + p);
+ LOG.info("namenodes = " + namenodes);
+ LOG.info("parameters = " + p);
final Formatter formatter = new Formatter(System.out);
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
@@ -1568,7 +1462,10 @@ public class Balancer {
= new ArrayList<NameNodeConnector>(namenodes.size());
try {
for (URI uri : namenodes) {
- connectors.add(new NameNodeConnector(uri, conf));
+ final NameNodeConnector nnc = new NameNodeConnector(
+ Balancer.class.getSimpleName(), uri, BALANCER_ID_PATH, conf);
+ nnc.getKeyManager().startBlockKeyUpdater();
+ connectors.add(nnc);
}
boolean done = false;
@@ -1730,9 +1627,6 @@ public class Balancer {
public int run(String[] args) {
final long startTime = Time.now();
final Configuration conf = getConf();
- WIN_WIDTH = conf.getLong(
- DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
- DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
try {
checkReplicationPolicyCompatibility(conf);
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java?rev=1616422&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java Thu Aug 7 07:18:48 2014
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.balancer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.EnumSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * The class provides utilities for key and token management.
+ */
+@InterfaceAudience.Private
+public class KeyManager implements Closeable, DataEncryptionKeyFactory {
+ private static final Log LOG = LogFactory.getLog(KeyManager.class);
+
+ private final NamenodeProtocol namenode;
+
+ private final boolean isBlockTokenEnabled;
+ private final boolean encryptDataTransfer;
+ private boolean shouldRun;
+
+ private final BlockTokenSecretManager blockTokenSecretManager;
+ private final BlockKeyUpdater blockKeyUpdater;
+ private DataEncryptionKey encryptionKey;
+
+ public KeyManager(String blockpoolID, NamenodeProtocol namenode,
+ boolean encryptDataTransfer, Configuration conf) throws IOException {
+ this.namenode = namenode;
+ this.encryptDataTransfer = encryptDataTransfer;
+
+ final ExportedBlockKeys keys = namenode.getBlockKeys();
+ this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
+ if (isBlockTokenEnabled) {
+ long updateInterval = keys.getKeyUpdateInterval();
+ long tokenLifetime = keys.getTokenLifetime();
+ LOG.info("Block token params received from NN: update interval="
+ + StringUtils.formatTime(updateInterval)
+ + ", token lifetime=" + StringUtils.formatTime(tokenLifetime));
+ String encryptionAlgorithm = conf.get(
+ DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
+ this.blockTokenSecretManager = new BlockTokenSecretManager(
+ updateInterval, tokenLifetime, blockpoolID, encryptionAlgorithm);
+ this.blockTokenSecretManager.addKeys(keys);
+
+ // sync block keys with NN more frequently than NN updates its block keys
+ this.blockKeyUpdater = new BlockKeyUpdater(updateInterval / 4);
+ this.shouldRun = true;
+ } else {
+ this.blockTokenSecretManager = null;
+ this.blockKeyUpdater = null;
+ }
+ }
+
+ public void startBlockKeyUpdater() {
+ if (blockKeyUpdater != null) {
+ blockKeyUpdater.daemon.start();
+ }
+ }
+
+ /** Get an access token for a block. */
+ public Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb
+ ) throws IOException {
+ if (!isBlockTokenEnabled) {
+ return BlockTokenSecretManager.DUMMY_TOKEN;
+ } else {
+ if (!shouldRun) {
+ throw new IOException(
+ "Cannot get access token since BlockKeyUpdater is not running");
+ }
+ return blockTokenSecretManager.generateToken(null, eb,
+ EnumSet.of(AccessMode.REPLACE, AccessMode.COPY));
+ }
+ }
+
+ @Override
+ public DataEncryptionKey newDataEncryptionKey() {
+ if (encryptDataTransfer) {
+ synchronized (this) {
+ if (encryptionKey == null) {
+ encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
+ }
+ return encryptionKey;
+ }
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public void close() {
+ shouldRun = false;
+ try {
+ if (blockKeyUpdater != null) {
+ blockKeyUpdater.daemon.interrupt();
+ }
+ } catch(Exception e) {
+ LOG.warn("Exception shutting down access key updater thread", e);
+ }
+ }
+
+ /**
+ * Periodically updates access keys.
+ */
+ class BlockKeyUpdater implements Runnable, Closeable {
+ private final Daemon daemon = new Daemon(this);
+ private final long sleepInterval;
+
+ BlockKeyUpdater(final long sleepInterval) {
+ this.sleepInterval = sleepInterval;
+ LOG.info("Update block keys every " + StringUtils.formatTime(sleepInterval));
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (shouldRun) {
+ try {
+ blockTokenSecretManager.addKeys(namenode.getBlockKeys());
+ } catch (IOException e) {
+ LOG.error("Failed to set keys", e);
+ }
+ Thread.sleep(sleepInterval);
+ }
+ } catch (InterruptedException e) {
+ LOG.debug("InterruptedException in block key updater thread", e);
+ } catch (Throwable e) {
+ LOG.error("Exception in block key updater thread", e);
+ shouldRun = false;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ daemon.interrupt();
+ } catch(Exception e) {
+ LOG.warn("Exception shutting down key updater thread", e);
+ }
+ }
+ }
+}
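For illustration (not part of this commit), a minimal sketch of how a client might use the extracted KeyManager through NameNodeConnector, assuming the constructor signature added above; the namenode URI, id path, and block ID are hypothetical placeholders:

package org.apache.hadoop.hdfs.server.balancer;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;

public class KeyManagerSketch {
  public static void main(String[] args) throws Exception {
    final Configuration conf = new Configuration();
    // Hypothetical URI and id path; the Balancer itself passes
    // BALANCER_ID_PATH (/system/balancer.id).
    final NameNodeConnector nnc = new NameNodeConnector("sketch",
        URI.create("hdfs://namenode:8020"), new Path("/system/sketch.id"),
        conf);
    try {
      final KeyManager km = nnc.getKeyManager();
      km.startBlockKeyUpdater();  // keep block keys in sync with the NN
      // Obtain an access token before a block transfer; block ID 1 is a
      // placeholder.
      final ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), 1L);
      final Token<BlockTokenIdentifier> token = km.getAccessToken(eb);
      System.out.println("access token: " + token);
    } finally {
      nnc.close();  // closes the KeyManager and stops the key updater
    }
  }
}

This mirrors what the Balancer now does in its dispatch path: the SASL client and block mover ask keyManager, not the connector, for tokens and encryption keys.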
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Matcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Matcher.java?rev=1616422&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Matcher.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Matcher.java Thu Aug 7 07:18:48 2014
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.balancer;
+
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+
+/** A matcher interface for matching nodes. */
+public interface Matcher {
+ /** Given the cluster topology, does the left node match the right node? */
+ public boolean match(NetworkTopology cluster, Node left, Node right);
+
+ /** Match datanodes in the same node group. */
+ public static final Matcher SAME_NODE_GROUP = new Matcher() {
+ @Override
+ public boolean match(NetworkTopology cluster, Node left, Node right) {
+ return cluster.isOnSameNodeGroup(left, right);
+ }
+ };
+
+ /** Match datanodes in the same rack. */
+ public static final Matcher SAME_RACK = new Matcher() {
+ @Override
+ public boolean match(NetworkTopology cluster, Node left, Node right) {
+ return cluster.isOnSameRack(left, right);
+ }
+ };
+
+ /** Match any datanode with any other datanode. */
+ public static final Matcher ANY_OTHER = new Matcher() {
+ @Override
+ public boolean match(NetworkTopology cluster, Node left, Node right) {
+ return left != right;
+ }
+ };
+}
\ No newline at end of file
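A quick sketch (not part of this commit) of the extracted matchers against a toy two-rack topology; the host names and rack paths are invented for the example:

package org.apache.hadoop.hdfs.server.balancer;

import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;

public class MatcherSketch {
  public static void main(String[] args) {
    final NetworkTopology cluster = new NetworkTopology();
    final Node a = new NodeBase("hostA", "/rack1");
    final Node b = new NodeBase("hostB", "/rack1");
    final Node c = new NodeBase("hostC", "/rack2");
    cluster.add(a);
    cluster.add(b);
    cluster.add(c);

    System.out.println(Matcher.SAME_RACK.match(cluster, a, b)); // true
    System.out.println(Matcher.SAME_RACK.match(cluster, a, c)); // false
    System.out.println(Matcher.ANY_OTHER.match(cluster, a, c)); // true
    System.out.println(Matcher.ANY_OTHER.match(cluster, a, a)); // false
  }
}

The Balancer applies these in order, as in chooseStorageGroups() above: SAME_NODE_GROUP first when the cluster is node-group aware, then SAME_RACK, then ANY_OTHER.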
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java?rev=1616422&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java Thu Aug 7 07:18:48 2014
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.balancer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.util.Time;
+
+/**
+ * This window keeps track of blocks that have been moved within a fixed
+ * time interval (default is 1.5 hours). The old window holds the older
+ * blocks and the current window holds the more recent ones. The cleanup
+ * method checks whether the blocks in the old window are older than the
+ * fixed time interval; if so, it purges the old window and then moves the
+ * blocks in the current window to the old window.
+ *
+ * @param <L> Location type
+ */
+public class MovedBlocks<L> {
+ /** A class for keeping track of a block and its locations */
+ public static class Locations<L> {
+ private final Block block; // the block
+ /** The locations of the replicas of the block. */
+ private final List<L> locations = new ArrayList<L>(3);
+
+ public Locations(Block block) {
+ this.block = block;
+ }
+
+ /** clean block locations */
+ public synchronized void clearLocations() {
+ locations.clear();
+ }
+
+ /** add a location */
+ public synchronized void addLocation(L loc) {
+ if (!locations.contains(loc)) {
+ locations.add(loc);
+ }
+ }
+
+ /** @return if the block is located on the given location. */
+ public synchronized boolean isLocatedOn(L loc) {
+ return locations.contains(loc);
+ }
+
+ /** @return its locations */
+ public synchronized List<L> getLocations() {
+ return locations;
+ }
+
+ /** @return the block */
+ public Block getBlock() {
+ return block;
+ }
+
+ /** @return the length of the block */
+ public long getNumBytes() {
+ return block.getNumBytes();
+ }
+ }
+
+ private static final int CUR_WIN = 0;
+ private static final int OLD_WIN = 1;
+ private static final int NUM_WINS = 2;
+
+ private final long winTimeInterval;
+ private long lastCleanupTime = Time.monotonicNow();
+ private final List<Map<Block, Locations<L>>> movedBlocks
+ = new ArrayList<Map<Block, Locations<L>>>(NUM_WINS);
+
+ /** initialize the moved blocks collection */
+ public MovedBlocks(long winTimeInterval) {
+ this.winTimeInterval = winTimeInterval;
+ movedBlocks.add(newMap());
+ movedBlocks.add(newMap());
+ }
+
+ private Map<Block, Locations<L>> newMap() {
+ return new HashMap<Block, Locations<L>>();
+ }
+
+ /** add a block thus marking a block to be moved */
+ public synchronized void put(Locations<L> block) {
+ movedBlocks.get(CUR_WIN).put(block.getBlock(), block);
+ }
+
+ /** @return if a block is marked as moved */
+ public synchronized boolean contains(Block block) {
+ return movedBlocks.get(CUR_WIN).containsKey(block) ||
+ movedBlocks.get(OLD_WIN).containsKey(block);
+ }
+
+ /** remove old blocks */
+ public synchronized void cleanup() {
+ long curTime = Time.monotonicNow();
+ // check if the old window is older than winTimeInterval
+ if (lastCleanupTime + winTimeInterval <= curTime) {
+ // purge the old window
+ movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN));
+ movedBlocks.set(CUR_WIN, newMap());
+ lastCleanupTime = curTime;
+ }
+ }
+}
\ No newline at end of file
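The two-window behavior can be seen in a small sketch (not part of this commit), using String locations and a deliberately short 1-second window; the Balancer configures the real width from dfs.balancer.movedWinWidth:

package org.apache.hadoop.hdfs.server.balancer;

import org.apache.hadoop.hdfs.protocol.Block;

public class MovedBlocksSketch {
  public static void main(String[] args) throws InterruptedException {
    // 1-second window purely for demonstration.
    final MovedBlocks<String> moved = new MovedBlocks<String>(1000L);
    final Block b = new Block(1L);
    moved.put(new MovedBlocks.Locations<String>(b));
    System.out.println(moved.contains(b)); // true: in the current window

    Thread.sleep(1100L);
    moved.cleanup(); // current window becomes the old window
    System.out.println(moved.contains(b)); // true: still in the old window

    Thread.sleep(1100L);
    moved.cleanup(); // old window is purged
    System.out.println(moved.contains(b)); // false: block forgotten
  }
}

So a block stays marked as moved for between one and two window widths, which with the default width matches the original "within 1.5 hour" comment in Balancer.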
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1616422&r1=1616421&r2=1616422&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Thu Aug 7 07:18:48 2014
@@ -17,113 +17,96 @@
*/
package org.apache.hadoop.hdfs.server.balancer;
+import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.URI;
-import java.util.EnumSet;
import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
-import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Daemon;
/**
- * The class provides utilities for {@link Balancer} to access a NameNode
+ * The class provides utilities for accessing a NameNode.
*/
@InterfaceAudience.Private
-class NameNodeConnector implements DataEncryptionKeyFactory {
- private static final Log LOG = Balancer.LOG;
- private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
+public class NameNodeConnector implements Closeable {
+ private static final Log LOG = LogFactory.getLog(NameNodeConnector.class);
+
private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
- final URI nameNodeUri;
- final String blockpoolID;
+ private final URI nameNodeUri;
+ private final String blockpoolID;
+
+ private final NamenodeProtocol namenode;
+ private final ClientProtocol client;
+ private final KeyManager keyManager;
+
+ private final FileSystem fs;
+ private final Path idPath;
+ private final OutputStream out;
- final NamenodeProtocol namenode;
- final ClientProtocol client;
- final FileSystem fs;
- final OutputStream out;
-
- private final boolean isBlockTokenEnabled;
- private final boolean encryptDataTransfer;
- private boolean shouldRun;
- private long keyUpdaterInterval;
- // used for balancer
private int notChangedIterations = 0;
- private BlockTokenSecretManager blockTokenSecretManager;
- private Daemon keyupdaterthread; // AccessKeyUpdater thread
- private DataEncryptionKey encryptionKey;
- NameNodeConnector(URI nameNodeUri,
+ public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
Configuration conf) throws IOException {
this.nameNodeUri = nameNodeUri;
+ this.idPath = idPath;
- this.namenode =
- NameNodeProxies.createProxy(conf, nameNodeUri, NamenodeProtocol.class)
- .getProxy();
- this.client =
- NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class)
- .getProxy();
+ this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
+ NamenodeProtocol.class).getProxy();
+ this.client = NameNodeProxies.createProxy(conf, nameNodeUri,
+ ClientProtocol.class).getProxy();
this.fs = FileSystem.get(nameNodeUri, conf);
final NamespaceInfo namespaceinfo = namenode.versionRequest();
this.blockpoolID = namespaceinfo.getBlockPoolID();
- final ExportedBlockKeys keys = namenode.getBlockKeys();
- this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
- if (isBlockTokenEnabled) {
- long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
- long blockTokenLifetime = keys.getTokenLifetime();
- LOG.info("Block token params received from NN: keyUpdateInterval="
- + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
- + blockTokenLifetime / (60 * 1000) + " min(s)");
- String encryptionAlgorithm = conf.get(
- DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
- this.blockTokenSecretManager = new BlockTokenSecretManager(
- blockKeyUpdateInterval, blockTokenLifetime, blockpoolID,
- encryptionAlgorithm);
- this.blockTokenSecretManager.addKeys(keys);
- /*
- * Balancer should sync its block keys with NN more frequently than NN
- * updates its block keys
- */
- this.keyUpdaterInterval = blockKeyUpdateInterval / 4;
- LOG.info("Balancer will update its block keys every "
- + keyUpdaterInterval / (60 * 1000) + " minute(s)");
- this.keyupdaterthread = new Daemon(new BlockKeyUpdater());
- this.shouldRun = true;
- this.keyupdaterthread.start();
- }
- this.encryptDataTransfer = fs.getServerDefaults(new Path("/"))
- .getEncryptDataTransfer();
- // Check if there is another balancer running.
+ final FsServerDefaults defaults = fs.getServerDefaults(new Path("/"));
+ this.keyManager = new KeyManager(blockpoolID, namenode,
+ defaults.getEncryptDataTransfer(), conf);
// Exit if there is another one running.
- out = checkAndMarkRunningBalancer();
+ out = checkAndMarkRunning();
if (out == null) {
- throw new IOException("Another balancer is running");
+ throw new IOException("Another " + name + " is running.");
}
}
- boolean shouldContinue(long dispatchBlockMoveBytes) {
+ /** @return the block pool ID */
+ public String getBlockpoolID() {
+ return blockpoolID;
+ }
+
+ /** @return the namenode proxy. */
+ public NamenodeProtocol getNamenode() {
+ return namenode;
+ }
+
+ /** @return the client proxy. */
+ public ClientProtocol getClient() {
+ return client;
+ }
+
+ /** @return the key manager */
+ public KeyManager getKeyManager() {
+ return keyManager;
+ }
+
+ /** Should the instance continue running? */
+ public boolean shouldContinue(long dispatchBlockMoveBytes) {
if (dispatchBlockMoveBytes > 0) {
notChangedIterations = 0;
} else {
@@ -137,53 +120,25 @@ class NameNodeConnector implements DataE
return true;
}
- /** Get an access token for a block. */
- Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb
- ) throws IOException {
- if (!isBlockTokenEnabled) {
- return BlockTokenSecretManager.DUMMY_TOKEN;
- } else {
- if (!shouldRun) {
- throw new IOException(
- "Can not get access token. BlockKeyUpdater is not running");
- }
- return blockTokenSecretManager.generateToken(null, eb,
- EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE,
- BlockTokenSecretManager.AccessMode.COPY));
- }
- }
-
- @Override
- public DataEncryptionKey newDataEncryptionKey() {
- if (encryptDataTransfer) {
- synchronized (this) {
- if (encryptionKey == null) {
- encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
- }
- return encryptionKey;
- }
- } else {
- return null;
- }
- }
- /* The idea for making sure that there is no more than one balancer
+ /**
+ * The idea for making sure that there is no more than one instance
* running in an HDFS is to create a file in the HDFS, write the hostname
- * of the machine on which the balancer is running to the file, but did not
- * close the file until the balancer exits.
- * This prevents the second balancer from running because it can not
+ * of the machine on which the instance is running to the file, and not
+ * close the file until it exits.
+ *
+ * This prevents a second instance from running because it cannot
* create the file while the first one is running.
*
- * This method checks if there is any running balancer and
- * if no, mark yes if no.
+ * This method checks whether another instance is already running and,
+ * if not, marks this instance as running.
* Note that this is an atomic operation.
*
- * Return null if there is a running balancer; otherwise the output stream
- * to the newly created file.
+ * @return null if there is a running instance;
+ * otherwise, the output stream to the newly created file.
*/
- private OutputStream checkAndMarkRunningBalancer() throws IOException {
+ private OutputStream checkAndMarkRunning() throws IOException {
try {
- final DataOutputStream out = fs.create(BALANCER_ID_PATH);
+ final DataOutputStream out = fs.create(idPath);
out.writeBytes(InetAddress.getLocalHost().getHostName());
out.flush();
return out;
@@ -196,24 +151,17 @@ class NameNodeConnector implements DataE
}
}
- /** Close the connection. */
- void close() {
- shouldRun = false;
- try {
- if (keyupdaterthread != null) {
- keyupdaterthread.interrupt();
- }
- } catch(Exception e) {
- LOG.warn("Exception shutting down access key updater thread", e);
- }
+ @Override
+ public void close() {
+ keyManager.close();
// close the output file
IOUtils.closeStream(out);
if (fs != null) {
try {
- fs.delete(BALANCER_ID_PATH, true);
+ fs.delete(idPath, true);
} catch(IOException ioe) {
- LOG.warn("Failed to delete " + BALANCER_ID_PATH, ioe);
+ LOG.warn("Failed to delete " + idPath, ioe);
}
}
}
@@ -221,31 +169,6 @@ class NameNodeConnector implements DataE
@Override
public String toString() {
return getClass().getSimpleName() + "[namenodeUri=" + nameNodeUri
- + ", id=" + blockpoolID
- + "]";
- }
-
- /**
- * Periodically updates access keys.
- */
- class BlockKeyUpdater implements Runnable {
- @Override
- public void run() {
- try {
- while (shouldRun) {
- try {
- blockTokenSecretManager.addKeys(namenode.getBlockKeys());
- } catch (IOException e) {
- LOG.error("Failed to set keys", e);
- }
- Thread.sleep(keyUpdaterInterval);
- }
- } catch (InterruptedException e) {
- LOG.debug("InterruptedException in block key updater thread", e);
- } catch (Throwable e) {
- LOG.error("Exception in block key updater thread", e);
- shouldRun = false;
- }
- }
+ + ", bpid=" + blockpoolID + "]";
}
}
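The single-writer id file trick that checkAndMarkRunning() relies on can be sketched standalone (not part of this commit), assuming only the stock FileSystem API; the path is hypothetical, while the Balancer uses /system/balancer.id:

package org.apache.hadoop.hdfs.server.balancer;

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.ipc.RemoteException;

public class InstanceLockSketch {
  /**
   * @return an open stream to hold until exit, or null if another
   *         instance already holds the id file open.
   */
  static DataOutputStream tryLock(FileSystem fs, Path idPath)
      throws IOException {
    try {
      // HDFS allows a single writer per file, so create() fails while
      // another instance still has the file open; that is what makes
      // check-and-mark atomic.
      final DataOutputStream out = fs.create(idPath);
      out.writeBytes(InetAddress.getLocalHost().getHostName());
      out.flush();
      return out;
    } catch (RemoteException e) {
      if (AlreadyBeingCreatedException.class.getName()
          .equals(e.getClassName())) {
        return null; // another instance is running
      }
      throw e;
    }
  }
}

Generalizing the path into the idPath constructor argument is what lets tools other than the Balancer reuse NameNodeConnector with their own id files.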