Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2012/08/17 18:52:08 UTC
svn commit: r1374355 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocol/
src/main/java/org/apache/hadoop/hdfs/protocolPB/...
Author: atm
Date: Fri Aug 17 16:52:07 2012
New Revision: 1374355
URL: http://svn.apache.org/viewvc?rev=1374355&view=rev
Log:
HDFS-3672. Expose disk-location information for blocks to enable better scheduling. Contributed by Andrew Wang.
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Aug 17 16:52:07 2012
@@ -402,6 +402,9 @@ Branch-2 ( Unreleased changes )
HDFS-2963. Console Output is confusing while executing metasave
(dfsadmin command). (Andrew Wang via eli)
+ HDFS-3672. Expose disk-location information for blocks to enable better
+ scheduling. (Andrew Wang via atm)
+
OPTIMIZATIONS
HDFS-2982. Startup performance suffers when there are many edit log
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java?rev=1374355&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java Fri Aug 17 16:52:07 2012
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Wrapper for {@link BlockLocation} that also adds {@link VolumeId} volume
+ * location information for each replica.
+ */
+@InterfaceStability.Unstable
+@InterfaceAudience.Public
+public class BlockStorageLocation extends BlockLocation {
+
+ private final VolumeId[] volumeIds;
+
+ public BlockStorageLocation(BlockLocation loc, VolumeId[] volumeIds)
+ throws IOException {
+ // Initialize with data from passed in BlockLocation
+ super(loc.getNames(), loc.getHosts(), loc.getTopologyPaths(), loc
+ .getOffset(), loc.getLength(), loc.isCorrupt());
+ this.volumeIds = volumeIds;
+ }
+
+ /**
+ * Gets the list of {@link VolumeId} corresponding to the block's replicas.
+ *
+ * @return volumeIds list of VolumeId for the block's replicas
+ */
+ public VolumeId[] getVolumeIds() {
+ return volumeIds;
+ }
+}
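As a usage sketch (the helper class and method names below are illustrative, not part of this commit): a caller that already holds a BlockStorageLocation[] from DistributedFileSystem#getFileBlockStorageLocations could read the per-replica volume information like this.

    import java.io.IOException;

    import org.apache.hadoop.fs.BlockStorageLocation;
    import org.apache.hadoop.fs.VolumeId;

    class PrintReplicaVolumes {
      // The VolumeId array is ordered like getHosts()/getNames(): one entry
      // per replica, placed at the same index as its datanode.
      static void print(BlockStorageLocation[] storageLocations)
          throws IOException {
        for (BlockStorageLocation loc : storageLocations) {
          String[] hosts = loc.getHosts();
          VolumeId[] volumes = loc.getVolumeIds();
          for (int i = 0; i < hosts.length; i++) {
            // Invalid ids mean the datanode did not answer in time or the
            // replica was not found; check isValid() before trusting the id.
            System.out.println(hosts[i] + " -> "
                + (volumes[i].isValid() ? "volume " + volumes[i] : "unknown"));
          }
        }
      }
    }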
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java?rev=1374355&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java Fri Aug 17 16:52:07 2012
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+
+/**
+ * Wrapper for {@link BlockLocation} that also includes a {@link LocatedBlock},
+ * allowing more detailed queries to the datanode about a block.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class HdfsBlockLocation extends BlockLocation {
+
+ private final LocatedBlock block;
+
+ public HdfsBlockLocation(BlockLocation loc, LocatedBlock block)
+ throws IOException {
+ // Initialize with data from passed in BlockLocation
+ super(loc.getNames(), loc.getHosts(), loc.getTopologyPaths(),
+ loc.getOffset(), loc.getLength(), loc.isCorrupt());
+ this.block = block;
+ }
+
+ public LocatedBlock getLocatedBlock() {
+ return block;
+ }
+}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java?rev=1374355&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java Fri Aug 17 16:52:07 2012
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * HDFS-specific volume identifier which implements {@link VolumeId}. Can be
+ * used to differentiate between the data directories on a single datanode. This
+ * identifier is only unique on a per-datanode basis.
+ */
+@InterfaceStability.Unstable
+@InterfaceAudience.Public
+public class HdfsVolumeId implements VolumeId {
+
+ private final byte id;
+ private final boolean isValid;
+
+ public HdfsVolumeId(byte id, boolean isValid) {
+ this.id = id;
+ this.isValid = isValid;
+ }
+
+ @Override
+ public boolean isValid() {
+ return isValid;
+ }
+
+ @Override
+ public int compareTo(VolumeId arg0) {
+ return hashCode() - arg0.hashCode();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder().append(id).toHashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || obj.getClass() != getClass()) {
+ return false;
+ }
+ if (obj == this) {
+ return true;
+ }
+
+ HdfsVolumeId that = (HdfsVolumeId) obj;
+ return new EqualsBuilder().append(this.id, that.id).isEquals();
+ }
+
+ @Override
+ public String toString() {
+ return Byte.toString(id);
+ }
+}
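A small illustration of the identifier's semantics (all byte values invented): equality is decided by the wrapped byte alone, and ids are only meaningful within a single datanode, so ids from different datanodes may collide.

    import org.apache.hadoop.fs.HdfsVolumeId;

    class VolumeIdDemo {
      public static void main(String[] args) {
        HdfsVolumeId disk0 = new HdfsVolumeId((byte) 0, true);
        HdfsVolumeId disk1 = new HdfsVolumeId((byte) 1, true);
        // The invalid placeholder used when a datanode gave no answer.
        HdfsVolumeId unknown = new HdfsVolumeId((byte) -1, false);

        System.out.println(disk0.equals(new HdfsVolumeId((byte) 0, true))); // true
        System.out.println(disk0.equals(disk1));                            // false
        System.out.println(unknown.isValid());                              // false
      }
    }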
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java?rev=1374355&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java Fri Aug 17 16:52:07 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Opaque interface that identifies a disk location. Implementing classes
+ * should override both equals and hashCode.
+ */
+@InterfaceStability.Unstable
+@InterfaceAudience.Public
+public interface VolumeId extends Comparable<VolumeId> {
+
+ /**
+ * Indicates if the disk identifier is valid. Invalid identifiers indicate
+ * that the block was not present, or the location could otherwise not be
+ * determined.
+ *
+ * @return true if the disk identifier is valid
+ */
+ public boolean isValid();
+
+ @Override
+ abstract public int compareTo(VolumeId arg0);
+
+ @Override
+ abstract public int hashCode();
+
+ @Override
+ abstract public boolean equals(Object obj);
+
+}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java?rev=1374355&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java Fri Aug 17 16:52:07 2012
@@ -0,0 +1,337 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.BlockStorageLocation;
+import org.apache.hadoop.fs.HdfsVolumeId;
+import org.apache.hadoop.fs.VolumeId;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.token.Token;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class BlockStorageLocationUtil {
+
+ private static final Log LOG = LogFactory
+ .getLog(BlockStorageLocationUtil.class);
+
+ /**
+ * Create a list of {@link VolumeBlockLocationCallable} corresponding to a set
+ * of datanodes and blocks.
+ *
+ * @param datanodeBlocks
+ * Map of datanodes to block replicas at each datanode
+ * @return callables Used to query each datanode for location information on
+ * the block replicas at the datanode
+ */
+ private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallables(
+ Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
+ int timeout, boolean connectToDnViaHostname) {
+ // Construct the callables, one per datanode
+ List<VolumeBlockLocationCallable> callables =
+ new ArrayList<VolumeBlockLocationCallable>();
+ for (Map.Entry<DatanodeInfo, List<LocatedBlock>> entry : datanodeBlocks
+ .entrySet()) {
+ // Construct RPC parameters
+ DatanodeInfo datanode = entry.getKey();
+ List<LocatedBlock> locatedBlocks = entry.getValue();
+ List<ExtendedBlock> extendedBlocks =
+ new ArrayList<ExtendedBlock>(locatedBlocks.size());
+ List<Token<BlockTokenIdentifier>> dnTokens =
+ new ArrayList<Token<BlockTokenIdentifier>>(
+ locatedBlocks.size());
+ for (LocatedBlock b : locatedBlocks) {
+ extendedBlocks.add(b.getBlock());
+ dnTokens.add(b.getBlockToken());
+ }
+ VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable(
+ conf, datanode, extendedBlocks, dnTokens, timeout,
+ connectToDnViaHostname);
+ callables.add(callable);
+ }
+ return callables;
+ }
+
+ /**
+ * Queries datanodes for the blocks specified in <code>datanodeBlocks</code>,
+ * making one RPC to each datanode. These RPCs are made in parallel using a
+ * threadpool.
+ *
+ * @param datanodeBlocks
+ * Map of datanodes to the blocks present on the DN
+ * @return metadatas List of block metadata for each datanode, specifying
+ * volume locations for each block
+ * @throws InvalidBlockTokenException
+ * if client does not have read access on a requested block
+ */
+ static List<HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata(
+ Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
+ int poolsize, int timeout, boolean connectToDnViaHostname)
+ throws InvalidBlockTokenException {
+
+ List<VolumeBlockLocationCallable> callables =
+ createVolumeBlockLocationCallables(conf, datanodeBlocks, timeout,
+ connectToDnViaHostname);
+
+ // Use a thread pool to execute the Callables in parallel
+ List<Future<HdfsBlocksMetadata>> futures =
+ new ArrayList<Future<HdfsBlocksMetadata>>();
+ ExecutorService executor = new ScheduledThreadPoolExecutor(poolsize);
+ try {
+ futures = executor.invokeAll(callables, timeout, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // Swallow the exception here, because we can return partial results
+ }
+ executor.shutdown();
+
+ // Initialize metadatas list with nulls
+ // This is used to later indicate if we didn't get a response from a DN
+ List<HdfsBlocksMetadata> metadatas = new ArrayList<HdfsBlocksMetadata>();
+ for (int i = 0; i < futures.size(); i++) {
+ metadatas.add(null);
+ }
+ // Fill in metadatas with results from DN RPCs, where possible
+ for (int i = 0; i < futures.size(); i++) {
+ Future<HdfsBlocksMetadata> future = futures.get(i);
+ try {
+ HdfsBlocksMetadata metadata = future.get();
+ metadatas.set(i, metadata);
+ } catch (ExecutionException e) {
+ VolumeBlockLocationCallable callable = callables.get(i);
+ DatanodeInfo datanode = callable.getDatanodeInfo();
+ Throwable t = e.getCause();
+ if (t instanceof InvalidBlockTokenException) {
+ LOG.warn("Invalid access token when trying to retrieve "
+ + "information from datanode " + datanode.getIpcAddr(false));
+ throw (InvalidBlockTokenException) t;
+ }
+ else if (t instanceof UnsupportedOperationException) {
+ LOG.info("Datanode " + datanode.getIpcAddr(false) + " does not support"
+ + " required #getHdfsBlocksMetadata() API");
+ throw (UnsupportedOperationException) t;
+ } else {
+ LOG.info("Failed to connect to datanode " +
+ datanode.getIpcAddr(false));
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Could not fetch information from datanode", t);
+ }
+ } catch (InterruptedException e) {
+ // Shouldn't happen, because invokeAll waits for all Futures to be ready
+ LOG.info("Interrupted while fetching HdfsBlocksMetadata");
+ }
+ }
+
+ return metadatas;
+ }
+
+ /**
+ * Group the per-replica {@link VolumeId} info returned from
+ * {@link #queryDatanodesForHdfsBlocksMetadata} to be associated
+ * with the corresponding {@link LocatedBlock}.
+ *
+ * @param blocks
+ * Original LocatedBlock array
+ * @param datanodeBlocks
+ * Mapping from datanodes to the list of replicas on each datanode
+ * @param metadatas
+ * VolumeId information for the replicas on each datanode
+ * @return blockVolumeIds per-replica VolumeId information associated with the
+ * parent LocatedBlock
+ */
+ static Map<LocatedBlock, List<VolumeId>> associateVolumeIdsWithBlocks(
+ List<LocatedBlock> blocks, Map<DatanodeInfo,
+ List<LocatedBlock>> datanodeBlocks, List<HdfsBlocksMetadata> metadatas) {
+
+ // Initialize mapping of ExtendedBlock to LocatedBlock.
+ // Used to associate results from DN RPCs to the parent LocatedBlock
+ Map<ExtendedBlock, LocatedBlock> extBlockToLocBlock =
+ new HashMap<ExtendedBlock, LocatedBlock>();
+ for (LocatedBlock b : blocks) {
+ extBlockToLocBlock.put(b.getBlock(), b);
+ }
+
+ // Initialize the mapping of blocks -> list of VolumeIds, one per replica
+ // This is filled out with real values from the DN RPCs
+ Map<LocatedBlock, List<VolumeId>> blockVolumeIds =
+ new HashMap<LocatedBlock, List<VolumeId>>();
+ for (LocatedBlock b : blocks) {
+ ArrayList<VolumeId> l = new ArrayList<VolumeId>(b.getLocations().length);
+ // Start off all IDs as invalid, fill it in later with results from RPCs
+ for (int i = 0; i < b.getLocations().length; i++) {
+ l.add(new HdfsVolumeId((byte)-1, false));
+ }
+ blockVolumeIds.put(b, l);
+ }
+
+ // Iterate through the list of metadatas (one per datanode).
+ // For each metadata, if it's valid, insert its volume location information
+ // into the Map returned to the caller
+ Iterator<HdfsBlocksMetadata> metadatasIter = metadatas.iterator();
+ Iterator<DatanodeInfo> datanodeIter = datanodeBlocks.keySet().iterator();
+ while (metadatasIter.hasNext()) {
+ HdfsBlocksMetadata metadata = metadatasIter.next();
+ DatanodeInfo datanode = datanodeIter.next();
+ // Check if metadata is valid
+ if (metadata == null) {
+ continue;
+ }
+ ExtendedBlock[] metaBlocks = metadata.getBlocks();
+ List<byte[]> metaVolumeIds = metadata.getVolumeIds();
+ List<Integer> metaVolumeIndexes = metadata.getVolumeIndexes();
+ // Add VolumeId for each replica in the HdfsBlocksMetadata
+ for (int j = 0; j < metaBlocks.length; j++) {
+ int volumeIndex = metaVolumeIndexes.get(j);
+ ExtendedBlock extBlock = metaBlocks[j];
+ // Skip if block wasn't found, or not a valid index into metaVolumeIds
+ // Also skip if the DN responded with a block we didn't ask for
+ if (volumeIndex == Integer.MAX_VALUE
+ || volumeIndex >= metaVolumeIds.size()
+ || !extBlockToLocBlock.containsKey(extBlock)) {
+ continue;
+ }
+ // Get the VolumeId by indexing into the list of VolumeIds
+ // provided by the datanode
+ HdfsVolumeId id = new HdfsVolumeId(metaVolumeIds.get(volumeIndex)[0],
+ true);
+ // Find out which index we are in the LocatedBlock's replicas
+ LocatedBlock locBlock = extBlockToLocBlock.get(extBlock);
+ DatanodeInfo[] dnInfos = locBlock.getLocations();
+ int index = -1;
+ for (int k = 0; k < dnInfos.length; k++) {
+ if (dnInfos[k].equals(datanode)) {
+ index = k;
+ break;
+ }
+ }
+ if (index < 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Datanode responded with a block volume id we did" +
+ " not request, omitting.");
+ }
+ continue;
+ }
+ // Place VolumeId at the same index as the DN's index in the list of
+ // replicas
+ List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
+ volumeIds.set(index, id);
+ }
+ }
+ return blockVolumeIds;
+ }
+
+ /**
+ * Helper method to combine a list of {@link LocatedBlock} with associated
+ * {@link VolumeId} information to form a list of {@link BlockStorageLocation}.
+ */
+ static BlockStorageLocation[] convertToVolumeBlockLocations(
+ List<LocatedBlock> blocks,
+ Map<LocatedBlock, List<VolumeId>> blockVolumeIds) throws IOException {
+ // Construct the final return value of VolumeBlockLocation[]
+ BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks);
+ List<BlockStorageLocation> volumeBlockLocs =
+ new ArrayList<BlockStorageLocation>(locations.length);
+ for (int i = 0; i < locations.length; i++) {
+ LocatedBlock locBlock = blocks.get(i);
+ List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
+ BlockStorageLocation bsLoc = new BlockStorageLocation(locations[i],
+ volumeIds.toArray(new VolumeId[0]));
+ volumeBlockLocs.add(bsLoc);
+ }
+ return volumeBlockLocs.toArray(new BlockStorageLocation[] {});
+ }
+
+ /**
+ * Callable that sets up an RPC proxy to a datanode and queries it for
+ * volume location information for a list of ExtendedBlocks.
+ */
+ private static class VolumeBlockLocationCallable implements
+ Callable<HdfsBlocksMetadata> {
+
+ private Configuration configuration;
+ private int timeout;
+ private DatanodeInfo datanode;
+ private List<ExtendedBlock> extendedBlocks;
+ private List<Token<BlockTokenIdentifier>> dnTokens;
+ private boolean connectToDnViaHostname;
+
+ VolumeBlockLocationCallable(Configuration configuration,
+ DatanodeInfo datanode, List<ExtendedBlock> extendedBlocks,
+ List<Token<BlockTokenIdentifier>> dnTokens, int timeout,
+ boolean connectToDnViaHostname) {
+ this.configuration = configuration;
+ this.timeout = timeout;
+ this.datanode = datanode;
+ this.extendedBlocks = extendedBlocks;
+ this.dnTokens = dnTokens;
+ this.connectToDnViaHostname = connectToDnViaHostname;
+ }
+
+ public DatanodeInfo getDatanodeInfo() {
+ return datanode;
+ }
+
+ @Override
+ public HdfsBlocksMetadata call() throws Exception {
+ HdfsBlocksMetadata metadata = null;
+ // Create the RPC proxy and make the RPC
+ ClientDatanodeProtocol cdp = null;
+ try {
+ cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
+ timeout, connectToDnViaHostname);
+ metadata = cdp.getHdfsBlocksMetadata(extendedBlocks, dnTokens);
+ } catch (IOException e) {
+ // Bubble this up to the caller, handle with the Future
+ throw e;
+ } finally {
+ if (cdp != null) {
+ RPC.stopProxy(cdp);
+ }
+ }
+ return metadata;
+ }
+ }
+}
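The fan-out in queryDatanodesForHdfsBlocksMetadata above is the standard invokeAll-with-deadline pattern: tasks that miss the deadline come back cancelled, and the caller records nulls so it can still return partial results. A self-contained sketch of that pattern, independent of the HDFS types (names illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CancellationException;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    class ParallelQuerySketch {
      // Runs all callables against a shared deadline; nulls mark the ones
      // that failed or timed out, mirroring how 'metadatas' is filled above.
      static <T> List<T> queryAll(List<Callable<T>> callables, int poolSize,
          long timeoutSec) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        try {
          List<Future<T>> futures =
              executor.invokeAll(callables, timeoutSec, TimeUnit.SECONDS);
          List<T> results = new ArrayList<T>(futures.size());
          for (Future<T> f : futures) {
            try {
              results.add(f.get());
            } catch (ExecutionException e) {
              results.add(null);   // task threw; caller can inspect or log
            } catch (CancellationException e) {
              results.add(null);   // deadline expired before this task finished
            }
          }
          return results;
        } finally {
          executor.shutdown();
        }
      }
    }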
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Aug 17 16:52:07 2012
@@ -45,8 +45,6 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
@@ -69,6 +67,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -80,6 +79,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
@@ -87,12 +87,14 @@ import org.apache.hadoop.fs.FileAlreadyE
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
@@ -102,6 +104,7 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
@@ -120,8 +123,9 @@ import org.apache.hadoop.hdfs.protocol.d
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -216,6 +220,9 @@ public class DFSClient implements java.i
final FsPermission uMask;
final boolean useLegacyBlockReader;
final boolean connectToDnViaHostname;
+ final boolean getHdfsBlocksMetadataEnabled;
+ final int getFileBlockStorageLocationsNumThreads;
+ final int getFileBlockStorageLocationsTimeout;
Conf(Configuration conf) {
maxFailoverAttempts = conf.getInt(
@@ -268,6 +275,15 @@ public class DFSClient implements java.i
DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+ getHdfsBlocksMetadataEnabled = conf.getBoolean(
+ DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
+ DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+ getFileBlockStorageLocationsNumThreads = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS,
+ DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT);
+ getFileBlockStorageLocationsTimeout = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT,
+ DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT);
}
private DataChecksum.Type getChecksumType(Configuration conf) {
@@ -942,7 +958,81 @@ public class DFSClient implements java.i
public BlockLocation[] getBlockLocations(String src, long start,
long length) throws IOException, UnresolvedLinkException {
LocatedBlocks blocks = getLocatedBlocks(src, start, length);
- return DFSUtil.locatedBlocks2Locations(blocks);
+ BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks);
+ HdfsBlockLocation[] hdfsLocations = new HdfsBlockLocation[locations.length];
+ for (int i = 0; i < locations.length; i++) {
+ hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i));
+ }
+ return hdfsLocations;
+ }
+
+ /**
+ * Get volume location information for a list of {@link HdfsBlockLocation}s.
+ * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to
+ * get {@link BlockStorageLocation}s for blocks returned by
+ * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)}.
+ *
+ * This is done by making a round of RPCs to the associated datanodes, asking
+ * for the volume of each block replica. The returned array of
+ * {@link BlockStorageLocation} exposes this information as a
+ * {@link VolumeId} for each replica.
+ *
+ * @param blockLocations
+ * target blocks on which to query volume location information
+ * @return volumeBlockLocations original block array augmented with additional
+ * volume location information for each replica.
+ */
+ public BlockStorageLocation[] getBlockStorageLocations(
+ List<BlockLocation> blockLocations) throws IOException,
+ UnsupportedOperationException, InvalidBlockTokenException {
+ if (!getConf().getHdfsBlocksMetadataEnabled) {
+ throw new UnsupportedOperationException("Datanode-side support for " +
+ "getVolumeBlockLocations() must also be enabled in the client " +
+ "configuration.");
+ }
+ // Downcast blockLocations and fetch out required LocatedBlock(s)
+ List<LocatedBlock> blocks = new ArrayList<LocatedBlock>();
+ for (BlockLocation loc : blockLocations) {
+ if (!(loc instanceof HdfsBlockLocation)) {
+ throw new ClassCastException("DFSClient#getVolumeBlockLocations " +
+ "expected to be passed HdfsBlockLocations");
+ }
+ HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc;
+ blocks.add(hdfsLoc.getLocatedBlock());
+ }
+
+ // Re-group the LocatedBlocks to be grouped by datanodes, with the values
+ // a list of the LocatedBlocks on the datanode.
+ Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks =
+ new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>();
+ for (LocatedBlock b : blocks) {
+ for (DatanodeInfo info : b.getLocations()) {
+ if (!datanodeBlocks.containsKey(info)) {
+ datanodeBlocks.put(info, new ArrayList<LocatedBlock>());
+ }
+ List<LocatedBlock> l = datanodeBlocks.get(info);
+ l.add(b);
+ }
+ }
+
+ // Make RPCs to the datanodes to get volume locations for its replicas
+ List<HdfsBlocksMetadata> metadatas = BlockStorageLocationUtil
+ .queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
+ getConf().getFileBlockStorageLocationsNumThreads,
+ getConf().getFileBlockStorageLocationsTimeout,
+ getConf().connectToDnViaHostname);
+
+ // Regroup the returned VolumeId metadata to again be grouped by
+ // LocatedBlock rather than by datanode
+ Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
+ .associateVolumeIdsWithBlocks(blocks, datanodeBlocks, metadatas);
+
+ // Combine original BlockLocations with new VolumeId information
+ BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil
+ .convertToVolumeBlockLocations(blocks, blockVolumeIds);
+
+ return volumeBlockLocations;
}
public DFSInputStream open(String src)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Aug 17 16:52:07 2012
@@ -54,6 +54,12 @@ public class DFSConfigKeys extends Commo
public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
public static final String DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname";
public static final boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = false;
+ public static final String DFS_HDFS_BLOCKS_METADATA_ENABLED = "dfs.datanode.hdfs-blocks-metadata.enabled";
+ public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
+ public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS = "dfs.client.file-block-storage-locations.num-threads";
+ public static final int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT = 10;
+ public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT = "dfs.client.file-block-storage-locations.timeout";
+ public static final int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT = 60;
// HA related configuration
public static final String DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX = "dfs.client.failover.proxy.provider";
@@ -245,7 +251,7 @@ public class DFSConfigKeys extends Commo
public static final String DFS_DATANODE_DU_RESERVED_KEY = "dfs.datanode.du.reserved";
public static final long DFS_DATANODE_DU_RESERVED_DEFAULT = 0;
public static final String DFS_DATANODE_HANDLER_COUNT_KEY = "dfs.datanode.handler.count";
- public static final int DFS_DATANODE_HANDLER_COUNT_DEFAULT = 3;
+ public static final int DFS_DATANODE_HANDLER_COUNT_DEFAULT = 10;
public static final String DFS_DATANODE_HTTP_ADDRESS_KEY = "dfs.datanode.http.address";
public static final int DFS_DATANODE_HTTP_DEFAULT_PORT = 50075;
public static final String DFS_DATANODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_HTTP_DEFAULT_PORT;
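A sketch of wiring these keys up programmatically on the client side, using the constants added above (a real cluster would normally set the datanode flag in hdfs-site.xml instead):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    class EnableBlockMetadata {
      static Configuration newConf() {
        Configuration conf = new Configuration();
        // Must be true in the datanode's config for the DN-side RPC, and in
        // the client's config or DFSClient#getBlockStorageLocations throws
        // UnsupportedOperationException.
        conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
        // Optional tuning; the defaults are 10 threads and 60 seconds.
        conf.setInt(
            DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS, 10);
        conf.setInt(
            DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT, 60);
        return conf;
      }
    }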
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Fri Aug 17 16:52:07 2012
@@ -282,13 +282,25 @@ public class DFSUtil {
if (blocks == null) {
return new BlockLocation[0];
}
- int nrBlocks = blocks.locatedBlockCount();
+ return locatedBlocks2Locations(blocks.getLocatedBlocks());
+ }
+
+ /**
+ * Convert a List<LocatedBlock> to BlockLocation[]
+ * @param blocks A List<LocatedBlock> to be converted
+ * @return converted array of BlockLocation
+ */
+ public static BlockLocation[] locatedBlocks2Locations(List<LocatedBlock> blocks) {
+ if (blocks == null) {
+ return new BlockLocation[0];
+ }
+ int nrBlocks = blocks.size();
BlockLocation[] blkLocations = new BlockLocation[nrBlocks];
if (nrBlocks == 0) {
return blkLocations;
}
int idx = 0;
- for (LocatedBlock blk : blocks.getLocatedBlocks()) {
+ for (LocatedBlock blk : blocks) {
assert idx < nrBlocks : "Incorrect index";
DatanodeInfo[] locations = blk.getLocations();
String[] hosts = new String[locations.length];
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Aug 17 16:52:07 2012
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
+import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -31,6 +32,8 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.BlockStorageLocation;
+import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -55,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -188,6 +192,36 @@ public class DistributedFileSystem exten
}
+ /**
+ * Used to query storage location information for a list of blocks. This list
+ * of blocks is normally constructed via a series of calls to
+ * {@link DistributedFileSystem#getFileBlockLocations(Path, long, long)} to
+ * get the blocks for ranges of a file.
+ *
+ * The returned array of {@link BlockStorageLocation} augments
+ * {@link BlockLocation} with a {@link VolumeId} per block replica. The
+ * VolumeId specifies the volume on the datanode on which the replica resides.
+ * The VolumeId has to be checked via {@link VolumeId#isValid()} before being
+ * used because volume information can be unavailable if the corresponding
+ * datanode is down or if the requested block is not found.
+ *
+ * This API is unstable, and datanode-side support is disabled by default. It
+ * can be enabled by setting "dfs.datanode.hdfs-blocks-metadata.enabled" to
+ * true.
+ *
+ * @param blocks
+ * List of target BlockLocations to query volume location information
+ * @return volumeBlockLocations Augmented array of
+ * {@link BlockStorageLocation}s containing additional volume location
+ * information for each replica of each block.
+ */
+ @InterfaceStability.Unstable
+ public BlockStorageLocation[] getFileBlockStorageLocations(
+ List<BlockLocation> blocks) throws IOException,
+ UnsupportedOperationException, InvalidBlockTokenException {
+ return dfs.getBlockStorageLocations(blocks);
+ }
+
@Override
public void setVerifyChecksum(boolean verifyChecksum) {
this.verifyChecksum = verifyChecksum;
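End to end, the new public API can be exercised roughly as follows; the path is illustrative, and the sketch assumes dfs.datanode.hdfs-blocks-metadata.enabled is true on both the datanodes and the client:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.BlockStorageLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    class StorageLocationExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("dfs.datanode.hdfs-blocks-metadata.enabled", true);
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(conf);

        Path path = new Path("/tmp/example.dat");   // illustrative path
        FileStatus status = dfs.getFileStatus(path);
        // Regular per-block locations; with this patch these are really
        // HdfsBlockLocations, which getFileBlockStorageLocations expects.
        BlockLocation[] locs =
            dfs.getFileBlockLocations(status, 0, status.getLen());
        List<BlockLocation> blocks = Arrays.asList(locs);

        BlockStorageLocation[] storageLocs =
            dfs.getFileBlockStorageLocations(blocks);
        for (BlockStorageLocation loc : storageLocs) {
          System.out.println(loc + " volumes: "
              + Arrays.toString(loc.getVolumeIds()));
        }
      }
    }

Callers should be prepared for UnsupportedOperationException (feature disabled, or a datanode without the new RPC) and InvalidBlockTokenException (no read access to a requested block), both declared on the method above.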
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Fri Aug 17 16:52:07 2012
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.protocol;
import java.io.IOException;
+import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -106,4 +107,21 @@ public interface ClientDatanodeProtocol
*/
BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
Token<BlockTokenIdentifier> token) throws IOException;
+
+ /**
+ * Retrieves volume location information about a list of blocks on a datanode.
+ * This is in the form of an opaque {@link VolumeId} for each configured
+ * data directory, which is not guaranteed to be the same across DN restarts.
+ *
+ * @param blocks
+ * list of blocks on the local datanode
+ * @param tokens
+ * block access tokens corresponding to the requested blocks
+ * @return an HdfsBlocksMetadata that associates {@link ExtendedBlock}s with
+ * data directories
+ * @throws IOException
+ * if datanode is unreachable, or replica is not found on datanode
+ */
+ HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
+ List<Token<BlockTokenIdentifier>> tokens) throws IOException;
}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java?rev=1374355&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java Fri Aug 17 16:52:07 2012
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Augments an array of blocks on a datanode with additional information about
+ * where the block is stored.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class HdfsBlocksMetadata {
+
+ /**
+ * List of blocks
+ */
+ private final ExtendedBlock[] blocks;
+
+ /**
+ * List of volumes
+ */
+ private final List<byte[]> volumeIds;
+
+ /**
+ * List of indexes into <code>volumeIds</code>, one per block in
+ * <code>blocks</code>. A value of Integer.MAX_VALUE indicates that the
+ * block was not found.
+ */
+ private final List<Integer> volumeIndexes;
+
+ /**
+ * Constructs HdfsBlocksMetadata.
+ *
+ * @param blocks
+ * List of blocks described
+ * @param volumeIds
+ * List of potential volume identifiers, specifying volumes where
+ * blocks may be stored
+ * @param volumeIndexes
+ * Indexes into the list of volume identifiers, one per block
+ */
+ public HdfsBlocksMetadata(ExtendedBlock[] blocks, List<byte[]> volumeIds,
+ List<Integer> volumeIndexes) {
+ this.blocks = blocks;
+ this.volumeIds = volumeIds;
+ this.volumeIndexes = volumeIndexes;
+ }
+
+ /**
+ * Get the array of blocks.
+ *
+ * @return array of blocks
+ */
+ public ExtendedBlock[] getBlocks() {
+ return blocks;
+ }
+
+ /**
+ * Get the list of volume identifiers in raw byte form.
+ *
+ * @return list of ids
+ */
+ public List<byte[]> getVolumeIds() {
+ return volumeIds;
+ }
+
+ /**
+ * Get the list of indexes into the list of volume identifiers, one per block.
+ *
+ * @return list of indexes
+ */
+ public List<Integer> getVolumeIndexes() {
+ return volumeIndexes;
+ }
+}
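A worked illustration of the indexing scheme, assuming a datanode with two data directories that found the first two requested blocks but not the third (all ids and values invented):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;

    class MetadataDecodeDemo {
      public static void main(String[] args) {
        ExtendedBlock[] blocks = {
            new ExtendedBlock("bp-1", 1001L),
            new ExtendedBlock("bp-1", 1002L),
            new ExtendedBlock("bp-1", 1003L) };
        // One entry per configured data directory on the datanode.
        List<byte[]> volumeIds =
            Arrays.asList(new byte[] { 0 }, new byte[] { 1 });
        // One index per block: block 0 on volume 0, block 1 on volume 1,
        // block 2 not found (Integer.MAX_VALUE).
        List<Integer> volumeIndexes =
            Arrays.asList(0, 1, Integer.MAX_VALUE);

        HdfsBlocksMetadata metadata =
            new HdfsBlocksMetadata(blocks, volumeIds, volumeIndexes);
        for (int i = 0; i < metadata.getBlocks().length; i++) {
          int idx = metadata.getVolumeIndexes().get(i);
          System.out.println(metadata.getBlocks()[i] + " -> "
              + (idx == Integer.MAX_VALUE ? "not found"
                 : "volume " + metadata.getVolumeIds().get(idx)[0]));
        }
      }
    }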
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java Fri Aug 17 16:52:07 2012
@@ -18,19 +18,31 @@
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@@ -106,4 +118,38 @@ public class ClientDatanodeProtocolServe
.setLocalPath(resp.getBlockPath()).setLocalMetaPath(resp.getMetaPath())
.build();
}
+
+ @Override
+ public GetHdfsBlockLocationsResponseProto getHdfsBlockLocations(
+ RpcController controller, GetHdfsBlockLocationsRequestProto request)
+ throws ServiceException {
+ HdfsBlocksMetadata resp;
+ try {
+ // Construct the Lists to make the actual call
+ List<ExtendedBlock> blocks =
+ new ArrayList<ExtendedBlock>(request.getBlocksCount());
+ for (ExtendedBlockProto b : request.getBlocksList()) {
+ blocks.add(PBHelper.convert(b));
+ }
+ List<Token<BlockTokenIdentifier>> tokens =
+ new ArrayList<Token<BlockTokenIdentifier>>(request.getTokensCount());
+ for (BlockTokenIdentifierProto b : request.getTokensList()) {
+ tokens.add(PBHelper.convert(b));
+ }
+ // Call the real implementation
+ resp = impl.getHdfsBlocksMetadata(blocks, tokens);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ List<ByteString> volumeIdsByteStrings =
+ new ArrayList<ByteString>(resp.getVolumeIds().size());
+ for (byte[] b : resp.getVolumeIds()) {
+ volumeIdsByteStrings.add(ByteString.copyFrom(b));
+ }
+ // Build and return the response
+ Builder builder = GetHdfsBlockLocationsResponseProto.newBuilder();
+ builder.addAllVolumeIds(volumeIdsByteStrings);
+ builder.addAllVolumeIndexes(resp.getVolumeIndexes());
+ return builder.build();
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java Fri Aug 17 16:52:07 2012
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.protocolP
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
import javax.net.SocketFactory;
@@ -33,12 +35,17 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -50,6 +57,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@@ -207,4 +215,44 @@ public class ClientDatanodeProtocolTrans
public Object getUnderlyingProxyObject() {
return rpcProxy;
}
-}
\ No newline at end of file
+
+ @Override
+ public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
+ List<Token<BlockTokenIdentifier>> tokens) throws IOException {
+ // Convert to proto objects
+ List<ExtendedBlockProto> blocksProtos =
+ new ArrayList<ExtendedBlockProto>(blocks.size());
+ List<BlockTokenIdentifierProto> tokensProtos =
+ new ArrayList<BlockTokenIdentifierProto>(tokens.size());
+ for (ExtendedBlock b : blocks) {
+ blocksProtos.add(PBHelper.convert(b));
+ }
+ for (Token<BlockTokenIdentifier> t : tokens) {
+ tokensProtos.add(PBHelper.convert(t));
+ }
+ // Build the request
+ GetHdfsBlockLocationsRequestProto request =
+ GetHdfsBlockLocationsRequestProto.newBuilder()
+ .addAllBlocks(blocksProtos)
+ .addAllTokens(tokensProtos)
+ .build();
+ // Send the RPC
+ GetHdfsBlockLocationsResponseProto response;
+ try {
+ response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ // List of volumes in the response
+ List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList();
+ List<byte[]> volumeIds = new ArrayList<byte[]>(volumeIdsByteStrings.size());
+ for (ByteString bs : volumeIdsByteStrings) {
+ volumeIds.add(bs.toByteArray());
+ }
+ // Array of indexes into the list of volumes, one per block
+ List<Integer> volumeIndexes = response.getVolumeIndexesList();
+ // Parsed HdfsVolumeId values, one per block
+ return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}),
+ volumeIds, volumeIndexes);
+ }
+}
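
For readers following the encoding above: the response carries one volumeIds entry per datanode volume and one volumeIndexes entry per block. A minimal sketch of how a caller might pair each block with its volume id bytes; the class and helper name are illustrative, not part of this patch, while the Integer.MAX_VALUE sentinel for a missing block matches the proto comment further down.

    import java.util.ArrayList;
    import java.util.List;

    class VolumeIdDecoding {
      // Sketch: resolve each block's volume id from the two parallel lists.
      // volumeIds:     one byte[] per volume on the datanode
      // volumeIndexes: one index per block; Integer.MAX_VALUE marks a block
      //                that was not found on any volume (mapped to null here)
      static List<byte[]> mapBlocksToVolumeIds(List<byte[]> volumeIds,
          List<Integer> volumeIndexes) {
        List<byte[]> perBlock = new ArrayList<byte[]>(volumeIndexes.size());
        for (int index : volumeIndexes) {
          perBlock.add(index == Integer.MAX_VALUE ? null : volumeIds.get(index));
        }
        return perBlock;
      }
    }
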
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Aug 17 16:52:07 2012
@@ -96,6 +96,7 @@ import org.apache.hadoop.hdfs.protocol.C
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
@@ -279,6 +280,7 @@ public class DataNode extends Configured
private final String userWithLocalPathAccess;
private boolean connectToDnViaHostname;
ReadaheadPool readaheadPool;
+ private final boolean getHdfsBlockLocationsEnabled;
/**
* Create the DataNode given a configuration and an array of dataDirs.
@@ -303,6 +305,9 @@ public class DataNode extends Configured
this.connectToDnViaHostname = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+ this.getHdfsBlockLocationsEnabled = conf.getBoolean(
+ DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
+ DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
try {
hostName = getHostName(conf);
LOG.info("Configured hostname is " + hostName);
@@ -1033,6 +1038,25 @@ public class DataNode extends Configured
metrics.incrBlocksGetLocalPathInfo();
return info;
}
+
+ @Override
+ public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
+ List<Token<BlockTokenIdentifier>> tokens) throws IOException,
+ UnsupportedOperationException {
+ if (!getHdfsBlockLocationsEnabled) {
+ throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata"
+ + " is not enabled in datanode config");
+ }
+ if (blocks.size() != tokens.size()) {
+ throw new IOException("Differing number of blocks and tokens");
+ }
+ // Check access for each block
+ for (int i = 0; i < blocks.size(); i++) {
+ checkBlockToken(blocks.get(i), tokens.get(i),
+ BlockTokenSecretManager.AccessMode.READ);
+ }
+ return data.getHdfsBlocksMetadata(blocks);
+ }
private void checkBlockToken(ExtendedBlock block, Token<BlockTokenIdentifier> token,
AccessMode accessMode) throws IOException {
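
A usage note on the server-side checks above, as a minimal sketch and not part of this patch: a caller holding a ClientDatanodeProtocol proxy (here named cdp, an assumed variable, with blocks and tokens also assumed in scope) should expect both rejection paths.

    try {
      HdfsBlocksMetadata metadata = cdp.getHdfsBlocksMetadata(blocks, tokens);
    } catch (UnsupportedOperationException e) {
      // dfs.datanode.hdfs-blocks-metadata.enabled is false on this datanode
    } catch (IOException e) {
      // blocks and tokens differ in length, or a block token failed the
      // READ access check
    }
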
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Fri Aug 17 16:52:07 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.Replica;
@@ -373,4 +374,16 @@ public interface FsDatasetSpi<V extends
*/
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b
) throws IOException;
+
+ /**
+ * Get a {@link HdfsBlocksMetadata} corresponding to the list of blocks in
+ * <code>blocks</code>.
+ *
+ * @param blocks List of blocks for which to return metadata
+ * @return Metadata for the list of blocks
+ * @throws IOException
+ */
+ public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
+ throws IOException;
+
}
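
For FsDatasetSpi implementations that do not track per-volume placement, a minimal conforming sketch is to opt out, mirroring what SimulatedFSDataset does at the end of this commit:

    @Override // FsDatasetSpi
    public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
        throws IOException {
      // This dataset cannot report volume placement; clients see the call fail.
      throw new UnsupportedOperationException();
    }
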
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Fri Aug 17 16:52:07 2012
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -1667,6 +1668,43 @@ class FsDatasetImpl implements FsDataset
datafile.getAbsolutePath(), metafile.getAbsolutePath());
return info;
}
+
+ @Override // FsDatasetSpi
+ public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
+ throws IOException {
+ // List of VolumeIds, one per volume on the datanode
+ List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(volumes.volumes.size());
+ // List of indexes into the list of VolumeIds, pointing at the VolumeId of
+ // the volume that the block is on
+ List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blocks.size());
+ // Initialize the list of VolumeIds simply by enumerating the volumes
+ for (int i = 0; i < volumes.volumes.size(); i++) {
+ blocksVolumeIds.add(new byte[] { (byte) i });
+ }
+ // Determine the index of the VolumeId of each block's volume, by comparing
+ // the block's volume against the enumerated volumes
+ for (int i = 0; i < blocks.size(); i++) {
+ ExtendedBlock block = blocks.get(i);
+ FsVolumeSpi blockVolume = getReplicaInfo(block).getVolume();
+ boolean isValid = false;
+ int volumeIndex = 0;
+ for (FsVolumeImpl volume : volumes.volumes) {
+ // This comparison of references should be safe
+ if (blockVolume == volume) {
+ isValid = true;
+ break;
+ }
+ volumeIndex++;
+ }
+ // Indicates that the block is not present, or not found in a data dir
+ if (!isValid) {
+ volumeIndex = Integer.MAX_VALUE;
+ }
+ blocksVolumeIndexes.add(volumeIndex);
+ }
+ return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}),
+ blocksVolumeIds, blocksVolumeIndexes);
+ }
@Override
public RollingLogs createRollingLogs(String bpid, String prefix
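
To make the encoding above concrete, a sketch of the metadata produced for a two-volume datanode and three blocks, the third of which has no replica on this node; blocks is an assumed ExtendedBlock[3], and java.util.Arrays/java.util.List imports are assumed.

    // Volume ids are just the enumerated volume positions, one byte each
    List<byte[]> volumeIds = Arrays.asList(new byte[] { 0 }, new byte[] { 1 });
    // One index per block; Integer.MAX_VALUE marks the missing third block
    List<Integer> volumeIndexes = Arrays.asList(0, 1, Integer.MAX_VALUE);
    HdfsBlocksMetadata metadata =
        new HdfsBlocksMetadata(blocks, volumeIds, volumeIndexes);
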
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto Fri Aug 17 16:52:07 2012
@@ -90,6 +90,26 @@ message GetBlockLocalPathInfoResponsePro
}
/**
+ * blocks - list of ExtendedBlocks on which we are querying additional info
+ * tokens - list of access tokens corresponding to list of ExtendedBlocks
+ */
+message GetHdfsBlockLocationsRequestProto {
+ repeated ExtendedBlockProto blocks = 1;
+ repeated BlockTokenIdentifierProto tokens = 2;
+}
+
+/**
+ * volumeIds - id of each volume, potentially multiple bytes
+ * volumeIndexes - for each block, an index into volumeIds specifying the volume
+ * on which it is located. If block is not present on any volume,
+ * index is set to MAX_INT.
+ */
+message GetHdfsBlockLocationsResponseProto {
+ repeated bytes volumeIds = 1;
+ repeated uint32 volumeIndexes = 2;
+}
+
+/**
* Protocol used from client to the Datanode.
* See the request and response for details of rpc call.
*/
@@ -119,4 +139,11 @@ service ClientDatanodeProtocolService {
*/
rpc getBlockLocalPathInfo(GetBlockLocalPathInfoRequestProto)
returns(GetBlockLocalPathInfoResponseProto);
+
+ /**
+ * Retrieve additional HDFS-specific metadata about a set of blocks stored
+ * on the local file system.
+ */
+ rpc getHdfsBlockLocations(GetHdfsBlockLocationsRequestProto)
+ returns(GetHdfsBlockLocationsResponseProto);
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Aug 17 16:52:07 2012
@@ -78,7 +78,7 @@
<property>
<name>dfs.datanode.handler.count</name>
- <value>3</value>
+ <value>10</value>
<description>The number of server threads for the datanode.</description>
</property>
@@ -1051,4 +1051,28 @@
</description>
</property>
+<property>
+ <name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
+ <value>false</value>
+ <description>
+ Boolean which enables backend datanode-side support for the experimental DistributedFileSystem#getFileBlockStorageLocations API.
+ </description>
+</property>
+
+<property>
+ <name>dfs.client.file-block-storage-locations.num-threads</name>
+ <value>10</value>
+ <description>
+ Number of threads used for making parallel RPCs in DistributedFileSystem#getFileBlockStorageLocations().
+ </description>
+</property>
+
+<property>
+ <name>dfs.client.file-block-storage-locations.timeout</name>
+ <value>60</value>
+ <description>
+ Timeout (in seconds) for the parallel RPCs made in DistributedFileSystem#getFileBlockStorageLocations().
+ </description>
+</property>
+
</configuration>
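
A sketch of wiring these keys up from code rather than hdfs-default.xml; the boolean constant is the one used in the DataNode change above, the two client-side string keys are quoted from the properties just added, and the values shown are the defaults.

    Configuration conf = new Configuration();
    // Datanode-side: must be true before datanodes start for the RPC to work
    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
    // Client-side: parallelism and timeout (seconds) for the per-datanode RPCs
    conf.setInt("dfs.client.file-block-storage-locations.num-threads", 10);
    conf.setInt("dfs.client.file-block-storage-locations.timeout", 60);
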
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Fri Aug 17 16:52:07 2012
@@ -27,10 +27,14 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
import java.util.Random;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -38,6 +42,7 @@ import org.apache.hadoop.fs.FileChecksum
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.security.UserGroupInformation;
@@ -570,4 +575,93 @@ public class TestDistributedFileSystem {
testDFSClient();
testFileChecksum();
}
+
+ /**
+ * Tests the normal path of batching up BlockLocation[]s to be passed to a
+ * single
+ * {@link DistributedFileSystem#getFileBlockStorageLocations(java.util.List)}
+ * call
+ */
+ @Test
+ public void testGetFileBlockStorageLocationsBatching() throws Exception {
+ final Configuration conf = getTestConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
+ true);
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(2).build();
+ DistributedFileSystem fs = cluster.getFileSystem();
+ // Create two files
+ Path tmpFile1 = new Path("/tmpfile1.dat");
+ Path tmpFile2 = new Path("/tmpfile2.dat");
+ DFSTestUtil.createFile(fs, tmpFile1, 1024, (short) 2, 0xDEADDEADL);
+ DFSTestUtil.createFile(fs, tmpFile2, 1024, (short) 2, 0xDEADDEADL);
+ // Get locations of blocks of both files and concat together
+ BlockLocation[] blockLocs1 = fs.getFileBlockLocations(tmpFile1, 0, 1024);
+ BlockLocation[] blockLocs2 = fs.getFileBlockLocations(tmpFile2, 0, 1024);
+ BlockLocation[] blockLocs = (BlockLocation[]) ArrayUtils.addAll(blockLocs1,
+ blockLocs2);
+ // Fetch VolumeBlockLocations in batch
+ BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays
+ .asList(blockLocs));
+ int counter = 0;
+ // Print out the list of ids received for each block
+ for (BlockStorageLocation l : locs) {
+ for (int i = 0; i < l.getVolumeIds().length; i++) {
+ VolumeId id = l.getVolumeIds()[i];
+ String name = l.getNames()[i];
+ if (id != null) {
+ System.out.println("Datanode " + name + " has block " + counter
+ + " on volume id " + id.toString());
+ }
+ }
+ counter++;
+ }
+ assertEquals("Expected two HdfsBlockLocations for two 1-block files", 2,
+ locs.length);
+ for (BlockStorageLocation l : locs) {
+ assertEquals("Expected two replicas for each block", 2,
+ l.getVolumeIds().length);
+ for (int i = 0; i < l.getVolumeIds().length; i++) {
+ VolumeId id = l.getVolumeIds()[i];
+ String name = l.getNames()[i];
+ assertTrue("Expected block to be valid on datanode " + name,
+ id.isValid());
+ }
+ }
+ }
+
+ /**
+ * Tests error paths for
+ * {@link DistributedFileSystem#getFileBlockStorageLocations(java.util.List)}
+ */
+ @Test
+ public void testGetFileBlockStorageLocationsError() throws Exception {
+ final Configuration conf = getTestConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
+ true);
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(2).build();
+ cluster.getDataNodes();
+ DistributedFileSystem fs = cluster.getFileSystem();
+ // Create a file
+ Path tmpFile = new Path("/tmpfile1.dat");
+ DFSTestUtil.createFile(fs, tmpFile, 1024, (short) 2, 0xDEADDEADL);
+ // Get locations of blocks of the file
+ BlockLocation[] blockLocs = fs.getFileBlockLocations(tmpFile, 0, 1024);
+ // Stop a datanode to simulate a failure
+ cluster.stopDataNode(0);
+ // Fetch VolumeBlockLocations
+ BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays
+ .asList(blockLocs));
+
+ assertEquals("Expected one HdfsBlockLocation for one 1-block file", 1,
+ locs.length);
+
+ for (BlockStorageLocation l : locs) {
+ assertEquals("Expected two replicas for each block", 2,
+ l.getVolumeIds().length);
+ assertTrue("Expected one valid and one invalid replica",
+ (l.getVolumeIds()[0].isValid()) ^ (l.getVolumeIds()[1].isValid()));
+ }
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1374355&r1=1374354&r2=1374355&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Fri Aug 17 16:52:07 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -961,6 +962,12 @@ public class SimulatedFSDataset implemen
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
@Override
public String[] getBlockPoolList() {