Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/06/07 17:49:48 UTC

hadoop git commit: HDFS-10468. HDFS read ends up ignoring an interrupt. Contributed by Jing Zhao

Repository: hadoop
Updated Branches:
  refs/heads/trunk c14c1b298 -> be34e85e6


HDFS-10468. HDFS read ends up ignoring an interrupt. Contributed by Jing Zhao


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/be34e85e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/be34e85e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/be34e85e

Branch: refs/heads/trunk
Commit: be34e85e682880f46eee0310bf00ecc7d39cd5bd
Parents: c14c1b2
Author: Jing Zhao <ji...@apache.org>
Authored: Tue Jun 7 10:48:21 2016 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Tue Jun 7 10:48:21 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSInputStream.java  | 36 ++++++--
 .../java/org/apache/hadoop/hdfs/TestRead.java   | 87 ++++++++++++++++++++
 .../server/datanode/SimulatedFSDataset.java     |  4 +-
 3 files changed, 119 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
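In short: DFSInputStream no longer swallows the reading thread's interrupt. Sleeps between retries rethrow InterruptedException as InterruptedIOException, and the new checkInterrupted() helper propagates interrupt-related IOExceptions (InterruptedIOException, ClosedByInterruptException) instead of treating them as retryable read failures. A minimal caller-side sketch (illustrative only, not code from this patch; the class name and buffer size are made up):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InterruptedIOException;
    import java.nio.channels.ClosedByInterruptException;

    class InterruptibleReadSketch {
      // Start a reader thread; after HDFS-10468, an interrupt delivered to
      // this thread surfaces as an exception instead of being retried away.
      static Thread startReader(final InputStream in) {
        Thread reader = new Thread(new Runnable() {
          @Override
          public void run() {
            try {
              in.read(new byte[1024], 0, 1024); // may block on a slow DataNode
            } catch (ClosedByInterruptException | InterruptedIOException e) {
              // cancellation is now visible to the caller
            } catch (IOException e) {
              // ordinary read failure
            }
          }
        });
        reader.start();
        return reader;
      }
    }

Calling interrupt() on the returned thread cancels a blocked read; TestRead#testInterruptReader below exercises exactly this path.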


http://git-wip-us.apache.org/repos/asf/hadoop/blob/be34e85e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 2ed0abd..7f32a56 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -304,7 +306,7 @@ public class DFSInputStream extends FSInputStream
     try {
       Thread.sleep(waitTime);
     } catch (InterruptedException e) {
-      throw new IOException(
+      throw new InterruptedIOException(
           "Interrupted while getting the last block length.");
     }
   }
@@ -379,6 +381,7 @@ public class DFSInputStream extends FSInputStream
           return n;
         }
       } catch (IOException ioe) {
+        checkInterrupted(ioe);
         if (ioe instanceof RemoteException) {
           if (((RemoteException) ioe).unwrapRemoteException() instanceof
               ReplicaNotFoundException) {
@@ -414,7 +417,8 @@ public class DFSInputStream extends FSInputStream
         try {
           Thread.sleep(500); // delay between retries.
         } catch (InterruptedException e) {
-          throw new IOException("Interrupted while getting the length.");
+          throw new InterruptedIOException(
+              "Interrupted while getting the length.");
         }
       }
 
@@ -660,6 +664,7 @@ public class DFSInputStream extends FSInputStream
         }
         return chosenNode;
       } catch (IOException ex) {
+        checkInterrupted(ex);
         if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, "
               + "encryption key was invalid when connecting to " + targetAddr
@@ -681,6 +686,15 @@ public class DFSInputStream extends FSInputStream
     }
   }
 
+  private void checkInterrupted(IOException e) throws IOException {
+    if (Thread.currentThread().isInterrupted() &&
+        (e instanceof ClosedByInterruptException ||
+            e instanceof InterruptedIOException)) {
+      DFSClient.LOG.debug("The reading thread has been interrupted.", e);
+      throw e;
+    }
+  }
+
   protected BlockReader getBlockReader(LocatedBlock targetBlock,
       long offsetInBlock, long length, InetSocketAddress targetAddr,
       StorageType storageType, DatanodeInfo datanode) throws IOException {
@@ -948,6 +962,7 @@ public class DFSInputStream extends FSInputStream
         } catch (ChecksumException ce) {
           throw ce;
         } catch (IOException e) {
+          checkInterrupted(e);
           if (retries == 1) {
             DFSClient.LOG.warn("DFS Read", e);
           }
@@ -1044,9 +1059,12 @@ public class DFSInputStream extends FSInputStream
               // expanding time window for each failure
               timeWindow * (failures + 1) *
               ThreadLocalRandom.current().nextDouble();
-          DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
+          DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) +
+              " IOException, will wait for " + waitTime + " msec.");
           Thread.sleep((long)waitTime);
-        } catch (InterruptedException ignored) {
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException(
+              "Interrupted while choosing DataNode for read.");
         }
         deadNodes.clear(); //2nd option is to remove only nodes[blockId]
         openInfo(true);
@@ -1140,7 +1158,8 @@ public class DFSInputStream extends FSInputStream
             buf, offset, corruptedBlocks);
         return;
       } catch (IOException e) {
-        // Ignore. Already processed inside the function.
+        checkInterrupted(e); // check if the read has been interrupted
+        // Ignore other IOException. Already processed inside the function.
         // Loop through to try the next node.
       }
     }
@@ -1218,6 +1237,7 @@ public class DFSInputStream extends FSInputStream
         addToDeadNodes(datanode.info);
         throw new IOException(msg);
       } catch (IOException e) {
+        checkInterrupted(e);
         if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, "
               + "encryption key was invalid when connecting to " + datanode.addr
@@ -1306,8 +1326,11 @@ public class DFSInputStream extends FSInputStream
           ignored.add(chosenNode.info);
           dfsClient.getHedgedReadMetrics().incHedgedReadOps();
           // continue; no need to refresh block locations
-        } catch (InterruptedException | ExecutionException e) {
+        } catch (ExecutionException e) {
           // Ignore
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException(
+              "Interrupted while waiting for reading task");
         }
       } else {
         // We are starting up a 'hedged' read. We have a read already
@@ -1594,6 +1617,7 @@ public class DFSInputStream extends FSInputStream
         } catch (IOException e) {//make following read to retry
           DFSClient.LOG.debug("Exception while seek to {} from {} of {} from "
               + "{}", targetPos, getCurrentBlock(), src, currentNode, e);
+          checkInterrupted(e);
         }
       }
     }
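
The recurring pattern in the hunks above translates InterruptedException into InterruptedIOException so that the interrupt fits the IOException-only signatures of the stream API. A general-purpose sketch of the idiom (not code from this patch; note the patch itself does not re-set the interrupt flag, since Thread.sleep() clears it and checkInterrupted() keys off Thread.currentThread().isInterrupted()):

    // Common translation idiom: restore the interrupt status, then wrap
    // the InterruptedException in an InterruptedIOException with the
    // original exception attached as the cause.
    static void sleepInterruptibly(long millis) throws InterruptedIOException {
      try {
        Thread.sleep(millis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve interrupt status
        InterruptedIOException iioe =
            new InterruptedIOException("sleep interrupted");
        iioe.initCause(e);
        throw iioe;
      }
    }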

http://git-wip-us.apache.org/repos/asf/hadoop/blob/be34e85e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java
index 9d38fd7..974fdf8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java
@@ -19,9 +19,19 @@ package org.apache.hadoop.hdfs;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.io.IOUtils;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
@@ -103,4 +113,81 @@ public class TestRead {
       cluster.shutdown();
     }
   }
+
+  @Test(timeout=60000)
+  public void testInterruptReader() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+        DelayedSimulatedFSDataset.Factory.class.getName());
+
+    final MiniDFSCluster cluster = new MiniDFSCluster
+        .Builder(conf).numDataNodes(1).build();
+    final DistributedFileSystem fs = cluster.getFileSystem();
+    try {
+      cluster.waitActive();
+      final Path file = new Path("/foo");
+      DFSTestUtil.createFile(fs, file, 1024, (short) 1, 0L);
+
+      final FSDataInputStream in = fs.open(file);
+      AtomicBoolean readInterrupted = new AtomicBoolean(false);
+      final Thread reader = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            in.read(new byte[1024], 0, 1024);
+          } catch (IOException e) {
+            if (e instanceof ClosedByInterruptException ||
+                e instanceof InterruptedIOException) {
+              readInterrupted.set(true);
+            }
+          }
+        }
+      });
+
+      reader.start();
+      Thread.sleep(1000);
+      reader.interrupt();
+      reader.join();
+
+      Assert.assertTrue(readInterrupted.get());
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private static class DelayedSimulatedFSDataset extends SimulatedFSDataset {
+    private volatile boolean isDelayed = true;
+
+    DelayedSimulatedFSDataset(DataNode datanode, DataStorage storage,
+        Configuration conf) {
+      super(datanode, storage, conf);
+    }
+
+    @Override
+    public synchronized InputStream getBlockInputStream(ExtendedBlock b,
+        long seekOffset) throws IOException {
+      while (isDelayed) {
+        try {
+          this.wait();
+        } catch (InterruptedException ignored) {
+        }
+      }
+      InputStream result = super.getBlockInputStream(b);
+      IOUtils.skipFully(result, seekOffset);
+      return result;
+    }
+
+    static class Factory extends FsDatasetSpi.Factory<DelayedSimulatedFSDataset> {
+      @Override
+      public DelayedSimulatedFSDataset newInstance(DataNode datanode,
+          DataStorage storage, Configuration conf) throws IOException {
+        return new DelayedSimulatedFSDataset(datanode, storage, conf);
+      }
+
+      @Override
+      public boolean isSimulated() {
+        return true;
+      }
+    }
+  }
 }
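
One inference about how the test blocks (not spelled out in the patch): DelayedSimulatedFSDataset parks every getBlockInputStream() call in wait(), and isDelayed is never cleared, so the client read stalls waiting on the DataNode until reader.interrupt() surfaces as a ClosedByInterruptException or InterruptedIOException; the waiting DataNode threads are only released by cluster shutdown. A hypothetical release hook (not in the patch) would pair with the wait() loop like this:

    // Hypothetical addition to DelayedSimulatedFSDataset: lets a test
    // un-block readers explicitly instead of relying on cluster shutdown.
    synchronized void stopDelaying() {
      isDelayed = false;
      this.notifyAll(); // wake every thread parked in getBlockInputStream()
    }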

http://git-wip-us.apache.org/repos/asf/hadoop/blob/be34e85e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 1fdedca..25034c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -960,8 +960,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     return new ReplicaHandler(binfo, null);
   }
 
-  synchronized InputStream getBlockInputStream(ExtendedBlock b
-      ) throws IOException {
+  protected synchronized InputStream getBlockInputStream(ExtendedBlock b)
+      throws IOException {
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org