You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2015/03/14 02:40:08 UTC
hadoop git commit: Revert "HDFS-7915. The DataNode can sometimes
allocate a ShortCircuitShm slot and fail to tell the DFSClient about it
because of a network error (cmccabe)" (Jenkins didn't run yet)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 7550052b8 -> f93a2dd94
Revert "HDFS-7915. The DataNode can sometimes allocate a ShortCircuitShm slot and fail to tell the DFSClient about it because of a network error (cmccabe)" (Jenkins didn't run yet)
This reverts commit 7550052b85bc9b73eb94cedc708f682681679b45.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f93a2dd9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f93a2dd9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f93a2dd9
Branch: refs/heads/branch-2
Commit: f93a2dd94b0c770aaff35bec4f45c7a76b6c2629
Parents: 7550052
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Fri Mar 13 18:39:43 2015 -0700
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Fri Mar 13 18:39:43 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 -
.../apache/hadoop/hdfs/BlockReaderFactory.java | 23 +----
.../java/org/apache/hadoop/hdfs/DFSClient.java | 2 -
.../datatransfer/DataTransferProtocol.java | 5 +-
.../hdfs/protocol/datatransfer/Receiver.java | 2 +-
.../hdfs/protocol/datatransfer/Sender.java | 4 +-
.../hdfs/server/datanode/DataXceiver.java | 95 ++++++++------------
.../server/datanode/ShortCircuitRegistry.java | 13 +--
.../src/main/proto/datatransfer.proto | 11 ---
.../shortcircuit/TestShortCircuitCache.java | 63 -------------
10 files changed, 43 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f93a2dd9/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index cb40232..40b538c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -874,9 +874,6 @@ Release 2.7.0 - UNRELEASED
HDFS-7722. DataNode#checkDiskError should also remove Storage when error
is found. (Lei Xu via Colin P. McCabe)
- HDFS-7915. The DataNode can sometimes allocate a ShortCircuitShm slot and
- fail to tell the DFSClient about it because of a network error (cmccabe)
-
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f93a2dd9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
index 1e915b2..ba48c79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hdfs;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
-
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -71,12 +69,6 @@ import com.google.common.base.Preconditions;
public class BlockReaderFactory implements ShortCircuitReplicaCreator {
static final Log LOG = LogFactory.getLog(BlockReaderFactory.class);
- public static class FailureInjector {
- public void injectRequestFileDescriptorsFailure() throws IOException {
- // do nothing
- }
- }
-
@VisibleForTesting
static ShortCircuitReplicaCreator
createShortCircuitReplicaInfoCallback = null;
@@ -84,11 +76,6 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
private final DFSClient.Conf conf;
/**
- * Injects failures into specific operations during unit tests.
- */
- private final FailureInjector failureInjector;
-
- /**
* The file name, for logging and debugging purposes.
*/
private String fileName;
@@ -182,7 +169,6 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
public BlockReaderFactory(DFSClient.Conf conf) {
this.conf = conf;
- this.failureInjector = conf.brfFailureInjector;
this.remainingCacheTries = conf.nCachedConnRetry;
}
@@ -532,12 +518,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
SlotId slotId = slot == null ? null : slot.getSlotId();
- new Sender(out).requestShortCircuitFds(block, token, slotId, 1, true);
+ new Sender(out).requestShortCircuitFds(block, token, slotId, 1);
DataInputStream in = new DataInputStream(peer.getInputStream());
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
PBHelper.vintPrefixed(in));
DomainSocket sock = peer.getDomainSocket();
- failureInjector.injectRequestFileDescriptorsFailure();
switch (resp.getStatus()) {
case SUCCESS:
byte buf[] = new byte[1];
@@ -547,13 +532,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
try {
ExtendedBlockId key =
new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
- if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
- LOG.trace("Sending receipt verification byte for slot " + slot);
- sock.getOutputStream().write(0);
- }
replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
Time.monotonicNow(), slot);
- return new ShortCircuitReplicaInfo(replica);
} catch (IOException e) {
// This indicates an error reading from disk, or a format error. Since
// it's not a socket communication problem, we return null rather than
@@ -565,6 +545,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
}
}
+ return new ShortCircuitReplicaInfo(replica);
case ERROR_UNSUPPORTED:
if (!resp.hasShortCircuitAccessVersion()) {
LOG.warn("short-circuit read access is disabled for " +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f93a2dd9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 3336077..ba6a1d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -339,8 +339,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
final long shortCircuitCacheStaleThresholdMs;
final long keyProviderCacheExpiryMs;
- public BlockReaderFactory.FailureInjector brfFailureInjector =
- new BlockReaderFactory.FailureInjector();
public Conf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f93a2dd9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index 48e931d..4be42a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -138,13 +138,10 @@ public interface DataTransferProtocol {
* to use no slot id.
* @param maxVersion Maximum version of the block data the client
* can understand.
- * @param supportsReceiptVerification True if the client supports
- * receipt verification.
*/
public void requestShortCircuitFds(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
- SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
- throws IOException;
+ SlotId slotId, int maxVersion) throws IOException;
/**
* Release a pair of short-circuit FDs requested earlier.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f93a2dd9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index 31bdc5e..7994027 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -186,7 +186,7 @@ public abstract class Receiver implements DataTransferProtocol {
try {
requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
PBHelper.convert(proto.getHeader().getToken()),
- slotId, proto.getMaxVersion(), true);
+ slotId, proto.getMaxVersion());
} finally {
if (traceScope != null) traceScope.close();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f93a2dd9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index df69125..7fea33e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -181,8 +181,7 @@ public class Sender implements DataTransferProtocol {
@Override
public void requestShortCircuitFds(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
- SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
- throws IOException {
+ SlotId slotId, int maxVersion) throws IOException {
OpRequestShortCircuitAccessProto.Builder builder =
OpRequestShortCircuitAccessProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(
@@ -190,7 +189,6 @@ public class Sender implements DataTransferProtocol {
if (slotId != null) {
builder.setSlotId(PBHelper.convert(slotId));
}
- builder.setSupportsReceiptVerification(supportsReceiptVerification);
OpRequestShortCircuitAccessProto proto = builder.build();
send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f93a2dd9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 84504fb..e9547a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -22,8 +22,6 @@ import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ER
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION;
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
import static org.apache.hadoop.util.Time.now;
@@ -293,83 +291,64 @@ class DataXceiver extends Receiver implements Runnable {
@Override
public void requestShortCircuitFds(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> token,
- SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
- throws IOException {
+ SlotId slotId, int maxVersion) throws IOException {
updateCurrentThreadName("Passing file descriptors for block " + blk);
BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
FileInputStream fis[] = null;
- SlotId registeredSlotId = null;
- boolean success = false;
try {
+ if (peer.getDomainSocket() == null) {
+ throw new IOException("You cannot pass file descriptors over " +
+ "anything but a UNIX domain socket.");
+ }
+ if (slotId != null) {
+ boolean isCached = datanode.data.
+ isCached(blk.getBlockPoolId(), blk.getBlockId());
+ datanode.shortCircuitRegistry.registerSlot(
+ ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached);
+ }
try {
- if (peer.getDomainSocket() == null) {
- throw new IOException("You cannot pass file descriptors over " +
- "anything but a UNIX domain socket.");
- }
- if (slotId != null) {
- boolean isCached = datanode.data.
- isCached(blk.getBlockPoolId(), blk.getBlockId());
- datanode.shortCircuitRegistry.registerSlot(
- ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached);
- registeredSlotId = slotId;
- }
fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
- Preconditions.checkState(fis != null);
- bld.setStatus(SUCCESS);
- bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
- } catch (ShortCircuitFdsVersionException e) {
- bld.setStatus(ERROR_UNSUPPORTED);
- bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
- bld.setMessage(e.getMessage());
- } catch (ShortCircuitFdsUnsupportedException e) {
- bld.setStatus(ERROR_UNSUPPORTED);
- bld.setMessage(e.getMessage());
- } catch (InvalidToken e) {
- bld.setStatus(ERROR_ACCESS_TOKEN);
- bld.setMessage(e.getMessage());
- } catch (IOException e) {
- bld.setStatus(ERROR);
- bld.setMessage(e.getMessage());
+ } finally {
+ if ((fis == null) && (slotId != null)) {
+ datanode.shortCircuitRegistry.unregisterSlot(slotId);
+ }
}
+ bld.setStatus(SUCCESS);
+ bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
+ } catch (ShortCircuitFdsVersionException e) {
+ bld.setStatus(ERROR_UNSUPPORTED);
+ bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
+ bld.setMessage(e.getMessage());
+ } catch (ShortCircuitFdsUnsupportedException e) {
+ bld.setStatus(ERROR_UNSUPPORTED);
+ bld.setMessage(e.getMessage());
+ } catch (InvalidToken e) {
+ bld.setStatus(ERROR_ACCESS_TOKEN);
+ bld.setMessage(e.getMessage());
+ } catch (IOException e) {
+ bld.setStatus(ERROR);
+ bld.setMessage(e.getMessage());
+ }
+ try {
bld.build().writeDelimitedTo(socketOut);
if (fis != null) {
FileDescriptor fds[] = new FileDescriptor[fis.length];
for (int i = 0; i < fds.length; i++) {
fds[i] = fis[i].getFD();
}
- byte buf[] = new byte[1];
- if (supportsReceiptVerification) {
- buf[0] = (byte)USE_RECEIPT_VERIFICATION.getNumber();
- } else {
- buf[0] = (byte)DO_NOT_USE_RECEIPT_VERIFICATION.getNumber();
- }
- DomainSocket sock = peer.getDomainSocket();
- sock.sendFileDescriptors(fds, buf, 0, buf.length);
- if (supportsReceiptVerification) {
- LOG.trace("Reading receipt verification byte for " + slotId);
- int val = sock.getInputStream().read();
- if (val < 0) {
- throw new EOFException();
- }
- } else {
- LOG.trace("Receipt verification is not enabled on the DataNode. " +
- "Not verifying " + slotId);
- }
- success = true;
+ byte buf[] = new byte[] { (byte)0 };
+ peer.getDomainSocket().
+ sendFileDescriptors(fds, buf, 0, buf.length);
}
} finally {
- if ((!success) && (registeredSlotId != null)) {
- LOG.info("Unregistering " + registeredSlotId + " because the " +
- "requestShortCircuitFdsForRead operation failed.");
- datanode.shortCircuitRegistry.unregisterSlot(registeredSlotId);
- }
if (ClientTraceLog.isInfoEnabled()) {
DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk
.getBlockPoolId());
BlockSender.ClientTraceLog.info(String.format(
"src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS," +
" blockid: %s, srvID: %s, success: %b",
- blk.getBlockId(), dnR.getDatanodeUuid(), success));
+ blk.getBlockId(), dnR.getDatanodeUuid(), (fis != null)
+ ));
}
if (fis != null) {
IOUtils.cleanup(LOG, fis);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f93a2dd9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
index b32c0d1..32906f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
@@ -30,7 +30,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -84,7 +83,7 @@ public class ShortCircuitRegistry {
private static final int SHM_LENGTH = 8192;
- public static class RegisteredShm extends ShortCircuitShm
+ private static class RegisteredShm extends ShortCircuitShm
implements DomainSocketWatcher.Handler {
private final String clientName;
private final ShortCircuitRegistry registry;
@@ -384,14 +383,4 @@ public class ShortCircuitRegistry {
}
IOUtils.closeQuietly(watcher);
}
-
- public static interface Visitor {
- void accept(HashMap<ShmId, RegisteredShm> segments,
- HashMultimap<ExtendedBlockId, Slot> slots);
- }
-
- @VisibleForTesting
- public synchronized void visit(Visitor visitor) {
- visitor.accept(segments, slots);
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f93a2dd9/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 8426198..d72bb5e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -179,12 +179,6 @@ message OpRequestShortCircuitAccessProto {
* The shared memory slot to use, if we are using one.
*/
optional ShortCircuitShmSlotProto slotId = 3;
-
- /**
- * True if the client supports verifying that the file descriptor has been
- * sent successfully.
- */
- optional bool supportsReceiptVerification = 4 [default = false];
}
message ReleaseShortCircuitAccessRequestProto {
@@ -236,11 +230,6 @@ enum Status {
IN_PROGRESS = 12;
}
-enum ShortCircuitFdResponse {
- DO_NOT_USE_RECEIPT_VERIFICATION = 0;
- USE_RECEIPT_VERIFICATION = 1;
-}
-
message PipelineAckProto {
required sint64 seqno = 1;
repeated uint32 reply = 2;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f93a2dd9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
index 7daabd0..bfa871c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
@@ -36,16 +36,13 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import com.google.common.collect.HashMultimap;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
-import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -55,14 +52,11 @@ import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
-import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
-import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm;
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo;
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.CacheVisitor;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
@@ -621,61 +615,4 @@ public class TestShortCircuitCache {
cluster.shutdown();
sockDir.close();
}
-
- public static class TestCleanupFailureInjector
- extends BlockReaderFactory.FailureInjector {
- @Override
- public void injectRequestFileDescriptorsFailure() throws IOException {
- throw new IOException("injected I/O error");
- }
- }
-
- // Regression test for HDFS-7915
- @Test(timeout=60000)
- public void testDataXceiverCleansUpSlotsOnFailure() throws Exception {
- BlockReaderTestUtil.enableShortCircuitShmTracing();
- TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
- Configuration conf = createShortCircuitConf(
- "testDataXceiverCleansUpSlotsOnFailure", sockDir);
- conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
- 1000000000L);
- MiniDFSCluster cluster =
- new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
- cluster.waitActive();
- DistributedFileSystem fs = cluster.getFileSystem();
- final Path TEST_PATH1 = new Path("/test_file1");
- final Path TEST_PATH2 = new Path("/test_file2");
- final int TEST_FILE_LEN = 4096;
- final int SEED = 0xFADE1;
- DFSTestUtil.createFile(fs, TEST_PATH1, TEST_FILE_LEN,
- (short)1, SEED);
- DFSTestUtil.createFile(fs, TEST_PATH2, TEST_FILE_LEN,
- (short)1, SEED);
-
- // The first read should allocate one shared memory segment and slot.
- DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
-
- // The second read should fail, and we should only have 1 segment and 1 slot
- // left.
- fs.getClient().getConf().brfFailureInjector =
- new TestCleanupFailureInjector();
- try {
- DFSTestUtil.readFileBuffer(fs, TEST_PATH2);
- } catch (Throwable t) {
- GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
- "testing, but we failed to do a non-TCP read.", t);
- }
- ShortCircuitRegistry registry =
- cluster.getDataNodes().get(0).getShortCircuitRegistry();
- registry.visit(new ShortCircuitRegistry.Visitor() {
- @Override
- public void accept(HashMap<ShmId, RegisteredShm> segments,
- HashMultimap<ExtendedBlockId, Slot> slots) {
- Assert.assertEquals(1, segments.size());
- Assert.assertEquals(1, slots.size());
- }
- });
- cluster.shutdown();
- sockDir.close();
- }
}