Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2017/06/07 05:31:01 UTC
hadoop git commit: HDFS-11708. Positional read will fail if replicas
moved to different DNs after stream is opened (Contributed by Vinayakumar B)
Repository: hadoop
Updated Branches:
refs/heads/branch-2.7 db81fbbab -> d2ec34d55
HDFS-11708. Positional read will fail if replicas moved to different DNs after stream is opened (Contributed by Vinayakumar B)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d2ec34d5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d2ec34d5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d2ec34d5
Branch: refs/heads/branch-2.7
Commit: d2ec34d5561a2f35ba6dffe240c4914aade7c72f
Parents: db81fbb
Author: Vinayakumar B <vi...@apache.org>
Authored: Wed Jun 7 10:35:13 2017 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Wed Jun 7 10:53:01 2017 +0530
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DFSInputStream.java | 34 +++--
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 46 ++++++
.../java/org/apache/hadoop/hdfs/TestPread.java | 152 +++++++++++++++++++
.../server/datanode/TestBlockReplacement.java | 41 +----
5 files changed, 227 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
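Background on the change: DFSInputStream resolves datanodes for a positional read from a cached LocatedBlock. As the hunks below show, chooseDataNode() may refresh that block from the NameNode, but before this patch the refreshed copy was not handed back to the read loops, which kept re-resolving the stale cached locations via getBlockAt(block.getStartOffset()); if all replicas had moved to other datanodes after the stream was opened, the pread failed. The patch adds a LocatedBlock field to DNAddrPair so the possibly-refreshed block travels back with the chosen node. A minimal, self-contained sketch of that pattern, using hypothetical stand-ins (BlockStub, NodeChoice, chooseNode, pread) rather than the real HDFS classes:

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class StaleLocationSketch {
  static class BlockStub {
    final List<String> locations; // datanode addresses, freshest first
    BlockStub(String... locs) { locations = Arrays.asList(locs); }
  }

  // Analogous to DNAddrPair after this patch: the chosen node plus the
  // block that was actually used to choose it.
  static class NodeChoice {
    final String node;
    final BlockStub block; // possibly refreshed during selection
    NodeChoice(String node, BlockStub block) {
      this.node = node;
      this.block = block;
    }
  }

  // Selection may consult the NameNode for fresh locations; the key point
  // is that it returns the refreshed block alongside the chosen node.
  static NodeChoice chooseNode(BlockStub cached) {
    BlockStub fresh = new BlockStub("dn3:50010", "dn1:50010"); // pretend NN answer
    return new NodeChoice(fresh.locations.get(0), fresh);
  }

  static void pread(BlockStub block) throws IOException {
    for (int retries = 3; retries > 0; retries--) {
      NodeChoice choice = chooseNode(block);
      block = choice.block; // adopt the refreshed block, as the patch does
      try {
        System.out.println("reading from " + choice.node);
        return; // success
      } catch (RuntimeException e) {
        // fall through and retry with the refreshed block, not the stale one
      }
    }
    throw new IOException("could not obtain block from any datanode");
  }

  public static void main(String[] args) throws IOException {
    pread(new BlockStub("dn1:50010", "dn2:50010"));
  }
}

The essential point is the "block = choice.block" assignment: every retry starts from whatever locations the selection step last saw.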
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ec34d5/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2b58537..2bac745 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -346,6 +346,9 @@ Release 2.7.4 - UNRELEASED
HDFS-11856. Ability to re-add upgrading nodes to pipeline for future
pipeline updates. (vinayakumarb via kihwal)
+ HDFS-11708. Positional read will fail if replicas moved to different DNs
+ after stream is opened. (vinayakumarb)
+
Release 2.7.3 - 2016-08-25
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ec34d5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index fcad2d7..bc10d31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -633,6 +633,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
StorageType storageType = retval.storageType;
+ // Latest block, if refreshed by chooseDataNode()
+ targetBlock = retval.block;
try {
ExtendedBlock blk = targetBlock.getBlock();
@@ -1045,7 +1047,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
}
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
- return new DNAddrPair(chosenNode, targetAddr, storageType);
+ return new DNAddrPair(chosenNode, targetAddr, storageType, block);
}
private static String getBestNodeDNAddrPairErrorString(
@@ -1077,11 +1079,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
- block = getBlockAt(block.getStartOffset());
while (true) {
DNAddrPair addressPair = chooseDataNode(block, null);
+ block = addressPair.block;
try {
- actualGetFromOneDataNode(addressPair, block, start, end, buf, offset,
+ actualGetFromOneDataNode(addressPair, start, end, buf, offset,
corruptedBlockMap);
return;
} catch (IOException e) {
@@ -1105,7 +1107,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
TraceScope scope =
Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
try {
- actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
+ actualGetFromOneDataNode(datanode, start, end, buf, offset,
corruptedBlockMap);
return bb;
} finally {
@@ -1116,20 +1118,16 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
private void actualGetFromOneDataNode(final DNAddrPair datanode,
- LocatedBlock block, final long start, final long end, byte[] buf,
- int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ final long start, final long end, byte[] buf, int offset,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
DFSClientFaultInjector.get().startFetchFromDatanode();
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
-
+ LocatedBlock block = datanode.block;
while (true) {
- // cached block locations may have been updated by chooseDataNode()
- // or fetchBlockAt(). Always get the latest list of locations at the
- // start of the loop.
CachingStrategy curCachingStrategy;
boolean allowShortCircuitLocalReads;
- block = getBlockAt(block.getStartOffset());
synchronized(infoLock) {
curCachingStrategy = cachingStrategy;
allowShortCircuitLocalReads = !shortCircuitForbidden();
@@ -1187,7 +1185,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
// The encryption key used is invalid.
refetchEncryptionKey--;
dfsClient.clearDataEncryptionKey();
- continue;
} else if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
refetchToken--;
try {
@@ -1195,7 +1192,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
} catch (IOException fbae) {
// ignore IOE, since we can retry it later in a loop
}
- continue;
} else {
String msg = "Failed to connect to " + targetAddr + " for file "
+ src + " for block " + block.getBlock() + ":" + e;
@@ -1203,6 +1199,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
addToDeadNodes(chosenNode);
throw new IOException(msg);
}
+ // Refresh the block for updated tokens in case of token failures or
+ // encryption key failures.
+ block = getBlockAt(block.getStartOffset());
} finally {
if (reader != null) {
reader.close();
@@ -1229,7 +1228,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
ByteBuffer bb = null;
int len = (int) (end - start + 1);
int hedgedReadId = 0;
- block = getBlockAt(block.getStartOffset());
while (true) {
// see HDFS-6591, this metric is used to verify/catch unnecessary loops
hedgedReadOpsLoopNumForTesting++;
@@ -1239,6 +1237,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
// chooseDataNode is a commitment. If no node, we go to
// the NN to reget block locations. Only go here on first read.
chosenNode = chooseDataNode(block, ignored);
+ // Latest block, if refreshed internally
+ block = chosenNode.block;
bb = ByteBuffer.allocate(len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
chosenNode, block, start, end, bb, corruptedBlockMap,
@@ -1279,6 +1279,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
} catch (IOException ioe) {
chosenNode = chooseDataNode(block, ignored);
}
+ // Latest block, if refreshed internally
+ block = chosenNode.block;
bb = ByteBuffer.allocate(len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
chosenNode, block, start, end, bb, corruptedBlockMap,
@@ -1631,12 +1633,14 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
final DatanodeInfo info;
final InetSocketAddress addr;
final StorageType storageType;
+ final LocatedBlock block;
DNAddrPair(DatanodeInfo info, InetSocketAddress addr,
- StorageType storageType) {
+ StorageType storageType, LocatedBlock block) {
this.info = info;
this.addr = addr;
this.storageType = storageType;
+ this.block = block;
}
}
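Condensed from the hunks above, the calling pattern in both the plain and hedged read paths is now (not a compilable excerpt on its own, just the shape of the change):

// The caller adopts whatever block chooseDataNode() actually resolved,
// replacing the old per-iteration getBlockAt(block.getStartOffset()) lookup.
DNAddrPair addressPair = chooseDataNode(block, null);
block = addressPair.block; // latest block, if refreshed internally
actualGetFromOneDataNode(addressPair, start, end, buf, offset,
    corruptedBlockMap);

Carrying the block inside DNAddrPair keeps selection and use consistent: the chosen node and the locations it was chosen from can no longer drift apart between loop iterations.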
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ec34d5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 974903b..04040fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -56,7 +56,9 @@ import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
@@ -69,6 +71,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
@@ -1836,4 +1839,47 @@ public class DFSTestUtil {
}
}, 1000, 60000);
}
+
+ /*
+ * Copy a block from sourceProxy to destination. If the block becomes
+ * over-replicated, preferably remove it from source.
+ * Return true if a block is successfully copied; otherwise false.
+ */
+ public static boolean replaceBlock(ExtendedBlock block, DatanodeInfo source,
+ DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
+ return replaceBlock(block, source, sourceProxy, destination,
+ StorageType.DEFAULT, Status.SUCCESS);
+ }
+
+ /*
+ * Replace a block, specifying the target storage type and the expected
+ * response status.
+ */
+ public static boolean replaceBlock(ExtendedBlock block, DatanodeInfo source,
+ DatanodeInfo sourceProxy, DatanodeInfo destination,
+ StorageType targetStorageType, Status opStatus) throws IOException,
+ SocketException {
+ Socket sock = new Socket();
+ try {
+ sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()),
+ HdfsServerConstants.READ_TIMEOUT);
+ sock.setKeepAlive(true);
+ // sendRequest
+ DataOutputStream out = new DataOutputStream(sock.getOutputStream());
+ new Sender(out).replaceBlock(block, targetStorageType,
+ BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(),
+ sourceProxy);
+ out.flush();
+ // receiveResponse
+ DataInputStream reply = new DataInputStream(sock.getInputStream());
+
+ BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(
+ reply);
+ while (proto.getStatus() == Status.IN_PROGRESS) {
+ proto = BlockOpResponseProto.parseDelimitedFrom(reply);
+ }
+ return proto.getStatus() == opStatus;
+ } finally {
+ sock.close();
+ }
+ }
}
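For reference, the relocated helper is exercised by the new TestPread case below; a condensed usage sketch, assuming lb, source, and toMove were resolved as in that test (the source node also acts as its own proxy here):

// Move the block's replica from source to toMove and assert it succeeded.
boolean moved = DFSTestUtil.replaceBlock(lb.getBlock(), source, source, toMove);
assertTrue("replica should have moved to the target datanode", moved);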
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ec34d5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
index bcb2104..f9e6483 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
@@ -24,6 +24,8 @@ import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -31,6 +33,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
@@ -38,9 +43,14 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Before;
@@ -49,6 +59,8 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import com.google.common.base.Supplier;
+
/**
* This class tests the DFS positional read functionality in a single node
* mini-cluster.
@@ -533,6 +545,146 @@ public class TestPread {
}
}
+ /**
+ * Scenario: 1. Write a file with replication factor 2, on DN1 and DN2.<br>
+ * 2. Open the stream; the locations cached in the LocatedBlock are
+ * [DN1, DN2].<br>
+ * 3. Move the block from DN2 to a third datanode, DN3.<br>
+ * 4. Wait for the moved replica to be reported on DN3 and removed from
+ * DN2.<br>
+ * 5. Stop DN1 as well.<br>
+ * 6. The valid block locations known to the NameNode are now [DN1, DN3].<br>
+ * 7. Subsequent calls to getBlockLocations() always return DN3 as the last
+ * location.<br>
+ */
+ @Test
+ public void testPreadFailureWithChangedBlockLocations() throws Exception {
+ doPreadTestWithChangedLocations();
+ }
+
+ /**
+ * Scenario: 1. Write a file with replication factor 2, on DN1 and DN2.<br>
+ * 2. Open the stream; the locations cached in the LocatedBlock are
+ * [DN1, DN2].<br>
+ * 3. Move the block from DN2 to a third datanode, DN3.<br>
+ * 4. Wait for the moved replica to be reported on DN3 and removed from
+ * DN2.<br>
+ * 5. Stop DN1 as well.<br>
+ * 6. The valid block locations known to the NameNode are now [DN1, DN3].<br>
+ * 7. Subsequent calls to getBlockLocations() always return DN3 as the last
+ * location.<br>
+ */
+ @Test
+ public void testPreadHedgedFailureWithChangedBlockLocations()
+ throws Exception {
+ isHedgedRead = true;
+ doPreadTestWithChangedLocations();
+ }
+
+ private void doPreadTestWithChangedLocations()
+ throws IOException, TimeoutException, InterruptedException {
+ GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
+ Configuration conf = new HdfsConfiguration();
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
+ conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+ if (isHedgedRead) {
+ conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 2);
+ }
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ try {
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ final Path p = new Path("/test");
+ String data = "testingmissingblock";
+ DFSTestUtil.writeFile(dfs, p, data);
+
+ FSDataInputStream in = dfs.open(p);
+ List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(in);
+ LocatedBlock lb = blocks.get(0);
+ DFSTestUtil.waitForReplication(cluster, lb.getBlock(), 1, 2, 0);
+ blocks = DFSTestUtil.getAllBlocks(in);
+ DatanodeInfo[] locations = null;
+ for (LocatedBlock locatedBlock : blocks) {
+ locations = locatedBlock.getLocations();
+ DFSClient.LOG
+ .info(locatedBlock.getBlock() + " " + Arrays.toString(locations));
+ }
+ final DatanodeInfo validDownLocation = locations[0];
+ final DFSClient client = dfs.getClient();
+ final DFSClient dfsClient = Mockito.spy(client);
+ // Keep the valid location last in the locations list from the second
+ // request onwards.
+ final AtomicInteger count = new AtomicInteger(0);
+ Mockito.doAnswer(new Answer<LocatedBlocks>() {
+ @Override
+ public LocatedBlocks answer(InvocationOnMock invocation)
+ throws Throwable {
+ if (count.compareAndSet(0, 1)) {
+ return (LocatedBlocks) invocation.callRealMethod();
+ }
+ Object obj = invocation.callRealMethod();
+ LocatedBlocks locatedBlocks = (LocatedBlocks) obj;
+ LocatedBlock lb = locatedBlocks.get(0);
+ DatanodeInfo[] locations = lb.getLocations();
+ if (!(locations[0].getName().equals(validDownLocation.getName()))) {
+ // Latest location which is currently down, should be first
+ DatanodeInfo l = locations[0];
+ locations[0] = locations[locations.length - 1];
+ locations[locations.length - 1] = l;
+ }
+ return locatedBlocks;
+ }
+ }).when(dfsClient).getLocatedBlocks(p.toString(), 0);
+
+ // Find out a target node to move the block to.
+ DatanodeInfo[] nodes =
+ cluster.getNameNodeRpc().getDatanodeReport(DatanodeReportType.LIVE);
+ DatanodeInfo toMove = null;
+ List<DatanodeInfo> locationsList = Arrays.asList(locations);
+ for (DatanodeInfo node : nodes) {
+ if (locationsList.contains(node)) {
+ continue;
+ }
+ toMove = node;
+ break;
+ }
+ // STEP 2: Open stream
+ DFSInputStream din = dfsClient.open(p.toString());
+ // STEP 3: Move replica
+ final DatanodeInfo source = locations[1];
+ final DatanodeInfo destination = toMove;
+ DFSTestUtil.replaceBlock(lb.getBlock(), source, locations[1], toMove);
+ // Wait for replica to get deleted
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+
+ @Override
+ public Boolean get() {
+ try {
+ LocatedBlocks lbs = dfsClient.getLocatedBlocks(p.toString(), 0);
+ LocatedBlock lb = lbs.get(0);
+ List<DatanodeInfo> locations = Arrays.asList(lb.getLocations());
+ DFSClient.LOG
+ .info("Source :" + source + ", destination: " + destination);
+ DFSClient.LOG.info("Got updated locations :" + locations);
+ return locations.contains(destination)
+ && !locations.contains(source);
+ } catch (IOException e) {
+ DFSClient.LOG.error("Problem in getting block locations", e);
+ }
+ return false;
+ }
+ }, 1000, 10000);
+ DFSTestUtil.waitForReplication(cluster, lb.getBlock(), 1, 2, 0);
+ // STEP 4: Stop first node in new locations
+ cluster.stopDataNode(validDownLocation.getName());
+ DFSClient.LOG.info("Starting read");
+ byte[] buf = new byte[1024];
+ int n = din.read(0, buf, 0, data.length());
+ assertEquals(data.length(), n);
+ assertEquals("Data should be read", data, new String(buf, 0, n));
+ DFSClient.LOG.info("Read completed");
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
public static void main(String[] args) throws Exception {
new TestPread().testPreadDFS();
}
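The existing main() above only runs testPreadDFS. To run the whole class, including the two new cases, a minimal JUnit 4 runner such as the following would work (JUnitCore is standard JUnit 4 API; the Hadoop test classpath is assumed):

import org.junit.runner.JUnitCore;
import org.junit.runner.Result;
import org.junit.runner.notification.Failure;

public class RunPreadTests {
  public static void main(String[] args) {
    Result r = JUnitCore.runClasses(org.apache.hadoop.hdfs.TestPread.class);
    for (Failure f : r.getFailures()) {
      System.out.println(f); // test header plus exception message
    }
    System.out.println(r.getRunCount() + " run, "
        + r.getFailureCount() + " failed");
  }
}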
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ec34d5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
index a35089b..9c95a1f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
@@ -17,13 +17,12 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -50,16 +49,11 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Time;
import org.junit.Test;
@@ -308,8 +302,8 @@ public class TestBlockReplacement {
*/
private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source,
DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
- return replaceBlock(block, source, sourceProxy, destination,
- StorageType.DEFAULT);
+ return DFSTestUtil.replaceBlock(block, source, sourceProxy, destination,
+ StorageType.DEFAULT, Status.SUCCESS);
}
/*
@@ -321,29 +315,8 @@ public class TestBlockReplacement {
DatanodeInfo sourceProxy,
DatanodeInfo destination,
StorageType targetStorageType) throws IOException, SocketException {
- Socket sock = new Socket();
- try {
- sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()),
- HdfsServerConstants.READ_TIMEOUT);
- sock.setKeepAlive(true);
- // sendRequest
- DataOutputStream out = new DataOutputStream(sock.getOutputStream());
- new Sender(out).replaceBlock(block, targetStorageType,
- BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(),
- sourceProxy);
- out.flush();
- // receiveResponse
- DataInputStream reply = new DataInputStream(sock.getInputStream());
-
- BlockOpResponseProto proto =
- BlockOpResponseProto.parseDelimitedFrom(reply);
- while (proto.getStatus() == Status.IN_PROGRESS) {
- proto = BlockOpResponseProto.parseDelimitedFrom(reply);
- }
- return proto.getStatus() == Status.SUCCESS;
- } finally {
- sock.close();
- }
+ return DFSTestUtil.replaceBlock(block, source, sourceProxy, destination,
+ targetStorageType, Status.SUCCESS);
}
/**