Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/03/16 21:18:55 UTC
[48/50] [abbrv] hadoop git commit: HDFS-7853. Erasure coding: extend LocatedBlocks to support reading from striped files. Contributed by Jing Zhao.
HDFS-7853. Erasure coding: extend LocatedBlocks to support reading from striped files. Contributed by Jing Zhao.
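
For context, a minimal sketch (not part of this patch) of how a client that already fetches block locations might consume the new striped-location information; it only uses class and method names introduced in the diff below and should be read as an illustrative assumption, not committed code:

import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;

public class StripedLocationsSketch {
  // Print each storage location; for striped block groups also print the
  // index of the internal block held by that location.
  static void printLocations(LocatedBlocks blocks) {
    for (LocatedBlock lb : blocks.getLocatedBlocks()) {
      DatanodeInfo[] locs = lb.getLocations();
      if (lb instanceof LocatedStripedBlock) {
        int[] indices = ((LocatedStripedBlock) lb).getBlockIndices();
        for (int i = 0; i < locs.length; i++) {
          System.out.println(locs[i] + " -> block index " + indices[i]);
        }
      } else {
        for (DatanodeInfo dn : locs) {
          System.out.println(dn + " -> replica");
        }
      }
    }
  }
}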
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fc774e7e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fc774e7e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fc774e7e
Branch: refs/heads/HDFS-7285
Commit: fc774e7e7410dc9a687f87f5053e03c0c3aeee44
Parents: ca332bf
Author: Jing Zhao <ji...@apache.org>
Authored: Mon Mar 9 14:59:58 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Mar 16 13:13:45 2015 -0700
----------------------------------------------------------------------
.../hadoop/hdfs/protocol/LocatedBlock.java | 5 +-
.../hdfs/protocol/LocatedStripedBlock.java | 68 +++++++++
...tNamenodeProtocolServerSideTranslatorPB.java | 14 +-
.../ClientNamenodeProtocolTranslatorPB.java | 13 +-
.../DatanodeProtocolClientSideTranslatorPB.java | 2 +-
.../DatanodeProtocolServerSideTranslatorPB.java | 2 +-
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 80 +++++++----
.../blockmanagement/BlockInfoStriped.java | 5 +
.../BlockInfoStripedUnderConstruction.java | 99 +++++++------
.../server/blockmanagement/BlockManager.java | 51 ++++---
.../blockmanagement/DatanodeDescriptor.java | 4 +-
.../blockmanagement/DatanodeStorageInfo.java | 3 +-
.../server/namenode/FSImageFormatPBINode.java | 21 +--
.../hdfs/server/namenode/FSNamesystem.java | 34 +++--
.../hadoop-hdfs/src/main/proto/hdfs.proto | 1 +
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 13 ++
.../hadoop/hdfs/protocolPB/TestPBHelper.java | 16 +--
.../datanode/TestIncrementalBrVariations.java | 14 +-
.../server/namenode/TestAddStripedBlocks.java | 141 +++++++++++++++++++
.../hdfs/server/namenode/TestFSImage.java | 5 +-
20 files changed, 445 insertions(+), 146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc774e7e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
index 0d52191..97dc534 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.security.token.Token;
import com.google.common.collect.Lists;
@@ -51,14 +50,14 @@ public class LocatedBlock {
// else false. If block has few corrupt replicas, they are filtered and
// their locations are not part of this object
private boolean corrupt;
- private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>();
+ private Token<BlockTokenIdentifier> blockToken = new Token<>();
/**
* List of cached datanode locations
*/
private DatanodeInfo[] cachedLocs;
// Used when there are no locations
- private static final DatanodeInfoWithStorage[] EMPTY_LOCS =
+ static final DatanodeInfoWithStorage[] EMPTY_LOCS =
new DatanodeInfoWithStorage[0];
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc774e7e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
new file mode 100644
index 0000000..97e3a69
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+
+import java.util.Arrays;
+
+/**
+ * {@link LocatedBlock} with striped block support. For a striped block, each
+ * datanode storage is associated with a block in the block group. We need to
+ * record the index (in the striped block group) for each of them.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class LocatedStripedBlock extends LocatedBlock {
+ private int[] blockIndices;
+
+ public LocatedStripedBlock(ExtendedBlock b, DatanodeInfo[] locs,
+ String[] storageIDs, StorageType[] storageTypes, int[] indices,
+ long startOffset, boolean corrupt, DatanodeInfo[] cachedLocs) {
+ super(b, locs, storageIDs, storageTypes, startOffset, corrupt, cachedLocs);
+ assert indices != null && indices.length == locs.length;
+ this.blockIndices = new int[indices.length];
+ System.arraycopy(indices, 0, blockIndices, 0, indices.length);
+ }
+
+ public LocatedStripedBlock(ExtendedBlock b, DatanodeStorageInfo[] storages,
+ int[] indices, long startOffset, boolean corrupt) {
+ this(b, DatanodeStorageInfo.toDatanodeInfos(storages),
+ DatanodeStorageInfo.toStorageIDs(storages),
+ DatanodeStorageInfo.toStorageTypes(storages), indices,
+ startOffset, corrupt, EMPTY_LOCS);
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "{" + getBlock()
+ + "; getBlockSize()=" + getBlockSize()
+ + "; corrupt=" + isCorrupt()
+ + "; offset=" + getStartOffset()
+ + "; locs=" + Arrays.asList(getLocations())
+ + "; indices=" + Arrays.asList(blockIndices)
+ + "}";
+ }
+
+ public int[] getBlockIndices() {
+ return this.blockIndices;
+ }
+}
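
Following up on the Javadoc of LocatedStripedBlock above: blockIndices is a parallel array to the location array, so indices[i] is the position inside the block group of the block stored on locations[i]. A small illustrative helper (an assumption-level sketch, not part of this commit) that builds an index-to-datanode map a striped reader could use:

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;

public class BlockGroupIndexMapSketch {
  // Map each block index in the striped group to the datanode that holds it.
  static Map<Integer, DatanodeInfo> byIndex(LocatedStripedBlock lsb) {
    DatanodeInfo[] locs = lsb.getLocations();
    int[] indices = lsb.getBlockIndices();
    Map<Integer, DatanodeInfo> m = new HashMap<>();
    for (int i = 0; i < locs.length; i++) {
      m.put(indices[i], locs[i]); // indices[i] is locs[i]'s slot in the group
    }
    return m;
  }
}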
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc774e7e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index ce8c392..16da085 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -422,7 +422,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
req.getClientName(), flags);
AppendResponseProto.Builder builder = AppendResponseProto.newBuilder();
if (result.getLastBlock() != null) {
- builder.setBlock(PBHelper.convert(result.getLastBlock()));
+ builder.setBlock(PBHelper.convertLocatedBlock(result.getLastBlock()));
}
if (result.getFileStatus() != null) {
builder.setStat(PBHelper.convert(result.getFileStatus()));
@@ -498,7 +498,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
(favor == null || favor.size() == 0) ? null : favor
.toArray(new String[favor.size()]));
return AddBlockResponseProto.newBuilder()
- .setBlock(PBHelper.convert(result)).build();
+ .setBlock(PBHelper.convertLocatedBlock(result)).build();
} catch (IOException e) {
throw new ServiceException(e);
}
@@ -522,7 +522,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
new DatanodeInfoProto[excludesList.size()])),
req.getNumAdditionalNodes(), req.getClientName());
return GetAdditionalDatanodeResponseProto.newBuilder().setBlock(
- PBHelper.convert(result))
+ PBHelper.convertLocatedBlock(result))
.build();
} catch (IOException e) {
throw new ServiceException(e);
@@ -548,8 +548,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
ReportBadBlocksRequestProto req) throws ServiceException {
try {
List<LocatedBlockProto> bl = req.getBlocksList();
- server.reportBadBlocks(PBHelper.convertLocatedBlock(
- bl.toArray(new LocatedBlockProto[bl.size()])));
+ server.reportBadBlocks(PBHelper.convertLocatedBlocks(
+ bl.toArray(new LocatedBlockProto[bl.size()])));
} catch (IOException e) {
throw new ServiceException(e);
}
@@ -952,8 +952,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
RpcController controller, UpdateBlockForPipelineRequestProto req)
throws ServiceException {
try {
- LocatedBlockProto result = PBHelper.convert(server
- .updateBlockForPipeline(PBHelper.convert(req.getBlock()),
+ LocatedBlockProto result = PBHelper.convertLocatedBlock(
+ server.updateBlockForPipeline(PBHelper.convert(req.getBlock()),
req.getClientName()));
return UpdateBlockForPipelineResponseProto.newBuilder().setBlock(result)
.build();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc774e7e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index e970293..b10a6ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -327,7 +327,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
try {
AppendResponseProto res = rpcProxy.append(null, req);
LocatedBlock lastBlock = res.hasBlock() ? PBHelper
- .convert(res.getBlock()) : null;
+ .convertLocatedBlockProto(res.getBlock()) : null;
HdfsFileStatus stat = (res.hasStat()) ? PBHelper.convert(res.getStat())
: null;
return new LastBlockWithStatus(lastBlock, stat);
@@ -415,7 +415,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
req.addAllFavoredNodes(Arrays.asList(favoredNodes));
}
try {
- return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock());
+ return PBHelper.convertLocatedBlockProto(
+ rpcProxy.addBlock(null, req.build()).getBlock());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@@ -440,8 +441,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setClientName(clientName)
.build();
try {
- return PBHelper.convert(rpcProxy.getAdditionalDatanode(null, req)
- .getBlock());
+ return PBHelper.convertLocatedBlockProto(
+ rpcProxy.getAdditionalDatanode(null, req).getBlock());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@@ -468,7 +469,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
@Override
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
ReportBadBlocksRequestProto req = ReportBadBlocksRequestProto.newBuilder()
- .addAllBlocks(Arrays.asList(PBHelper.convertLocatedBlock(blocks)))
+ .addAllBlocks(Arrays.asList(PBHelper.convertLocatedBlocks(blocks)))
.build();
try {
rpcProxy.reportBadBlocks(null, req);
@@ -898,7 +899,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setClientName(clientName)
.build();
try {
- return PBHelper.convert(
+ return PBHelper.convertLocatedBlockProto(
rpcProxy.updateBlockForPipeline(null, req).getBlock());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc774e7e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index c4003f1..19a8f22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -276,7 +276,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
ReportBadBlocksRequestProto.Builder builder = ReportBadBlocksRequestProto
.newBuilder();
for (int i = 0; i < blocks.length; i++) {
- builder.addBlocks(i, PBHelper.convert(blocks[i]));
+ builder.addBlocks(i, PBHelper.convertLocatedBlock(blocks[i]));
}
ReportBadBlocksRequestProto req = builder.build();
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc774e7e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index e18081f..dea476c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -256,7 +256,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
List<LocatedBlockProto> lbps = request.getBlocksList();
LocatedBlock [] blocks = new LocatedBlock [lbps.size()];
for(int i=0; i<lbps.size(); i++) {
- blocks[i] = PBHelper.convert(lbps.get(i));
+ blocks[i] = PBHelper.convertLocatedBlockProto(lbps.get(i));
}
try {
impl.reportBadBlocks(blocks);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc774e7e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index eaa26bc..ad82ccd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -80,6 +80,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@@ -625,7 +626,7 @@ public class PBHelper {
if (b == null) {
return null;
}
- LocatedBlockProto lb = PBHelper.convert((LocatedBlock)b);
+ LocatedBlockProto lb = PBHelper.convertLocatedBlock(b);
RecoveringBlockProto.Builder builder = RecoveringBlockProto.newBuilder();
builder.setBlock(lb).setNewGenStamp(b.getNewGenerationStamp());
if(b.getNewBlock() != null)
@@ -774,7 +775,7 @@ public class PBHelper {
}
}
- public static LocatedBlockProto convert(LocatedBlock b) {
+ public static LocatedBlockProto convertLocatedBlock(LocatedBlock b) {
if (b == null) return null;
Builder builder = LocatedBlockProto.newBuilder();
DatanodeInfo[] locs = b.getLocations();
@@ -795,21 +796,27 @@ public class PBHelper {
StorageType[] storageTypes = b.getStorageTypes();
if (storageTypes != null) {
- for (int i = 0; i < storageTypes.length; ++i) {
- builder.addStorageTypes(PBHelper.convertStorageType(storageTypes[i]));
+ for (StorageType storageType : storageTypes) {
+ builder.addStorageTypes(PBHelper.convertStorageType(storageType));
}
}
final String[] storageIDs = b.getStorageIDs();
if (storageIDs != null) {
builder.addAllStorageIDs(Arrays.asList(storageIDs));
}
+ if (b instanceof LocatedStripedBlock) {
+ int[] indices = ((LocatedStripedBlock) b).getBlockIndices();
+ for (int index : indices) {
+ builder.addBlockIndex(index);
+ }
+ }
return builder.setB(PBHelper.convert(b.getBlock()))
.setBlockToken(PBHelper.convert(b.getBlockToken()))
.setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
}
- public static LocatedBlock convert(LocatedBlockProto proto) {
+ public static LocatedBlock convertLocatedBlockProto(LocatedBlockProto proto) {
if (proto == null) return null;
List<DatanodeInfoProto> locs = proto.getLocsList();
DatanodeInfo[] targets = new DatanodeInfo[locs.size()];
@@ -829,6 +836,15 @@ public class PBHelper {
storageIDs = proto.getStorageIDsList().toArray(new String[storageIDsCount]);
}
+ int[] indices = null;
+ final int indexCount = proto.getBlockIndexCount();
+ if (indexCount > 0) {
+ indices = new int[indexCount];
+ for (int i = 0; i < indexCount; i++) {
+ indices[i] = proto.getBlockIndex(i);
+ }
+ }
+
// Set values from the isCached list, re-using references from loc
List<DatanodeInfo> cachedLocs = new ArrayList<DatanodeInfo>(locs.size());
List<Boolean> isCachedList = proto.getIsCachedList();
@@ -838,9 +854,17 @@ public class PBHelper {
}
}
- LocatedBlock lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets,
- storageIDs, storageTypes, proto.getOffset(), proto.getCorrupt(),
- cachedLocs.toArray(new DatanodeInfo[0]));
+ final LocatedBlock lb;
+ if (indices == null) {
+ lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets, storageIDs,
+ storageTypes, proto.getOffset(), proto.getCorrupt(),
+ cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
+ } else {
+ lb = new LocatedStripedBlock(PBHelper.convert(proto.getB()), targets,
+ storageIDs, storageTypes, indices, proto.getOffset(),
+ proto.getCorrupt(),
+ cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
+ }
lb.setBlockToken(PBHelper.convert(proto.getBlockToken()));
return lb;
@@ -1256,36 +1280,36 @@ public class PBHelper {
}
// Located Block Arrays and Lists
- public static LocatedBlockProto[] convertLocatedBlock(LocatedBlock[] lb) {
+ public static LocatedBlockProto[] convertLocatedBlocks(LocatedBlock[] lb) {
if (lb == null) return null;
- return convertLocatedBlock2(Arrays.asList(lb)).toArray(
- new LocatedBlockProto[lb.length]);
+ return convertLocatedBlocks2(Arrays.asList(lb))
+ .toArray(new LocatedBlockProto[lb.length]);
}
- public static LocatedBlock[] convertLocatedBlock(LocatedBlockProto[] lb) {
+ public static LocatedBlock[] convertLocatedBlocks(LocatedBlockProto[] lb) {
if (lb == null) return null;
- return convertLocatedBlock(Arrays.asList(lb)).toArray(
- new LocatedBlock[lb.length]);
+ return convertLocatedBlocks(Arrays.asList(lb))
+ .toArray(new LocatedBlock[lb.length]);
}
- public static List<LocatedBlock> convertLocatedBlock(
+ public static List<LocatedBlock> convertLocatedBlocks(
List<LocatedBlockProto> lb) {
if (lb == null) return null;
final int len = lb.size();
- List<LocatedBlock> result =
- new ArrayList<LocatedBlock>(len);
- for (int i = 0; i < len; ++i) {
- result.add(PBHelper.convert(lb.get(i)));
+ List<LocatedBlock> result = new ArrayList<>(len);
+ for (LocatedBlockProto aLb : lb) {
+ result.add(PBHelper.convertLocatedBlockProto(aLb));
}
return result;
}
- public static List<LocatedBlockProto> convertLocatedBlock2(List<LocatedBlock> lb) {
+ public static List<LocatedBlockProto> convertLocatedBlocks2(
+ List<LocatedBlock> lb) {
if (lb == null) return null;
final int len = lb.size();
- List<LocatedBlockProto> result = new ArrayList<LocatedBlockProto>(len);
- for (int i = 0; i < len; ++i) {
- result.add(PBHelper.convert(lb.get(i)));
+ List<LocatedBlockProto> result = new ArrayList<>(len);
+ for (LocatedBlock aLb : lb) {
+ result.add(PBHelper.convertLocatedBlock(aLb));
}
return result;
}
@@ -1295,8 +1319,9 @@ public class PBHelper {
public static LocatedBlocks convert(LocatedBlocksProto lb) {
return new LocatedBlocks(
lb.getFileLength(), lb.getUnderConstruction(),
- PBHelper.convertLocatedBlock(lb.getBlocksList()),
- lb.hasLastBlock() ? PBHelper.convert(lb.getLastBlock()) : null,
+ PBHelper.convertLocatedBlocks(lb.getBlocksList()),
+ lb.hasLastBlock() ?
+ PBHelper.convertLocatedBlockProto(lb.getLastBlock()) : null,
lb.getIsLastBlockComplete(),
lb.hasFileEncryptionInfo() ? convert(lb.getFileEncryptionInfo()) :
null);
@@ -1309,14 +1334,15 @@ public class PBHelper {
LocatedBlocksProto.Builder builder =
LocatedBlocksProto.newBuilder();
if (lb.getLastLocatedBlock() != null) {
- builder.setLastBlock(PBHelper.convert(lb.getLastLocatedBlock()));
+ builder.setLastBlock(
+ PBHelper.convertLocatedBlock(lb.getLastLocatedBlock()));
}
if (lb.getFileEncryptionInfo() != null) {
builder.setFileEncryptionInfo(convert(lb.getFileEncryptionInfo()));
}
return builder.setFileLength(lb.getFileLength())
.setUnderConstruction(lb.isUnderConstruction())
- .addAllBlocks(PBHelper.convertLocatedBlock2(lb.getLocatedBlocks()))
+ .addAllBlocks(PBHelper.convertLocatedBlocks2(lb.getLocatedBlocks()))
.setIsLastBlockComplete(lb.isLastBlockComplete()).build();
}
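
The PBHelper changes above split the previously overloaded convert() into explicitly named methods and make the proto round trip preserve the striped subtype: convertLocatedBlock() emits a blockIndex entry per location only for LocatedStripedBlock, and convertLocatedBlockProto() rebuilds a LocatedStripedBlock whenever the proto carries indices. A hedged sketch of that round trip (not a test in this patch; the proto outer class name is assumed to be the usual HdfsProtos):

import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;

public class LocatedBlockRoundTripSketch {
  static LocatedBlock roundTrip(LocatedBlock lb) {
    LocatedBlockProto proto = PBHelper.convertLocatedBlock(lb);
    LocatedBlock back = PBHelper.convertLocatedBlockProto(proto);
    // A striped block with at least one location comes back as a
    // LocatedStripedBlock; an empty striped group (no indices in the proto)
    // would come back as a plain LocatedBlock, per the conversion code above.
    return back;
  }
}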
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc774e7e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index 8b458df..84c3be6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@ -123,6 +123,11 @@ public class BlockInfoStriped extends BlockInfo {
return -1;
}
+ int getStorageBlockIndex(DatanodeStorageInfo storage) {
+ int i = this.findStorageInfo(storage);
+ return i == -1 ? -1 : indices[i];
+ }
+
@Override
boolean removeStorage(DatanodeStorageInfo storage) {
int dnIndex = findStorageInfoFromEnd(storage);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc774e7e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
index 151241b2..b1857bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
@@ -23,9 +23,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.COMPLETE;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION;
@@ -39,12 +36,8 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped {
/**
* Block replicas as assigned when the block was allocated.
- *
- * TODO: we need to update this attribute, along with the return type of
- * getExpectedStorageLocations and LocatedBlock. For striped blocks, clients
- * need to understand the index of each striped block in the block group.
*/
- private List<ReplicaUnderConstruction> replicas;
+ private ReplicaUnderConstruction[] replicas;
/**
* The new generation stamp, which this block will have
@@ -75,12 +68,12 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped {
/**
* Convert an under construction striped block to a complete striped block.
- *
+ *
* @return BlockInfoStriped - a complete block.
- * @throws IOException if the state of the block
- * (the generation stamp and the length) has not been committed by
- * the client or it does not have at least a minimal number of replicas
- * reported from data-nodes.
+ * @throws IOException if the state of the block
+ * (the generation stamp and the length) has not been committed by
+ * the client or it does not have at least a minimal number of replicas
+ * reported from data-nodes.
*/
BlockInfoStriped convertToCompleteBlock() throws IOException {
assert getBlockUCState() != COMPLETE :
@@ -91,10 +84,13 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped {
/** Set expected locations */
public void setExpectedLocations(DatanodeStorageInfo[] targets) {
int numLocations = targets == null ? 0 : targets.length;
- this.replicas = new ArrayList<>(numLocations);
+ this.replicas = new ReplicaUnderConstruction[numLocations];
for(int i = 0; i < numLocations; i++) {
- replicas.add(new ReplicaUnderConstruction(this, targets[i],
- ReplicaState.RBW));
+ // when creating a new block we simply sequentially assign block index to
+ // each storage
+ Block blk = new Block(this.getBlockId() + i, this.getGenerationStamp(), 0);
+ replicas[i] = new ReplicaUnderConstruction(blk, targets[i],
+ ReplicaState.RBW);
}
}
@@ -106,14 +102,24 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped {
int numLocations = getNumExpectedLocations();
DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
for (int i = 0; i < numLocations; i++) {
- storages[i] = replicas.get(i).getExpectedStorageLocation();
+ storages[i] = replicas[i].getExpectedStorageLocation();
}
return storages;
}
+ /** @return the index array indicating the block index in each storage */
+ public int[] getBlockIndices() {
+ int numLocations = getNumExpectedLocations();
+ int[] indices = new int[numLocations];
+ for (int i = 0; i < numLocations; i++) {
+ indices[i] = BlockIdManager.getBlockIndex(replicas[i]);
+ }
+ return indices;
+ }
+
/** Get the number of expected locations */
public int getNumExpectedLocations() {
- return replicas == null ? 0 : replicas.size();
+ return replicas == null ? 0 : replicas.length;
}
/**
@@ -178,7 +184,7 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped {
public void initializeBlockRecovery(long recoveryId) {
setBlockUCState(BlockUCState.UNDER_RECOVERY);
blockRecoveryId = recoveryId;
- if (replicas == null || replicas.size() == 0) {
+ if (replicas == null || replicas.length == 0) {
NameNode.blockStateChangeLog.warn("BLOCK*" +
" BlockInfoUnderConstruction.initLeaseRecovery:" +
" No blocks found, lease removed.");
@@ -186,28 +192,36 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped {
// TODO we need to implement different recovery logic here
}
- void addReplicaIfNotPresent(DatanodeStorageInfo storage, Block block,
+ void addReplicaIfNotPresent(DatanodeStorageInfo storage, Block reportedBlock,
ReplicaState rState) {
- Iterator<ReplicaUnderConstruction> it = replicas.iterator();
- while (it.hasNext()) {
- ReplicaUnderConstruction r = it.next();
- DatanodeStorageInfo expectedLocation = r.getExpectedStorageLocation();
- if (expectedLocation == storage) {
- // Record the gen stamp from the report
- r.setGenerationStamp(block.getGenerationStamp());
- return;
- } else if (expectedLocation != null &&
- expectedLocation.getDatanodeDescriptor() ==
- storage.getDatanodeDescriptor()) {
- // The Datanode reported that the block is on a different storage
- // than the one chosen by BlockPlacementPolicy. This can occur as
- // we allow Datanodes to choose the target storage. Update our
- // state by removing the stale entry and adding a new one.
- it.remove();
- break;
+ if (replicas == null) {
+ replicas = new ReplicaUnderConstruction[1];
+ replicas[0] = new ReplicaUnderConstruction(reportedBlock, storage, rState);
+ } else {
+ for (int i = 0; i < replicas.length; i++) {
+ DatanodeStorageInfo expected = replicas[i].getExpectedStorageLocation();
+ if (expected == storage) {
+ replicas[i].setBlockId(reportedBlock.getBlockId());
+ replicas[i].setGenerationStamp(reportedBlock.getGenerationStamp());
+ return;
+ } else if (expected != null && expected.getDatanodeDescriptor() ==
+ storage.getDatanodeDescriptor()) {
+ // The Datanode reported that the block is on a different storage
+ // than the one chosen by BlockPlacementPolicy. This can occur as
+ // we allow Datanodes to choose the target storage. Update our
+ // state by removing the stale entry and adding a new one.
+ replicas[i] = new ReplicaUnderConstruction(reportedBlock, storage,
+ rState);
+ return;
+ }
}
+ ReplicaUnderConstruction[] newReplicas =
+ new ReplicaUnderConstruction[replicas.length + 1];
+ System.arraycopy(replicas, 0, newReplicas, 0, replicas.length);
+ newReplicas[newReplicas.length - 1] = new ReplicaUnderConstruction(
+ reportedBlock, storage, rState);
+ replicas = newReplicas;
}
- replicas.add(new ReplicaUnderConstruction(block, storage, rState));
}
@Override
@@ -226,12 +240,11 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped {
private void appendUCParts(StringBuilder sb) {
sb.append("{UCState=").append(blockUCState).append(", replicas=[");
if (replicas != null) {
- Iterator<ReplicaUnderConstruction> iter = replicas.iterator();
- if (iter.hasNext()) {
- iter.next().appendStringTo(sb);
- while (iter.hasNext()) {
+ int i = 0;
+ for (ReplicaUnderConstruction r : replicas) {
+ r.appendStringTo(sb);
+ if (++i < replicas.length) {
sb.append(", ");
- iter.next().appendStringTo(sb);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc774e7e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 7f6a2ff..f22e9f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
@@ -838,21 +839,26 @@ public class BlockManager {
}
/** @return a LocatedBlock for the given block */
- private LocatedBlock createLocatedBlock(final BlockInfo blk,
- final long pos) throws IOException {
- if (blk instanceof BlockInfoContiguousUnderConstruction) {
- if (blk.isComplete()) {
- throw new IOException(
- "blk instanceof BlockInfoUnderConstruction && blk.isComplete()"
- + ", blk=" + blk);
+ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos) {
+ if (!blk.isComplete()) {
+ if (blk.isStriped()) {
+ final BlockInfoStripedUnderConstruction uc =
+ (BlockInfoStripedUnderConstruction) blk;
+ final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
+ final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(),
+ blk);
+ return new LocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos,
+ false);
+ } else {
+ assert blk instanceof BlockInfoContiguousUnderConstruction;
+ final BlockInfoContiguousUnderConstruction uc =
+ (BlockInfoContiguousUnderConstruction) blk;
+ final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
+ final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(),
+ blk);
+ return new LocatedBlock(eb, storages, pos, false);
}
- final BlockInfoContiguousUnderConstruction uc =
- (BlockInfoContiguousUnderConstruction) blk;
- final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
- final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
- return new LocatedBlock(eb, storages, pos, false);
}
- // TODO support BlockInfoStripedUC
// get block locations
final int numCorruptNodes = countNodes(blk).corruptReplicas();
@@ -867,13 +873,21 @@ public class BlockManager {
final boolean isCorrupt = numCorruptNodes == numNodes;
final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
- int j = 0;
+ final int[] blockIndices = blk.isStriped() ? new int[numMachines] : null;
+ int j = 0, i = 0;
if (numMachines > 0) {
for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
final DatanodeDescriptor d = storage.getDatanodeDescriptor();
final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
- if (isCorrupt || (!replicaCorrupt))
+ if (isCorrupt || (!replicaCorrupt)) {
machines[j++] = storage;
+ // TODO this can be more efficient
+ if (blockIndices != null) {
+ int index = ((BlockInfoStriped) blk).getStorageBlockIndex(storage);
+ assert index >= 0;
+ blockIndices[i++] = index;
+ }
+ }
}
}
assert j == machines.length :
@@ -883,7 +897,9 @@ public class BlockManager {
" numCorrupt: " + numCorruptNodes +
" numCorruptRepls: " + numCorruptReplicas;
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
- return new LocatedBlock(eb, machines, pos, isCorrupt);
+ return blockIndices == null ?
+ new LocatedBlock(eb, machines, pos, isCorrupt) :
+ new LocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt);
}
/** Create a LocatedBlocks. */
@@ -2383,7 +2399,8 @@ public class BlockManager {
void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
DatanodeStorageInfo storageInfo) throws IOException {
BlockInfo block = ucBlock.storedBlock;
- BlockInfo.addReplica(block, storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
+ BlockInfo.addReplica(block, storageInfo, ucBlock.reportedBlock,
+ ucBlock.reportedState);
if (ucBlock.reportedState == ReplicaState.FINALIZED &&
!block.findDatanode(storageInfo.getDatanodeDescriptor())) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc774e7e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index a54d46b..86f4158 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -255,7 +255,9 @@ public class DatanodeDescriptor extends DatanodeInfo {
return storageMap.get(storageID);
}
}
- DatanodeStorageInfo[] getStorageInfos() {
+
+ @VisibleForTesting
+ public DatanodeStorageInfo[] getStorageInfos() {
synchronized (storageMap) {
final Collection<DatanodeStorageInfo> storages = storageMap.values();
return storages.toArray(new DatanodeStorageInfo[storages.size()]);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc774e7e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 3a5e66e..eb50830 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -192,7 +192,8 @@ public class DatanodeStorageInfo {
return getState() == State.FAILED && numBlocks != 0;
}
- String getStorageID() {
+ @VisibleForTesting
+ public String getStorageID() {
return storageID;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc774e7e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index 5627788..f293481 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -364,16 +364,19 @@ public final class FSImageFormatPBINode {
INodeSection.FileUnderConstructionFeature uc = f.getFileUC();
file.toUnderConstruction(uc.getClientName(), uc.getClientMachine());
BlockInfo lastBlk = file.getLastBlock();
- // replace the last block of file
- final BlockInfo ucBlk;
- if (stripeFeature != null) {
- BlockInfoStriped striped = (BlockInfoStriped) lastBlk;
- ucBlk = new BlockInfoStripedUnderConstruction(striped,
- striped.getDataBlockNum(), striped.getParityBlockNum());
- } else {
- ucBlk = new BlockInfoContiguousUnderConstruction(lastBlk, replication);
+ if (lastBlk != null) {
+ // replace the last block of file
+ final BlockInfo ucBlk;
+ if (stripeFeature != null) {
+ BlockInfoStriped striped = (BlockInfoStriped) lastBlk;
+ ucBlk = new BlockInfoStripedUnderConstruction(striped,
+ striped.getDataBlockNum(), striped.getParityBlockNum());
+ } else {
+ ucBlk = new BlockInfoContiguousUnderConstruction(lastBlk,
+ replication);
+ }
+ file.setBlock(file.numBlocks() - 1, ucBlk);
}
- file.setBlock(file.numBlocks() - 1, ucBlk);
}
return file;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc774e7e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index fa18e1c..667ab74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -187,6 +187,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeException;
@@ -205,6 +206,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@@ -1732,8 +1734,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
LocatedBlocks blocks = res.blocks;
if (blocks != null) {
+ List<LocatedBlock> blkList = blocks.getLocatedBlocks();
+ if (blkList == null || blkList.size() == 0 ||
+ blkList.get(0) instanceof LocatedStripedBlock) {
+ // no need to sort locations for striped blocks
+ return blocks;
+ }
blockManager.getDatanodeManager().sortLocatedBlocks(
- clientMachine, blocks.getLocatedBlocks());
+ clientMachine, blkList);
// lastBlock is not part of getLocatedBlocks(), might need to sort it too
LocatedBlock lastBlock = blocks.getLastLocatedBlock();
@@ -3012,7 +3020,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// Part II.
// Allocate a new block, add it to the INode and the BlocksMap.
- Block newBlock = null;
+ BlockInfo newBlockInfo = null;
long offset;
checkOperation(OperationCategory.WRITE);
waitForLoadingFSImage();
@@ -3045,8 +3053,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
ExtendedBlock.getLocalBlock(previous));
// allocate new block, record block locations in INode.
- newBlock = createNewBlock(isStriped);
- saveAllocatedBlock(src, fileState.iip, newBlock, targets, isStriped);
+ Block newBlock = createNewBlock(isStriped);
+ newBlockInfo = saveAllocatedBlock(src, fileState.iip, newBlock, targets,
+ isStriped);
persistNewBlock(src, pendingFile);
offset = pendingFile.computeFileSize();
@@ -3056,7 +3065,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
getEditLog().logSync();
// Return located block
- return makeLocatedBlock(newBlock, targets, offset);
+ return makeLocatedBlock(newBlockInfo, targets, offset);
}
/*
@@ -3195,10 +3204,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return new FileState(pendingFile, src, iip);
}
- LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs,
- long offset) throws IOException {
- LocatedBlock lBlk = new LocatedBlock(
- getExtendedBlock(blk), locs, offset, false);
+ LocatedBlock makeLocatedBlock(BlockInfo blk, DatanodeStorageInfo[] locs,
+ long offset) throws IOException {
+ final LocatedBlock lBlk;
+ if (blk.isStriped()) {
+ assert blk instanceof BlockInfoStripedUnderConstruction;
+ lBlk = new LocatedStripedBlock(getExtendedBlock(blk), locs,
+ ((BlockInfoStripedUnderConstruction) blk).getBlockIndices(),
+ offset, false);
+ } else {
+ lBlk = new LocatedBlock(getExtendedBlock(blk), locs, offset, false);
+ }
getBlockManager().setBlockToken(
lBlk, BlockTokenSecretManager.AccessMode.WRITE);
return lBlk;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc774e7e/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
index 1fa3743..d5f636a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
@@ -205,6 +205,7 @@ message LocatedBlockProto {
repeated bool isCached = 6 [packed=true]; // if a location in locs is cached
repeated StorageTypeProto storageTypes = 7;
repeated string storageIDs = 8;
+ repeated uint32 blockIndex = 9; // used for striped block to indicate block index for each storage
}
message DataEncryptionKeyProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc774e7e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index c3dac35..3dc4d5e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -79,6 +79,10 @@ import org.apache.hadoop.hdfs.server.namenode.ha
.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
@@ -1711,4 +1715,13 @@ public class DFSTestUtil {
GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
}
+
+ public static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
+ Block block, BlockStatus blockStatus, DatanodeStorage storage) {
+ ReceivedDeletedBlockInfo[] receivedBlocks = new ReceivedDeletedBlockInfo[1];
+ receivedBlocks[0] = new ReceivedDeletedBlockInfo(block, blockStatus, null);
+ StorageReceivedDeletedBlocks[] reports = new StorageReceivedDeletedBlocks[1];
+ reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
+ return reports;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc774e7e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
index 0236288..19d254c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
@@ -488,16 +488,16 @@ public class TestPBHelper {
@Test
public void testConvertLocatedBlock() {
LocatedBlock lb = createLocatedBlock();
- LocatedBlockProto lbProto = PBHelper.convert(lb);
- LocatedBlock lb2 = PBHelper.convert(lbProto);
+ LocatedBlockProto lbProto = PBHelper.convertLocatedBlock(lb);
+ LocatedBlock lb2 = PBHelper.convertLocatedBlockProto(lbProto);
compare(lb,lb2);
}
@Test
public void testConvertLocatedBlockNoStorageMedia() {
LocatedBlock lb = createLocatedBlockNoStorageMedia();
- LocatedBlockProto lbProto = PBHelper.convert(lb);
- LocatedBlock lb2 = PBHelper.convert(lbProto);
+ LocatedBlockProto lbProto = PBHelper.convertLocatedBlock(lb);
+ LocatedBlock lb2 = PBHelper.convertLocatedBlockProto(lbProto);
compare(lb,lb2);
}
@@ -507,8 +507,8 @@ public class TestPBHelper {
for (int i=0;i<3;i++) {
lbl.add(createLocatedBlock());
}
- List<LocatedBlockProto> lbpl = PBHelper.convertLocatedBlock2(lbl);
- List<LocatedBlock> lbl2 = PBHelper.convertLocatedBlock(lbpl);
+ List<LocatedBlockProto> lbpl = PBHelper.convertLocatedBlocks2(lbl);
+ List<LocatedBlock> lbl2 = PBHelper.convertLocatedBlocks(lbpl);
assertEquals(lbl.size(), lbl2.size());
for (int i=0;i<lbl.size();i++) {
compare(lbl.get(i), lbl2.get(2));
@@ -521,8 +521,8 @@ public class TestPBHelper {
for (int i=0;i<3;i++) {
lbl[i] = createLocatedBlock();
}
- LocatedBlockProto [] lbpl = PBHelper.convertLocatedBlock(lbl);
- LocatedBlock [] lbl2 = PBHelper.convertLocatedBlock(lbpl);
+ LocatedBlockProto [] lbpl = PBHelper.convertLocatedBlocks(lbl);
+ LocatedBlock [] lbl2 = PBHelper.convertLocatedBlocks(lbpl);
assertEquals(lbl.length, lbl2.length);
for (int i=0;i<lbl.length;i++) {
compare(lbl[i], lbl2[i]);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc774e7e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
index 4e73e6e..5d8d307 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
@@ -220,15 +220,6 @@ public class TestIncrementalBrVariations {
return new Block(10000000L, 100L, 1048576L);
}
- private static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
- Block block, DatanodeStorage storage) {
- ReceivedDeletedBlockInfo[] receivedBlocks = new ReceivedDeletedBlockInfo[1];
- receivedBlocks[0] = new ReceivedDeletedBlockInfo(block, BlockStatus.RECEIVED_BLOCK, null);
- StorageReceivedDeletedBlocks[] reports = new StorageReceivedDeletedBlocks[1];
- reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
- return reports;
- }
-
/**
* Verify that the NameNode can learn about new storages from incremental
* block reports.
@@ -244,8 +235,9 @@ public class TestIncrementalBrVariations {
// Generate a report for a fake block on a fake storage.
final String newStorageUuid = UUID.randomUUID().toString();
final DatanodeStorage newStorage = new DatanodeStorage(newStorageUuid);
- StorageReceivedDeletedBlocks[] reports = makeReportForReceivedBlock(
- getDummyBlock(), newStorage);
+ StorageReceivedDeletedBlocks[] reports = DFSTestUtil.
+ makeReportForReceivedBlock(getDummyBlock(), BlockStatus.RECEIVED_BLOCK,
+ newStorage);
// Send the report to the NN.
cluster.getNameNodeRpc().blockReceivedAndDeleted(dn0Reg, poolId, reports);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc774e7e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
index 7226f51..8b51309 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
@@ -19,18 +19,29 @@ package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Assert;
@@ -38,6 +49,9 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
import static org.junit.Assert.assertEquals;
@@ -143,4 +157,131 @@ public class TestAddStripedBlocks {
}
return false;
}
+
+ @Test
+ public void testGetLocatedStripedBlocks() throws Exception {
+ final Path file = new Path("/file1");
+ // create an empty file
+ FSDataOutputStream out = null;
+ try {
+ out = dfs.create(file, (short) 1);
+
+ FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
+ INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
+ cluster.getNamesystem().getAdditionalBlock(file.toString(),
+ fileNode.getId(), dfs.getClient().getClientName(), null, null, null);
+ BlockInfoStripedUnderConstruction lastBlk =
+ (BlockInfoStripedUnderConstruction) fileNode.getLastBlock();
+ DatanodeInfo[] expectedDNs = DatanodeStorageInfo
+ .toDatanodeInfos(lastBlk.getExpectedStorageLocations());
+ int[] indices = lastBlk.getBlockIndices();
+
+ LocatedBlocks blks = dfs.getClient().getLocatedBlocks(file.toString(), 0L);
+ Assert.assertEquals(1, blks.locatedBlockCount());
+ LocatedBlock lblk = blks.get(0);
+
+ Assert.assertTrue(lblk instanceof LocatedStripedBlock);
+ DatanodeInfo[] datanodes = lblk.getLocations();
+ int[] blockIndices = ((LocatedStripedBlock) lblk).getBlockIndices();
+ Assert.assertEquals(GROUP_SIZE, datanodes.length);
+ Assert.assertEquals(GROUP_SIZE, blockIndices.length);
+ Assert.assertArrayEquals(indices, blockIndices);
+ Assert.assertArrayEquals(expectedDNs, datanodes);
+ } finally {
+ IOUtils.cleanup(null, out);
+ }
+ }
+
+ /**
+ * Test BlockInfoStripedUnderConstruction#addReplicaIfNotPresent in different
+ * scenarios.
+ */
+ @Test
+ public void testAddUCReplica() throws Exception {
+ final Path file = new Path("/file1");
+ final List<String> storageIDs = new ArrayList<>();
+ // create an empty file
+ FSDataOutputStream out = null;
+ try {
+ out = dfs.create(file, (short) 1);
+
+ // 1. create the UC striped block
+ FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
+ INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
+ cluster.getNamesystem().getAdditionalBlock(file.toString(),
+ fileNode.getId(), dfs.getClient().getClientName(), null, null, null);
+ BlockInfo lastBlock = fileNode.getLastBlock();
+ BlockInfoStripedUnderConstruction ucBlock =
+ (BlockInfoStripedUnderConstruction) lastBlock;
+
+ DatanodeStorageInfo[] locs = ucBlock.getExpectedStorageLocations();
+ int[] indices = ucBlock.getBlockIndices();
+ Assert.assertEquals(GROUP_SIZE, locs.length);
+ Assert.assertEquals(GROUP_SIZE, indices.length);
+
+ // 2. mimic incremental block reports and make sure the uc-replica list in
+ // the BlockStripedUC is correct
+ int i = 0;
+ for (DataNode dn : cluster.getDataNodes()) {
+ final Block block = new Block(lastBlock.getBlockId() + i++,
+ lastBlock.getGenerationStamp(), 0);
+ DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+ storageIDs.add(storage.getStorageID());
+ StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+ .makeReportForReceivedBlock(block, BlockStatus.RECEIVING_BLOCK,
+ storage);
+ for (StorageReceivedDeletedBlocks report : reports) {
+ cluster.getNamesystem().processIncrementalBlockReport(
+ dn.getDatanodeId(), report);
+ }
+ }
+
+ // make sure lastBlock is correct and the storages have been updated
+ locs = ucBlock.getExpectedStorageLocations();
+ indices = ucBlock.getBlockIndices();
+ Assert.assertEquals(GROUP_SIZE, locs.length);
+ Assert.assertEquals(GROUP_SIZE, indices.length);
+ for (DatanodeStorageInfo newstorage : locs) {
+ Assert.assertTrue(storageIDs.contains(newstorage.getStorageID()));
+ }
+ } finally {
+ IOUtils.cleanup(null, out);
+ }
+
+ // 3. restart the namenode. mimic the full block reports and check the
+ // uc-replica list again
+ cluster.restartNameNode(true);
+ final String bpId = cluster.getNamesystem().getBlockPoolId();
+ INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
+ .getINode4Write(file.toString()).asFile();
+ BlockInfo lastBlock = fileNode.getLastBlock();
+ int i = GROUP_SIZE - 1;
+ for (DataNode dn : cluster.getDataNodes()) {
+ String storageID = storageIDs.get(i);
+ final Block block = new Block(lastBlock.getBlockId() + i--,
+ lastBlock.getGenerationStamp(), 0);
+ DatanodeStorage storage = new DatanodeStorage(storageID);
+ List<ReplicaBeingWritten> blocks = new ArrayList<>();
+ ReplicaBeingWritten replica = new ReplicaBeingWritten(block, null, null,
+ null);
+ blocks.add(replica);
+ BlockListAsLongs bll = new BlockListAsLongs(null, blocks);
+ StorageBlockReport[] reports = {new StorageBlockReport(storage,
+ bll.getBlockListAsLongs())};
+ cluster.getNameNodeRpc().blockReport(dn.getDNRegistrationForBP(bpId),
+ bpId, reports);
+ }
+
+ BlockInfoStripedUnderConstruction ucBlock =
+ (BlockInfoStripedUnderConstruction) lastBlock;
+ DatanodeStorageInfo[] locs = ucBlock.getExpectedStorageLocations();
+ int[] indices = ucBlock.getBlockIndices();
+ Assert.assertEquals(GROUP_SIZE, locs.length);
+ Assert.assertEquals(GROUP_SIZE, indices.length);
+ for (i = 0; i < GROUP_SIZE; i++) {
+ Assert.assertEquals(storageIDs.get(i),
+ locs[GROUP_SIZE - 1 - i].getStorageID());
+ Assert.assertEquals(GROUP_SIZE - i - 1, indices[i]);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc774e7e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
index 913e0a7..4d42911 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
@@ -25,6 +25,8 @@ import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
@@ -219,8 +221,7 @@ public class TestFSImage {
.format(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
- .waitSafeMode(false)
- .startupOption(StartupOption.UPGRADE)
+ .waitSafeMode(false).startupOption(StartupOption.UPGRADE)
.build();
try {
FileSystem fs = cluster.getFileSystem();