hadoop git commit: HDFS-10468. HDFS read ends up ignoring an interrupt. Contributed by Jing Zhao
Repository: hadoop
Updated Branches:
refs/heads/trunk c14c1b298 -> be34e85e6
HDFS-10468. HDFS read ends up ignoring an interrupt. Contributed by Jing Zhao
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/be34e85e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/be34e85e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/be34e85e
Branch: refs/heads/trunk
Commit: be34e85e682880f46eee0310bf00ecc7d39cd5bd
Parents: c14c1b2
Author: Jing Zhao <ji...@apache.org>
Authored: Tue Jun 7 10:48:21 2016 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Tue Jun 7 10:48:21 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSInputStream.java | 36 ++++++--
.../java/org/apache/hadoop/hdfs/TestRead.java | 87 ++++++++++++++++++++
.../server/datanode/SimulatedFSDataset.java | 4 +-
3 files changed, 119 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
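In brief: DFSInputStream used to swallow a reader thread's interrupt.
An InterruptedException raised during a retry sleep was rethrown as a
plain IOException (or ignored outright), so an interrupted read would
simply retry. The patch rethrows those cases as InterruptedIOException
and adds a checkInterrupted() helper that propagates interrupt-induced
IOExceptions instead of treating them as retriable failures. A minimal
standalone sketch of the pattern (illustrative only, not the committed
code):

    import java.io.IOException;
    import java.io.InterruptedIOException;
    import java.nio.channels.ClosedByInterruptException;

    public class InterruptAwareRetry {
      // Translate InterruptedException into an IOException subtype so
      // that callers of read() observe the interrupt.
      static void sleepBetweenRetries(long waitTime) throws IOException {
        try {
          Thread.sleep(waitTime);
        } catch (InterruptedException e) {
          throw new InterruptedIOException("Interrupted while waiting to retry.");
        }
      }

      // Rethrow an IOException that was caused by an interrupt rather
      // than moving on to the next retry or datanode.
      static void checkInterrupted(IOException e) throws IOException {
        if (Thread.currentThread().isInterrupted() &&
            (e instanceof ClosedByInterruptException ||
             e instanceof InterruptedIOException)) {
          throw e;
        }
      }
    }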
http://git-wip-us.apache.org/repos/asf/hadoop/blob/be34e85e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 2ed0abd..7f32a56 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs;
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -304,7 +306,7 @@ public class DFSInputStream extends FSInputStream
     try {
       Thread.sleep(waitTime);
     } catch (InterruptedException e) {
-      throw new IOException(
+      throw new InterruptedIOException(
           "Interrupted while getting the last block length.");
     }
   }
@@ -379,6 +381,7 @@ public class DFSInputStream extends FSInputStream
           return n;
         }
       } catch (IOException ioe) {
+        checkInterrupted(ioe);
         if (ioe instanceof RemoteException) {
           if (((RemoteException) ioe).unwrapRemoteException() instanceof
               ReplicaNotFoundException) {
@@ -414,7 +417,8 @@ public class DFSInputStream extends FSInputStream
       try {
         Thread.sleep(500); // delay between retries.
       } catch (InterruptedException e) {
-        throw new IOException("Interrupted while getting the length.");
+        throw new InterruptedIOException(
+            "Interrupted while getting the length.");
       }
     }
@@ -660,6 +664,7 @@ public class DFSInputStream extends FSInputStream
         }
         return chosenNode;
       } catch (IOException ex) {
+        checkInterrupted(ex);
         if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, "
               + "encryption key was invalid when connecting to " + targetAddr
@@ -681,6 +686,15 @@ public class DFSInputStream extends FSInputStream
     }
   }
 
+  private void checkInterrupted(IOException e) throws IOException {
+    if (Thread.currentThread().isInterrupted() &&
+        (e instanceof ClosedByInterruptException ||
+         e instanceof InterruptedIOException)) {
+      DFSClient.LOG.debug("The reading thread has been interrupted.", e);
+      throw e;
+    }
+  }
+
   protected BlockReader getBlockReader(LocatedBlock targetBlock,
       long offsetInBlock, long length, InetSocketAddress targetAddr,
       StorageType storageType, DatanodeInfo datanode) throws IOException {
@@ -948,6 +962,7 @@ public class DFSInputStream extends FSInputStream
       } catch (ChecksumException ce) {
         throw ce;
       } catch (IOException e) {
+        checkInterrupted(e);
         if (retries == 1) {
           DFSClient.LOG.warn("DFS Read", e);
         }
@@ -1044,9 +1059,12 @@ public class DFSInputStream extends FSInputStream
               // expanding time window for each failure
               timeWindow * (failures + 1) *
               ThreadLocalRandom.current().nextDouble();
-          DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
+          DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) +
+              " IOException, will wait for " + waitTime + " msec.");
           Thread.sleep((long)waitTime);
-        } catch (InterruptedException ignored) {
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException(
+              "Interrupted while choosing DataNode for read.");
         }
         deadNodes.clear(); //2nd option is to remove only nodes[blockId]
         openInfo(true);
@@ -1140,7 +1158,8 @@ public class DFSInputStream extends FSInputStream
             buf, offset, corruptedBlocks);
         return;
       } catch (IOException e) {
-        // Ignore. Already processed inside the function.
+        checkInterrupted(e); // check if the read has been interrupted
+        // Ignore other IOException. Already processed inside the function.
         // Loop through to try the next node.
       }
     }
@@ -1218,6 +1237,7 @@ public class DFSInputStream extends FSInputStream
         addToDeadNodes(datanode.info);
         throw new IOException(msg);
       } catch (IOException e) {
+        checkInterrupted(e);
         if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, "
               + "encryption key was invalid when connecting to " + datanode.addr
@@ -1306,8 +1326,11 @@ public class DFSInputStream extends FSInputStream
           ignored.add(chosenNode.info);
           dfsClient.getHedgedReadMetrics().incHedgedReadOps();
           // continue; no need to refresh block locations
-        } catch (InterruptedException | ExecutionException e) {
+        } catch (ExecutionException e) {
           // Ignore
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException(
+              "Interrupted while waiting for reading task");
         }
       } else {
         // We are starting up a 'hedged' read. We have a read already
@@ -1594,6 +1617,7 @@ public class DFSInputStream extends FSInputStream
         } catch (IOException e) {//make following read to retry
           DFSClient.LOG.debug("Exception while seek to {} from {} of {} from "
               + "{}", targetPos, getCurrentBlock(), src, currentNode, e);
+          checkInterrupted(e);
         }
       }
     }
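For client code, the visible effect of the changes above is that a read
blocked inside DFSInputStream can now be cancelled by interrupting the
reading thread. A hedged sketch of what a caller might do (the path and
buffer size are invented for the example; fs is an already-created
FileSystem instance):

    final FSDataInputStream in = fs.open(new Path("/some/file"));
    Thread reader = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          in.read(new byte[4096], 0, 4096);
        } catch (ClosedByInterruptException | InterruptedIOException e) {
          // Expected after interrupt(): the read aborts instead of retrying.
        } catch (IOException e) {
          // Any other I/O failure.
        }
      }
    });
    reader.start();
    // ... later, from a controlling thread:
    reader.interrupt();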
http://git-wip-us.apache.org/repos/asf/hadoop/blob/be34e85e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java
index 9d38fd7..974fdf8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java
@@ -19,9 +19,19 @@ package org.apache.hadoop.hdfs;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.io.IOUtils;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
@@ -103,4 +113,81 @@ public class TestRead {
       cluster.shutdown();
     }
   }
+
+  @Test(timeout=60000)
+  public void testInterruptReader() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+        DelayedSimulatedFSDataset.Factory.class.getName());
+
+    final MiniDFSCluster cluster = new MiniDFSCluster
+        .Builder(conf).numDataNodes(1).build();
+    final DistributedFileSystem fs = cluster.getFileSystem();
+    try {
+      cluster.waitActive();
+      final Path file = new Path("/foo");
+      DFSTestUtil.createFile(fs, file, 1024, (short) 1, 0L);
+
+      final FSDataInputStream in = fs.open(file);
+      AtomicBoolean readInterrupted = new AtomicBoolean(false);
+      final Thread reader = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            in.read(new byte[1024], 0, 1024);
+          } catch (IOException e) {
+            if (e instanceof ClosedByInterruptException ||
+                e instanceof InterruptedIOException) {
+              readInterrupted.set(true);
+            }
+          }
+        }
+      });
+
+      reader.start();
+      Thread.sleep(1000);
+      reader.interrupt();
+      reader.join();
+
+      Assert.assertTrue(readInterrupted.get());
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private static class DelayedSimulatedFSDataset extends SimulatedFSDataset {
+    private volatile boolean isDelayed = true;
+
+    DelayedSimulatedFSDataset(DataNode datanode, DataStorage storage,
+        Configuration conf) {
+      super(datanode, storage, conf);
+    }
+
+    @Override
+    public synchronized InputStream getBlockInputStream(ExtendedBlock b,
+        long seekOffset) throws IOException {
+      while (isDelayed) {
+        try {
+          this.wait();
+        } catch (InterruptedException ignored) {
+        }
+      }
+      InputStream result = super.getBlockInputStream(b);
+      IOUtils.skipFully(result, seekOffset);
+      return result;
+    }
+
+    static class Factory extends FsDatasetSpi.Factory<DelayedSimulatedFSDataset> {
+      @Override
+      public DelayedSimulatedFSDataset newInstance(DataNode datanode,
+          DataStorage storage, Configuration conf) throws IOException {
+        return new DelayedSimulatedFSDataset(datanode, storage, conf);
+      }
+
+      @Override
+      public boolean isSimulated() {
+        return true;
+      }
+    }
+  }
 }
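A note on the test design: DelayedSimulatedFSDataset parks the DataNode
side's getBlockInputStream() call indefinitely, so the client read blocks
on the socket and the interrupt is delivered to the client reader thread,
which is what the assertion checks. The committed code never releases the
delay; if another test needed to, a resume hook along these lines could
work (hypothetical, not part of this patch):

    // Hypothetical addition to DelayedSimulatedFSDataset: clear the flag
    // and wake the DataNode thread waiting in getBlockInputStream().
    synchronized void resume() {
      isDelayed = false;
      notifyAll();
    }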
http://git-wip-us.apache.org/repos/asf/hadoop/blob/be34e85e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 1fdedca..25034c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -960,8 +960,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     return new ReplicaHandler(binfo, null);
   }
 
-  synchronized InputStream getBlockInputStream(ExtendedBlock b
-      ) throws IOException {
+  protected synchronized InputStream getBlockInputStream(ExtendedBlock b)
+      throws IOException {
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
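For context: the widened visibility (package-private to protected) appears
to be what lets the test's DelayedSimulatedFSDataset, declared in the
org.apache.hadoop.hdfs test package rather than in server.datanode, call
super.getBlockInputStream(b) from its two-argument override.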