You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2014/07/23 03:47:36 UTC
svn commit: r1612742 [2/4] - in
/hadoop/common/branches/MR-2841/hadoop-hdfs-project:
hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/
hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/
hadoop-hdfs/src/contrib/bkjo...
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Wed Jul 23 01:47:28 2014
@@ -70,7 +70,8 @@ public class BlockPlacementPolicyWithNod
protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
- StorageType storageType) throws NotEnoughReplicasException {
+ StorageType storageType, boolean fallbackToLocalRack
+ ) throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes,
@@ -97,6 +98,10 @@ public class BlockPlacementPolicyWithNod
if (chosenStorage != null) {
return chosenStorage;
}
+
+ if (!fallbackToLocalRack) {
+ return null;
+ }
// try a node on local rack
return chooseLocalRack(localMachine, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
@@ -286,9 +291,9 @@ public class BlockPlacementPolicyWithNod
* If first is empty, then pick second.
*/
@Override
- public Collection<DatanodeDescriptor> pickupReplicaSet(
- Collection<DatanodeDescriptor> first,
- Collection<DatanodeDescriptor> second) {
+ public Collection<DatanodeStorageInfo> pickupReplicaSet(
+ Collection<DatanodeStorageInfo> first,
+ Collection<DatanodeStorageInfo> second) {
// If no replica within same rack, return directly.
if (first.isEmpty()) {
return second;
@@ -296,25 +301,24 @@ public class BlockPlacementPolicyWithNod
// Split data nodes in the first set into two sets,
// moreThanOne contains nodes on nodegroup with more than one replica
// exactlyOne contains the remaining nodes
- Map<String, List<DatanodeDescriptor>> nodeGroupMap =
- new HashMap<String, List<DatanodeDescriptor>>();
+ Map<String, List<DatanodeStorageInfo>> nodeGroupMap =
+ new HashMap<String, List<DatanodeStorageInfo>>();
- for(DatanodeDescriptor node : first) {
- final String nodeGroupName =
- NetworkTopology.getLastHalf(node.getNetworkLocation());
- List<DatanodeDescriptor> datanodeList =
- nodeGroupMap.get(nodeGroupName);
- if (datanodeList == null) {
- datanodeList = new ArrayList<DatanodeDescriptor>();
- nodeGroupMap.put(nodeGroupName, datanodeList);
+ for(DatanodeStorageInfo storage : first) {
+ final String nodeGroupName = NetworkTopology.getLastHalf(
+ storage.getDatanodeDescriptor().getNetworkLocation());
+ List<DatanodeStorageInfo> storageList = nodeGroupMap.get(nodeGroupName);
+ if (storageList == null) {
+ storageList = new ArrayList<DatanodeStorageInfo>();
+ nodeGroupMap.put(nodeGroupName, storageList);
}
- datanodeList.add(node);
+ storageList.add(storage);
}
- final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
- final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
+ final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
+ final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
// split nodes into two sets
- for(List<DatanodeDescriptor> datanodeList : nodeGroupMap.values()) {
+ for(List<DatanodeStorageInfo> datanodeList : nodeGroupMap.values()) {
if (datanodeList.size() == 1 ) {
// exactlyOne contains nodes on nodegroup with exactly one replica
exactlyOne.add(datanodeList.get(0));
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Wed Jul 23 01:47:28 2014
@@ -345,7 +345,8 @@ public class DatanodeManager {
/** Sort the located blocks by the distance to the target host. */
public void sortLocatedBlocks(final String targethost,
- final List<LocatedBlock> locatedblocks) {
+ final List<LocatedBlock> locatedblocks,
+ boolean randomizeBlockLocationsPerBlock) {
//sort the blocks
// As it is possible for the separation of node manager and datanode,
// here we should get node but not datanode only .
@@ -372,8 +373,8 @@ public class DatanodeManager {
--lastActiveIndex;
}
int activeLen = lastActiveIndex + 1;
- networktopology.sortByDistance(client, b.getLocations(), activeLen,
- b.getBlock().getBlockId());
+ networktopology.sortByDistance(client, b.getLocations(), activeLen, b
+ .getBlock().getBlockId(), randomizeBlockLocationsPerBlock);
}
}
@@ -820,7 +821,9 @@ public class DatanodeManager {
}
/** Start decommissioning the specified datanode. */
- private void startDecommission(DatanodeDescriptor node) {
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ public void startDecommission(DatanodeDescriptor node) {
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
for (DatanodeStorageInfo storage : node.getStorageInfos()) {
LOG.info("Start Decommissioning " + node + " " + storage
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java Wed Jul 23 01:47:28 2014
@@ -52,6 +52,12 @@ public interface DatanodeStatistics {
/** @return the xceiver count */
public int getXceiverCount();
+ /** @return average xceiver count for non-decommission(ing|ed) nodes */
+ public int getInServiceXceiverCount();
+
+ /** @return number of non-decommission(ing|ed) nodes */
+ public int getNumDatanodesInService();
+
/**
* @return the total used space by data nodes for non-DFS purposes
* such as storing temporary files on the local file system
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java Wed Jul 23 01:47:28 2014
@@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.List;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -290,4 +291,21 @@ public class DatanodeStorageInfo {
public String toString() {
return "[" + storageType + "]" + storageID + ":" + state;
}
+
+ /** @return the first {@link DatanodeStorageInfo} corresponding to
+ * the given datanode
+ */
+ static DatanodeStorageInfo getDatanodeStorageInfo(
+ final Iterable<DatanodeStorageInfo> infos,
+ final DatanodeDescriptor datanode) {
+ if (datanode == null) {
+ return null;
+ }
+ for(DatanodeStorageInfo storage : infos) {
+ if (storage.getDatanodeDescriptor() == datanode) {
+ return storage;
+ }
+ }
+ return null;
+ }
}
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Wed Jul 23 01:47:28 2014
@@ -151,6 +151,16 @@ class HeartbeatManager implements Datano
}
@Override
+ public synchronized int getInServiceXceiverCount() {
+ return stats.nodesInServiceXceiverCount;
+ }
+
+ @Override
+ public synchronized int getNumDatanodesInService() {
+ return stats.nodesInService;
+ }
+
+ @Override
public synchronized long getCacheCapacity() {
return stats.cacheCapacity;
}
@@ -178,7 +188,7 @@ class HeartbeatManager implements Datano
}
synchronized void register(final DatanodeDescriptor d) {
- if (!datanodes.contains(d)) {
+ if (!d.isAlive) {
addDatanode(d);
//update its timestamp
@@ -191,6 +201,8 @@ class HeartbeatManager implements Datano
}
synchronized void addDatanode(final DatanodeDescriptor d) {
+ // update in-service node count
+ stats.add(d);
datanodes.add(d);
d.isAlive = true;
}
@@ -323,6 +335,9 @@ class HeartbeatManager implements Datano
private long cacheCapacity = 0L;
private long cacheUsed = 0L;
+ private int nodesInService = 0;
+ private int nodesInServiceXceiverCount = 0;
+
private int expiredHeartbeats = 0;
private void add(final DatanodeDescriptor node) {
@@ -330,6 +345,8 @@ class HeartbeatManager implements Datano
blockPoolUsed += node.getBlockPoolUsed();
xceiverCount += node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+ nodesInService++;
+ nodesInServiceXceiverCount += node.getXceiverCount();
capacityTotal += node.getCapacity();
capacityRemaining += node.getRemaining();
} else {
@@ -344,6 +361,8 @@ class HeartbeatManager implements Datano
blockPoolUsed -= node.getBlockPoolUsed();
xceiverCount -= node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+ nodesInService--;
+ nodesInServiceXceiverCount -= node.getXceiverCount();
capacityTotal -= node.getCapacity();
capacityRemaining -= node.getRemaining();
} else {
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java Wed Jul 23 01:47:28 2014
@@ -93,7 +93,8 @@ public final class HdfsServerConstants {
FORCE("-force"),
NONINTERACTIVE("-nonInteractive"),
RENAMERESERVED("-renameReserved"),
- METADATAVERSION("-metadataVersion");
+ METADATAVERSION("-metadataVersion"),
+ UPGRADEONLY("-upgradeOnly");
private static final Pattern ENUM_WITH_ROLLING_UPGRADE_OPTION = Pattern.compile(
"(\\w+)\\((\\w+)\\)");
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Wed Jul 23 01:47:28 2014
@@ -575,7 +575,8 @@ class BPOfferService {
switch(cmd.getAction()) {
case DatanodeProtocol.DNA_TRANSFER:
// Send a copy of a block to another datanode
- dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
+ dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(),
+ bcmd.getTargets(), bcmd.getTargetStorageTypes());
dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
break;
case DatanodeProtocol.DNA_INVALIDATE:
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Jul 23 01:47:28 2014
@@ -37,6 +37,7 @@ import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSOutputSummer;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -122,7 +123,8 @@ class BlockReceiver implements Closeable
private boolean syncOnClose;
private long restartBudget;
- BlockReceiver(final ExtendedBlock block, final DataInputStream in,
+ BlockReceiver(final ExtendedBlock block, final StorageType storageType,
+ final DataInputStream in,
final String inAddr, final String myAddr,
final BlockConstructionStage stage,
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
@@ -162,11 +164,11 @@ class BlockReceiver implements Closeable
// Open local disk out
//
if (isDatanode) { //replication or move
- replicaInfo = datanode.data.createTemporary(block);
+ replicaInfo = datanode.data.createTemporary(storageType, block);
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
- replicaInfo = datanode.data.createRbw(block);
+ replicaInfo = datanode.data.createRbw(storageType, block);
datanode.notifyNamenodeReceivingBlock(
block, replicaInfo.getStorageUuid());
break;
@@ -198,7 +200,7 @@ class BlockReceiver implements Closeable
case TRANSFER_RBW:
case TRANSFER_FINALIZED:
// this is a transfer destination
- replicaInfo = datanode.data.createTemporary(block);
+ replicaInfo = datanode.data.createTemporary(storageType, block);
break;
default: throw new IOException("Unsupported stage " + stage +
" while receiving block " + block + " from " + inAddr);
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Jul 23 01:47:28 2014
@@ -19,11 +19,66 @@ package org.apache.hadoop.hdfs.server.da
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
+import static org.apache.hadoop.util.ExitUtil.terminate;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.BlockingService;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.channels.SocketChannel;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.management.ObjectName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,10 +94,23 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.net.DomainPeerServer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
-import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.hadoop.hdfs.protocol.datatransfer.*;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
@@ -50,9 +118,20 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
-import org.apache.hadoop.hdfs.protocolPB.*;
-import org.apache.hadoop.hdfs.security.token.block.*;
+import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -65,7 +144,11 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.*;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.http.HttpConfig;
@@ -88,22 +171,21 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.*;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.JvmPauseMonitor;
+import org.apache.hadoop.util.ServicePlugin;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
-import javax.management.ObjectName;
-
-import java.io.*;
-import java.lang.management.ManagementFactory;
-import java.net.*;
-import java.nio.channels.SocketChannel;
-import java.security.PrivilegedExceptionAction;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-import static org.apache.hadoop.util.ExitUtil.terminate;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.BlockingService;
/**********************************************************
* DataNode is a class (and program) that stores a set of
@@ -1475,8 +1557,8 @@ public class DataNode extends Configured
return xmitsInProgress.get();
}
- private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[])
- throws IOException {
+ private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
+ StorageType[] xferTargetStorageTypes) throws IOException {
BPOfferService bpos = getBPOSForBlock(block);
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
@@ -1512,16 +1594,17 @@ public class DataNode extends Configured
LOG.info(bpReg + " Starting thread to transfer " +
block + " to " + xfersBuilder);
- new Daemon(new DataTransfer(xferTargets, block,
+ new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, block,
BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
}
}
void transferBlocks(String poolId, Block blocks[],
- DatanodeInfo xferTargets[][]) {
+ DatanodeInfo xferTargets[][], StorageType[][] xferTargetStorageTypes) {
for (int i = 0; i < blocks.length; i++) {
try {
- transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i]);
+ transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i],
+ xferTargetStorageTypes[i]);
} catch (IOException ie) {
LOG.warn("Failed to transfer block " + blocks[i], ie);
}
@@ -1624,6 +1707,7 @@ public class DataNode extends Configured
*/
private class DataTransfer implements Runnable {
final DatanodeInfo[] targets;
+ final StorageType[] targetStorageTypes;
final ExtendedBlock b;
final BlockConstructionStage stage;
final private DatanodeRegistration bpReg;
@@ -1634,7 +1718,8 @@ public class DataNode extends Configured
* Connect to the first item in the target list. Pass along the
* entire target list, the block, and the data.
*/
- DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage,
+ DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes,
+ ExtendedBlock b, BlockConstructionStage stage,
final String clientname) {
if (DataTransferProtocol.LOG.isDebugEnabled()) {
DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
@@ -1644,6 +1729,7 @@ public class DataNode extends Configured
+ ", targests=" + Arrays.asList(targets));
}
this.targets = targets;
+ this.targetStorageTypes = targetStorageTypes;
this.b = b;
this.stage = stage;
BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
@@ -1702,7 +1788,8 @@ public class DataNode extends Configured
false, false, true, DataNode.this, null, cachingStrategy);
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
- new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
+ new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
+ clientname, targets, targetStorageTypes, srcNode,
stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
// send data & checksum
@@ -2403,7 +2490,8 @@ public class DataNode extends Configured
* @param client client name
*/
void transferReplicaForPipelineRecovery(final ExtendedBlock b,
- final DatanodeInfo[] targets, final String client) throws IOException {
+ final DatanodeInfo[] targets, final StorageType[] targetStorageTypes,
+ final String client) throws IOException {
final long storedGS;
final long visible;
final BlockConstructionStage stage;
@@ -2436,7 +2524,7 @@ public class DataNode extends Configured
b.setNumBytes(visible);
if (targets.length > 0) {
- new DataTransfer(targets, b, stage, client).run();
+ new DataTransfer(targets, targetStorageTypes, b, stage, client).run();
}
}
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Jul 23 01:47:28 2014
@@ -45,6 +45,7 @@ import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -524,9 +525,11 @@ class DataXceiver extends Receiver imple
@Override
public void writeBlock(final ExtendedBlock block,
+ final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String clientname,
final DatanodeInfo[] targets,
+ final StorageType[] targetStorageTypes,
final DatanodeInfo srcDataNode,
final BlockConstructionStage stage,
final int pipelineSize,
@@ -590,12 +593,13 @@ class DataXceiver extends Receiver imple
if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// open a block receiver
- blockReceiver = new BlockReceiver(block, in,
+ blockReceiver = new BlockReceiver(block, storageType, in,
peer.getRemoteAddressString(),
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
cachingStrategy);
+
storageUuid = blockReceiver.getStorageUuid();
} else {
storageUuid = datanode.data.recoverClose(
@@ -636,10 +640,10 @@ class DataXceiver extends Receiver imple
HdfsConstants.SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(unbufMirrorIn);
- new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
- clientname, targets, srcDataNode, stage, pipelineSize,
- minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum,
- cachingStrategy);
+ new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
+ blockToken, clientname, targets, targetStorageTypes, srcDataNode,
+ stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+ latestGenerationStamp, requestedChecksum, cachingStrategy);
mirrorOut.flush();
@@ -754,7 +758,8 @@ class DataXceiver extends Receiver imple
public void transferBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
- final DatanodeInfo[] targets) throws IOException {
+ final DatanodeInfo[] targets,
+ final StorageType[] targetStorageTypes) throws IOException {
checkAccess(socketOut, true, blk, blockToken,
Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
previousOpClientName = clientName;
@@ -763,7 +768,8 @@ class DataXceiver extends Receiver imple
final DataOutputStream out = new DataOutputStream(
getOutputStream());
try {
- datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
+ datanode.transferReplicaForPipelineRecovery(blk, targets,
+ targetStorageTypes, clientName);
writeResponse(Status.SUCCESS, null, out);
} finally {
IOUtils.closeStream(out);
@@ -941,6 +947,7 @@ class DataXceiver extends Receiver imple
@Override
public void replaceBlock(final ExtendedBlock block,
+ final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String delHint,
final DatanodeInfo proxySource) throws IOException {
@@ -1026,8 +1033,8 @@ class DataXceiver extends Receiver imple
DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
checksumInfo.getChecksum());
// open a block receiver and check if the block does not exist
- blockReceiver = new BlockReceiver(
- block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
+ blockReceiver = new BlockReceiver(block, storageType,
+ proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
null, 0, 0, 0, "", null, datanode, remoteChecksum,
CachingStrategy.newDropBehind());
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Wed Jul 23 01:47:28 2014
@@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -176,8 +177,8 @@ public interface FsDatasetSpi<V extends
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
- public ReplicaInPipelineInterface createTemporary(ExtendedBlock b
- ) throws IOException;
+ public ReplicaInPipelineInterface createTemporary(StorageType storageType,
+ ExtendedBlock b) throws IOException;
/**
* Creates a RBW replica and returns the meta info of the replica
@@ -186,8 +187,8 @@ public interface FsDatasetSpi<V extends
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
- public ReplicaInPipelineInterface createRbw(ExtendedBlock b
- ) throws IOException;
+ public ReplicaInPipelineInterface createRbw(StorageType storageType,
+ ExtendedBlock b) throws IOException;
/**
* Recovers a RBW replica and returns the meta info of the replica
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Wed Jul 23 01:47:28 2014
@@ -17,6 +17,28 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -24,12 +46,37 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.datanode.*;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.hdfs.server.datanode.Replica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -43,15 +90,6 @@ import org.apache.hadoop.util.DiskChecke
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.*;
-import java.util.concurrent.Executor;
-
/**************************************************
* FSDataset manages a set of data blocks. Each block
* has a unique name and an extent on disk.
@@ -736,8 +774,8 @@ class FsDatasetImpl implements FsDataset
}
@Override // FsDatasetSpi
- public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)
- throws IOException {
+ public synchronized ReplicaInPipeline createRbw(StorageType storageType,
+ ExtendedBlock b) throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
if (replicaInfo != null) {
@@ -746,7 +784,7 @@ class FsDatasetImpl implements FsDataset
" and thus cannot be created.");
}
// create a new block
- FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
+ FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
// create a rbw file to hold block in the designated volume
File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
@@ -874,8 +912,8 @@ class FsDatasetImpl implements FsDataset
}
@Override // FsDatasetSpi
- public synchronized ReplicaInPipeline createTemporary(ExtendedBlock b)
- throws IOException {
+ public synchronized ReplicaInPipeline createTemporary(StorageType storageType,
+ ExtendedBlock b) throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
if (replicaInfo != null) {
throw new ReplicaAlreadyExistsException("Block " + b +
@@ -883,7 +921,7 @@ class FsDatasetImpl implements FsDataset
" and thus cannot be created.");
}
- FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
+ FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
// create a temporary file to hold block in the designated volume
File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java Wed Jul 23 01:47:28 2014
@@ -18,13 +18,17 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
-import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.Time;
class FsVolumeList {
/**
@@ -52,11 +56,18 @@ class FsVolumeList {
* by a single thread and next volume is chosen with no concurrent
* update to {@link #volumes}.
* @param blockSize free space needed on the volume
+ * @param storageType the desired {@link StorageType}
* @return next volume to store the block in.
*/
- // TODO should choose volume with storage type
- synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException {
- return blockChooser.chooseVolume(volumes, blockSize);
+ synchronized FsVolumeImpl getNextVolume(StorageType storageType,
+ long blockSize) throws IOException {
+ final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
+ for(FsVolumeImpl v : volumes) {
+ if (v.getStorageType() == storageType) {
+ list.add(v);
+ }
+ }
+ return blockChooser.chooseVolume(list, blockSize);
}
long getDfsUsed() throws IOException {
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Wed Jul 23 01:47:28 2014
@@ -128,7 +128,8 @@ public class DatanodeWebHdfsMethods {
"://" + nnId);
boolean isLogical = HAUtil.isLogicalUri(conf, nnUri);
if (isLogical) {
- token.setService(HAUtil.buildTokenServiceForLogicalUri(nnUri));
+ token.setService(HAUtil.buildTokenServiceForLogicalUri(nnUri,
+ HdfsConstants.HDFS_URI_SCHEME));
} else {
token.setService(SecurityUtil.buildTokenService(nnUri));
}
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java Wed Jul 23 01:47:28 2014
@@ -48,6 +48,15 @@ public interface FSClusterStats {
* @return Number of datanodes that are both alive and not decommissioned.
*/
public int getNumDatanodesInService();
+
+ /**
+ * an indication of the average load of non-decommission(ing|ed) nodes
+ * eligible for block placement
+ *
+ * @return average of the in service number of block transfers and block
+ * writes that are currently occurring on the cluster.
+ */
+ public double getInServiceXceiverAverage();
}
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Wed Jul 23 01:47:28 2014
@@ -225,6 +225,7 @@ public class FSImage implements Closeabl
NNStorage.checkVersionUpgradable(storage.getLayoutVersion());
}
if (startOpt != StartupOption.UPGRADE
+ && startOpt != StartupOption.UPGRADEONLY
&& !RollingUpgradeStartupOption.STARTED.matches(startOpt)
&& layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION
&& layoutVersion != HdfsConstants.NAMENODE_LAYOUT_VERSION) {
@@ -263,6 +264,7 @@ public class FSImage implements Closeabl
// 3. Do transitions
switch(startOpt) {
case UPGRADE:
+ case UPGRADEONLY:
doUpgrade(target);
return false; // upgrade saved image already
case IMPORT:
@@ -748,11 +750,13 @@ public class FSImage implements Closeabl
editLog.recoverUnclosedStreams();
} else if (HAUtil.isHAEnabled(conf, nameserviceId)
&& (startOpt == StartupOption.UPGRADE
+ || startOpt == StartupOption.UPGRADEONLY
|| RollingUpgradeStartupOption.ROLLBACK.matches(startOpt))) {
// This NN is HA, but we're doing an upgrade or a rollback of rolling
// upgrade so init the edit log for write.
editLog.initJournalsForWrite();
- if (startOpt == StartupOption.UPGRADE) {
+ if (startOpt == StartupOption.UPGRADE
+ || startOpt == StartupOption.UPGRADEONLY) {
long sharedLogCTime = editLog.getSharedLogCTime();
if (this.storage.getCTime() < sharedLogCTime) {
throw new IOException("It looks like the shared log is already " +
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Jul 23 01:47:28 2014
@@ -83,6 +83,9 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT;
+
import static org.apache.hadoop.util.Time.now;
import java.io.BufferedWriter;
@@ -528,6 +531,8 @@ public class FSNamesystem implements Nam
private final FSImage fsImage;
+ private boolean randomizeBlockLocationsPerBlock;
+
/**
* Notify that loading of this FSDirectory is complete, and
* it is imageLoaded for use
@@ -837,6 +842,10 @@ public class FSNamesystem implements Nam
alwaysUseDelegationTokensForTests = conf.getBoolean(
DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
+
+ this.randomizeBlockLocationsPerBlock = conf.getBoolean(
+ DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK,
+ DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT);
this.dtSecretManager = createDelegationTokenSecretManager(conf);
this.dir = new FSDirectory(this, conf);
@@ -979,7 +988,8 @@ public class FSNamesystem implements Nam
}
// This will start a new log segment and write to the seen_txid file, so
// we shouldn't do it when coming up in standby state
- if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)) {
+ if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)
+ || (haEnabled && startOpt == StartupOption.UPGRADEONLY)) {
fsImage.openEditLogForWrite();
}
success = true;
@@ -1699,17 +1709,17 @@ public class FSNamesystem implements Nam
LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true,
true);
if (blocks != null) {
- blockManager.getDatanodeManager().sortLocatedBlocks(
- clientMachine, blocks.getLocatedBlocks());
-
+ blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine,
+ blocks.getLocatedBlocks(), randomizeBlockLocationsPerBlock);
+
// lastBlock is not part of getLocatedBlocks(), might need to sort it too
LocatedBlock lastBlock = blocks.getLastLocatedBlock();
if (lastBlock != null) {
ArrayList<LocatedBlock> lastBlockList =
Lists.newArrayListWithCapacity(1);
lastBlockList.add(lastBlock);
- blockManager.getDatanodeManager().sortLocatedBlocks(
- clientMachine, lastBlockList);
+ blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine,
+ lastBlockList, randomizeBlockLocationsPerBlock);
}
}
return blocks;
@@ -7319,7 +7329,18 @@ public class FSNamesystem implements Nam
@Override // FSClusterStats
public int getNumDatanodesInService() {
- return getNumLiveDataNodes() - getNumDecomLiveDataNodes();
+ return datanodeStatistics.getNumDatanodesInService();
+ }
+
+ @Override // for block placement strategy
+ public double getInServiceXceiverAverage() {
+ double avgLoad = 0;
+ final int nodes = getNumDatanodesInService();
+ if (nodes != 0) {
+ final int xceivers = datanodeStatistics.getInServiceXceiverCount();
+ avgLoad = (double)xceivers/nodes;
+ }
+ return avgLoad;
}
public SnapshotManager getSnapshotManager() {
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java Wed Jul 23 01:47:28 2014
@@ -836,7 +836,7 @@ public class NNStorage extends Storage i
*/
void processStartupOptionsForUpgrade(StartupOption startOpt, int layoutVersion)
throws IOException {
- if (startOpt == StartupOption.UPGRADE) {
+ if (startOpt == StartupOption.UPGRADE || startOpt == StartupOption.UPGRADEONLY) {
// If upgrade from a release that does not support federation,
// if clusterId is provided in the startupOptions use it.
// Else generate a new cluster ID
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Jul 23 01:47:28 2014
@@ -210,6 +210,9 @@ public class NameNode implements NameNod
+ StartupOption.UPGRADE.getName() +
" [" + StartupOption.CLUSTERID.getName() + " cid]" +
" [" + StartupOption.RENAMERESERVED.getName() + "<k-v pairs>] ] | \n\t["
+ + StartupOption.UPGRADEONLY.getName() +
+ " [" + StartupOption.CLUSTERID.getName() + " cid]" +
+ " [" + StartupOption.RENAMERESERVED.getName() + "<k-v pairs>] ] | \n\t["
+ StartupOption.ROLLBACK.getName() + "] | \n\t["
+ StartupOption.ROLLINGUPGRADE.getName() + " <"
+ RollingUpgradeStartupOption.DOWNGRADE.name().toLowerCase() + "|"
@@ -713,6 +716,7 @@ public class NameNode implements NameNod
* <li>{@link StartupOption#BACKUP BACKUP} - start backup node</li>
* <li>{@link StartupOption#CHECKPOINT CHECKPOINT} - start checkpoint node</li>
* <li>{@link StartupOption#UPGRADE UPGRADE} - start the cluster
+ * <li>{@link StartupOption#UPGRADEONLY UPGRADEONLY} - upgrade the cluster
* upgrade and create a snapshot of the current file system state</li>
* <li>{@link StartupOption#RECOVER RECOVERY} - recover name node
* metadata</li>
@@ -767,7 +771,8 @@ public class NameNode implements NameNod
}
protected HAState createHAState(StartupOption startOpt) {
- if (!haEnabled || startOpt == StartupOption.UPGRADE) {
+ if (!haEnabled || startOpt == StartupOption.UPGRADE
+ || startOpt == StartupOption.UPGRADEONLY) {
return ACTIVE_STATE;
} else {
return STANDBY_STATE;
@@ -1198,8 +1203,10 @@ public class NameNode implements NameNod
startOpt = StartupOption.BACKUP;
} else if (StartupOption.CHECKPOINT.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.CHECKPOINT;
- } else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd)) {
- startOpt = StartupOption.UPGRADE;
+ } else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd)
+ || StartupOption.UPGRADEONLY.getName().equalsIgnoreCase(cmd)) {
+ startOpt = StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd) ?
+ StartupOption.UPGRADE : StartupOption.UPGRADEONLY;
/* Can be followed by CLUSTERID with a required parameter or
* RENAMERESERVED with an optional parameter
*/
@@ -1407,6 +1414,12 @@ public class NameNode implements NameNod
terminate(0);
return null; // avoid javac warning
}
+ case UPGRADEONLY: {
+ DefaultMetricsSystem.initialize("NameNode");
+ new NameNode(conf);
+ terminate(0);
+ return null;
+ }
default: {
DefaultMetricsSystem.initialize("NameNode");
return new NameNode(conf);
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Wed Jul 23 01:47:28 2014
@@ -126,6 +126,7 @@ public class NamenodeFsck implements Dat
private boolean showBlocks = false;
private boolean showLocations = false;
private boolean showRacks = false;
+ private boolean showprogress = false;
private boolean showCorruptFileBlocks = false;
/**
@@ -203,6 +204,7 @@ public class NamenodeFsck implements Dat
else if (key.equals("blocks")) { this.showBlocks = true; }
else if (key.equals("locations")) { this.showLocations = true; }
else if (key.equals("racks")) { this.showRacks = true; }
+ else if (key.equals("showprogress")) { this.showprogress = true; }
else if (key.equals("openforwrite")) {this.showOpenFiles = true; }
else if (key.equals("listcorruptfileblocks")) {
this.showCorruptFileBlocks = true;
@@ -381,10 +383,13 @@ public class NamenodeFsck implements Dat
} else if (showFiles) {
out.print(path + " " + fileLen + " bytes, " +
blocks.locatedBlockCount() + " block(s): ");
- } else {
+ } else if (showprogress) {
out.print('.');
}
- if (res.totalFiles % 100 == 0) { out.println(); out.flush(); }
+ if ((showprogress) && res.totalFiles % 100 == 0) {
+ out.println();
+ out.flush();
+ }
int missing = 0;
int corrupt = 0;
long missize = 0;
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/XAttrStorage.java Wed Jul 23 01:47:28 2014
@@ -19,24 +19,30 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.util.List;
+import java.util.Map;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
-import org.apache.hadoop.hdfs.server.namenode.INode;
-
-import com.google.common.collect.ImmutableList;
/**
* XAttrStorage is used to read and set xattrs for an inode.
*/
@InterfaceAudience.Private
public class XAttrStorage {
-
+
+ private static final Map<String, String> internedNames = Maps.newHashMap();
+
/**
* Reads the existing extended attributes of an inode. If the
* inode does not have an <code>XAttr</code>, then this method
* returns an empty list.
+ * <p/>
+ * Must be called while holding the FSDirectory read lock.
+ *
* @param inode INode to read
* @param snapshotId
* @return List<XAttr> <code>XAttr</code> list.
@@ -48,6 +54,9 @@ public class XAttrStorage {
/**
* Reads the existing extended attributes of an inode.
+ * <p/>
+ * Must be called while holding the FSDirectory read lock.
+ *
* @param inode INode to read.
* @return List<XAttr> <code>XAttr</code> list.
*/
@@ -58,6 +67,9 @@ public class XAttrStorage {
/**
* Update xattrs of inode.
+ * <p/>
+ * Must be called while holding the FSDirectory write lock.
+ *
* @param inode INode to update
* @param xAttrs to update xAttrs.
* @param snapshotId id of the latest snapshot of the inode
@@ -70,8 +82,24 @@ public class XAttrStorage {
}
return;
}
-
- ImmutableList<XAttr> newXAttrs = ImmutableList.copyOf(xAttrs);
+ // Dedupe the xAttr name and save them into a new interned list
+ List<XAttr> internedXAttrs = Lists.newArrayListWithCapacity(xAttrs.size());
+ for (XAttr xAttr : xAttrs) {
+ final String name = xAttr.getName();
+ String internedName = internedNames.get(name);
+ if (internedName == null) {
+ internedName = name;
+ internedNames.put(internedName, internedName);
+ }
+ XAttr internedXAttr = new XAttr.Builder()
+ .setName(internedName)
+ .setNameSpace(xAttr.getNameSpace())
+ .setValue(xAttr.getValue())
+ .build();
+ internedXAttrs.add(internedXAttr);
+ }
+ // Save the list of interned xattrs
+ ImmutableList<XAttr> newXAttrs = ImmutableList.copyOf(internedXAttrs);
if (inode.getXAttrFeature() != null) {
inode.removeXAttrFeature(snapshotId);
}
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java Wed Jul 23 01:47:28 2014
@@ -81,6 +81,7 @@ public class BootstrapStandby implements
private boolean force = false;
private boolean interactive = true;
+ private boolean skipSharedEditsCheck = false;
// Exit/return codes.
static final int ERR_CODE_FAILED_CONNECT = 2;
@@ -117,6 +118,8 @@ public class BootstrapStandby implements
force = true;
} else if ("-nonInteractive".equals(arg)) {
interactive = false;
+ } else if ("-skipSharedEditsCheck".equals(arg)) {
+ skipSharedEditsCheck = true;
} else {
printUsage();
throw new HadoopIllegalArgumentException(
@@ -127,7 +130,7 @@ public class BootstrapStandby implements
private void printUsage() {
System.err.println("Usage: " + this.getClass().getSimpleName() +
- "[-force] [-nonInteractive]");
+ " [-force] [-nonInteractive] [-skipSharedEditsCheck]");
}
private NamenodeProtocol createNNProtocolProxy()
@@ -200,7 +203,7 @@ public class BootstrapStandby implements
// Ensure that we have enough edits already in the shared directory to
// start up from the last checkpoint on the active.
- if (!checkLogsAvailableForRead(image, imageTxId, curTxId)) {
+ if (!skipSharedEditsCheck && !checkLogsAvailableForRead(image, imageTxId, curTxId)) {
return ERR_CODE_LOGS_UNAVAILABLE;
}
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java Wed Jul 23 01:47:28 2014
@@ -28,6 +28,7 @@ import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.List;
import javax.servlet.ServletContext;
@@ -84,6 +85,7 @@ import org.apache.hadoop.hdfs.web.resour
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
import org.apache.hadoop.hdfs.web.resources.DestinationParam;
import org.apache.hadoop.hdfs.web.resources.DoAsParam;
+import org.apache.hadoop.hdfs.web.resources.ExcludeDatanodesParam;
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.GroupParam;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
@@ -113,11 +115,13 @@ import org.apache.hadoop.hdfs.web.resour
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
+import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
@@ -190,12 +194,26 @@ public class NamenodeWebHdfsMethods {
}
return np;
}
-
+
@VisibleForTesting
static DatanodeInfo chooseDatanode(final NameNode namenode,
final String path, final HttpOpParam.Op op, final long openOffset,
- final long blocksize) throws IOException {
+ final long blocksize, final String excludeDatanodes) throws IOException {
final BlockManager bm = namenode.getNamesystem().getBlockManager();
+
+ HashSet<Node> excludes = new HashSet<Node>();
+ if (excludeDatanodes != null) {
+ for (String host : StringUtils
+ .getTrimmedStringCollection(excludeDatanodes)) {
+ int idx = host.indexOf(":");
+ if (idx != -1) {
+ excludes.add(bm.getDatanodeManager().getDatanodeByXferAddr(
+ host.substring(0, idx), Integer.parseInt(host.substring(idx + 1))));
+ } else {
+ excludes.add(bm.getDatanodeManager().getDatanodeByHost(host));
+ }
+ }
+ }
if (op == PutOpParam.Op.CREATE) {
//choose a datanode near to client
@@ -204,7 +222,7 @@ public class NamenodeWebHdfsMethods {
if (clientNode != null) {
final DatanodeStorageInfo[] storages = bm.getBlockPlacementPolicy()
.chooseTarget(path, 1, clientNode,
- new ArrayList<DatanodeStorageInfo>(), false, null, blocksize,
+ new ArrayList<DatanodeStorageInfo>(), false, excludes, blocksize,
// TODO: get storage type from the file
StorageType.DEFAULT);
if (storages.length > 0) {
@@ -233,7 +251,7 @@ public class NamenodeWebHdfsMethods {
final LocatedBlocks locations = np.getBlockLocations(path, offset, 1);
final int count = locations.locatedBlockCount();
if (count > 0) {
- return bestNode(locations.get(0).getLocations());
+ return bestNode(locations.get(0).getLocations(), excludes);
}
}
}
@@ -247,11 +265,14 @@ public class NamenodeWebHdfsMethods {
* sorted based on availability and network distances, thus it is sufficient
* to return the first element of the node here.
*/
- private static DatanodeInfo bestNode(DatanodeInfo[] nodes) throws IOException {
- if (nodes.length == 0 || nodes[0].isDecommissioned()) {
- throw new IOException("No active nodes contain this block");
+ private static DatanodeInfo bestNode(DatanodeInfo[] nodes,
+ HashSet<Node> excludes) throws IOException {
+ for (DatanodeInfo dn: nodes) {
+ if (false == dn.isDecommissioned() && false == excludes.contains(dn)) {
+ return dn;
+ }
}
- return nodes[0];
+ throw new IOException("No active nodes contain this block");
}
private Token<? extends TokenIdentifier> generateDelegationToken(
@@ -270,11 +291,12 @@ public class NamenodeWebHdfsMethods {
final UserGroupInformation ugi, final DelegationParam delegation,
final UserParam username, final DoAsParam doAsUser,
final String path, final HttpOpParam.Op op, final long openOffset,
- final long blocksize,
+ final long blocksize, final String excludeDatanodes,
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
final DatanodeInfo dn;
try {
- dn = chooseDatanode(namenode, path, op, openOffset, blocksize);
+ dn = chooseDatanode(namenode, path, op, openOffset, blocksize,
+ excludeDatanodes);
} catch (InvalidTopologyException ite) {
throw new IOException("Failed to find datanode, suggest to check cluster health.", ite);
}
@@ -361,13 +383,15 @@ public class NamenodeWebHdfsMethods {
@QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
final SnapshotNameParam snapshotName,
@QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT)
- final OldSnapshotNameParam oldSnapshotName
- )throws IOException, InterruptedException {
+ final OldSnapshotNameParam oldSnapshotName,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
+ ) throws IOException, InterruptedException {
return put(ugi, delegation, username, doAsUser, ROOT, op, destination,
owner, group, permission, overwrite, bufferSize, replication,
blockSize, modificationTime, accessTime, renameOptions, createParent,
delegationTokenArgument, aclPermission, xattrName, xattrValue,
- xattrSetFlag, snapshotName, oldSnapshotName);
+ xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes);
}
/** Handle HTTP PUT request. */
@@ -423,14 +447,16 @@ public class NamenodeWebHdfsMethods {
@QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
final SnapshotNameParam snapshotName,
@QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT)
- final OldSnapshotNameParam oldSnapshotName
+ final OldSnapshotNameParam oldSnapshotName,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
init(ugi, delegation, username, doAsUser, path, op, destination, owner,
group, permission, overwrite, bufferSize, replication, blockSize,
modificationTime, accessTime, renameOptions, delegationTokenArgument,
aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName,
- oldSnapshotName);
+ oldSnapshotName, excludeDatanodes);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
@@ -441,7 +467,7 @@ public class NamenodeWebHdfsMethods {
permission, overwrite, bufferSize, replication, blockSize,
modificationTime, accessTime, renameOptions, createParent,
delegationTokenArgument, aclPermission, xattrName, xattrValue,
- xattrSetFlag, snapshotName, oldSnapshotName);
+ xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes);
} finally {
reset();
}
@@ -474,7 +500,8 @@ public class NamenodeWebHdfsMethods {
final XAttrValueParam xattrValue,
final XAttrSetFlagParam xattrSetFlag,
final SnapshotNameParam snapshotName,
- final OldSnapshotNameParam oldSnapshotName
+ final OldSnapshotNameParam oldSnapshotName,
+ final ExcludeDatanodesParam exclDatanodes
) throws IOException, URISyntaxException {
final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
@@ -484,9 +511,10 @@ public class NamenodeWebHdfsMethods {
switch(op.getValue()) {
case CREATE:
{
- final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
- fullpath, op.getValue(), -1L, blockSize.getValue(conf),
- permission, overwrite, bufferSize, replication, blockSize);
+ final URI uri = redirectURI(namenode, ugi, delegation, username,
+ doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf),
+ exclDatanodes.getValue(), permission, overwrite, bufferSize,
+ replication, blockSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case MKDIRS:
@@ -619,9 +647,12 @@ public class NamenodeWebHdfsMethods {
@QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
final ConcatSourcesParam concatSrcs,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
- final BufferSizeParam bufferSize
+ final BufferSizeParam bufferSize,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
- return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs, bufferSize);
+ return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs,
+ bufferSize, excludeDatanodes);
}
/** Handle HTTP POST request. */
@@ -643,17 +674,21 @@ public class NamenodeWebHdfsMethods {
@QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
final ConcatSourcesParam concatSrcs,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
- final BufferSizeParam bufferSize
+ final BufferSizeParam bufferSize,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
- init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize);
+ init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize,
+ excludeDatanodes);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws IOException, URISyntaxException {
try {
return post(ugi, delegation, username, doAsUser,
- path.getAbsolutePath(), op, concatSrcs, bufferSize);
+ path.getAbsolutePath(), op, concatSrcs, bufferSize,
+ excludeDatanodes);
} finally {
reset();
}
@@ -669,15 +704,17 @@ public class NamenodeWebHdfsMethods {
final String fullpath,
final PostOpParam op,
final ConcatSourcesParam concatSrcs,
- final BufferSizeParam bufferSize
+ final BufferSizeParam bufferSize,
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, URISyntaxException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");
switch(op.getValue()) {
case APPEND:
{
- final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
- fullpath, op.getValue(), -1L, -1L, bufferSize);
+ final URI uri = redirectURI(namenode, ugi, delegation, username,
+ doAsUser, fullpath, op.getValue(), -1L, -1L,
+ excludeDatanodes.getValue(), bufferSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case CONCAT:
@@ -715,10 +752,12 @@ public class NamenodeWebHdfsMethods {
@QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT)
final List<XAttrNameParam> xattrNames,
@QueryParam(XAttrEncodingParam.NAME) @DefaultValue(XAttrEncodingParam.DEFAULT)
- final XAttrEncodingParam xattrEncoding
+ final XAttrEncodingParam xattrEncoding,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
return get(ugi, delegation, username, doAsUser, ROOT, op, offset, length,
- renewer, bufferSize, xattrNames, xattrEncoding);
+ renewer, bufferSize, xattrNames, xattrEncoding, excludeDatanodes);
}
/** Handle HTTP GET request. */
@@ -747,11 +786,13 @@ public class NamenodeWebHdfsMethods {
@QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT)
final List<XAttrNameParam> xattrNames,
@QueryParam(XAttrEncodingParam.NAME) @DefaultValue(XAttrEncodingParam.DEFAULT)
- final XAttrEncodingParam xattrEncoding
+ final XAttrEncodingParam xattrEncoding,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
init(ugi, delegation, username, doAsUser, path, op, offset, length,
- renewer, bufferSize, xattrEncoding);
+ renewer, bufferSize, xattrEncoding, excludeDatanodes);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
@@ -759,7 +800,7 @@ public class NamenodeWebHdfsMethods {
try {
return get(ugi, delegation, username, doAsUser,
path.getAbsolutePath(), op, offset, length, renewer, bufferSize,
- xattrNames, xattrEncoding);
+ xattrNames, xattrEncoding, excludeDatanodes);
} finally {
reset();
}
@@ -779,7 +820,8 @@ public class NamenodeWebHdfsMethods {
final RenewerParam renewer,
final BufferSizeParam bufferSize,
final List<XAttrNameParam> xattrNames,
- final XAttrEncodingParam xattrEncoding
+ final XAttrEncodingParam xattrEncoding,
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, URISyntaxException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final NamenodeProtocols np = getRPCServer(namenode);
@@ -787,8 +829,9 @@ public class NamenodeWebHdfsMethods {
switch(op.getValue()) {
case OPEN:
{
- final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
- fullpath, op.getValue(), offset.getValue(), -1L, offset, length, bufferSize);
+ final URI uri = redirectURI(namenode, ugi, delegation, username,
+ doAsUser, fullpath, op.getValue(), offset.getValue(), -1L,
+ excludeDatanodes.getValue(), offset, length, bufferSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case GET_BLOCK_LOCATIONS:
@@ -824,7 +867,7 @@ public class NamenodeWebHdfsMethods {
case GETFILECHECKSUM:
{
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
- fullpath, op.getValue(), -1L, -1L);
+ fullpath, op.getValue(), -1L, -1L, null);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case GETDELEGATIONTOKEN:
Modified: hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java?rev=1612742&r1=1612741&r2=1612742&view=diff
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java (original)
+++ hadoop/common/branches/MR-2841/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java Wed Jul 23 01:47:28 2014
@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
@@ -50,6 +51,7 @@ public class BlockCommand extends Datano
final String poolId;
final Block[] blocks;
final DatanodeInfo[][] targets;
+ final StorageType[][] targetStorageTypes;
final String[][] targetStorageIDs;
/**
@@ -62,17 +64,20 @@ public class BlockCommand extends Datano
this.poolId = poolId;
blocks = new Block[blocktargetlist.size()];
targets = new DatanodeInfo[blocks.length][];
+ targetStorageTypes = new StorageType[blocks.length][];
targetStorageIDs = new String[blocks.length][];
for(int i = 0; i < blocks.length; i++) {
BlockTargetPair p = blocktargetlist.get(i);
blocks[i] = p.block;
targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets);
+ targetStorageTypes[i] = DatanodeStorageInfo.toStorageTypes(p.targets);
targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets);
}
}
private static final DatanodeInfo[][] EMPTY_TARGET_DATANODES = {};
+ private static final StorageType[][] EMPTY_TARGET_STORAGE_TYPES = {};
private static final String[][] EMPTY_TARGET_STORAGEIDS = {};
/**
@@ -81,7 +86,7 @@ public class BlockCommand extends Datano
*/
public BlockCommand(int action, String poolId, Block blocks[]) {
this(action, poolId, blocks, EMPTY_TARGET_DATANODES,
- EMPTY_TARGET_STORAGEIDS);
+ EMPTY_TARGET_STORAGE_TYPES, EMPTY_TARGET_STORAGEIDS);
}
/**
@@ -89,11 +94,13 @@ public class BlockCommand extends Datano
* @param blocks blocks related to the action
*/
public BlockCommand(int action, String poolId, Block[] blocks,
- DatanodeInfo[][] targets, String[][] targetStorageIDs) {
+ DatanodeInfo[][] targets, StorageType[][] targetStorageTypes,
+ String[][] targetStorageIDs) {
super(action);
this.poolId = poolId;
this.blocks = blocks;
this.targets = targets;
+ this.targetStorageTypes = targetStorageTypes;
this.targetStorageIDs = targetStorageIDs;
}
@@ -109,6 +116,10 @@ public class BlockCommand extends Datano
return targets;
}
+ public StorageType[][] getTargetStorageTypes() {
+ return targetStorageTypes;
+ }
+
public String[][] getTargetStorageIDs() {
return targetStorageIDs;
}