Posted to hdfs-dev@hadoop.apache.org by lei liu <li...@gmail.com> on 2014/06/06 05:34:35 UTC

hedged read bug

I am using Hadoop 2.4.

When I use "hedged read", If there is only one live datanode, the reading
from  the datanode throw TimeoutException and ChecksumException., the
Client will infinite wait.

The test case below reproduces the problem:
  @Test
  public void testException() throws IOException, InterruptedException,
      ExecutionException {
    Configuration conf = new Configuration();
    int numHedgedReadPoolThreads = 5;
    final int hedgedReadTimeoutMillis = 50;
    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
        numHedgedReadPoolThreads);
    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
        hedgedReadTimeoutMillis);
    // Set up the fault injector.
    DFSClientFaultInjector.instance =
        Mockito.mock(DFSClientFaultInjector.class);
    DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
    // Make every pread sleep past the hedged read threshold and then
    // fail with a ChecksumException.
    Mockito.doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        Thread.sleep(hedgedReadTimeoutMillis + 10);
        throw new ChecksumException("test", 100);
      }
    }).when(injector).fetchFromDatanodeException();

    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true).build();
    DistributedFileSystem fileSys = cluster.getFileSystem();
    DFSClient dfsClient = fileSys.getClient();
    DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();

    try {
      Path file = new Path("/hedgedReadException.dat");
      FSDataOutputStream output = fileSys.create(file, (short) 1);
      byte[] data = new byte[64 * 1024];
      output.write(data);
      output.flush();
      output.write(data);
      output.flush();
      output.write(data);
      output.flush();
      output.close();
      byte[] buffer = new byte[64 * 1024];
      FSDataInputStream input = fileSys.open(file);
      input.read(0, buffer, 0, 1024);
      input.close();
      assertTrue(metrics.getHedgedReadOps() == 1);
      assertTrue(metrics.getHedgedReadWins() == 1);
    } finally {
      fileSys.close();
      cluster.shutdown();
      Mockito.reset(injector);
    }
  }


The code in actualGetFromOneDataNode() that calls
fetchFromDatanodeException() is shown below:
      try {
        // Fault injection hook: the mocked injector above throws here.
        DFSClientFaultInjector.get().fetchFromDatanodeException();
        Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
        int len = (int) (end - start + 1);
        reader = new BlockReaderFactory(dfsClient.getConf()).
            setInetSocketAddress(targetAddr).
            setRemotePeerFactory(dfsClient).
            setDatanodeInfo(chosenNode).
            setFileName(src).
            setBlock(block.getBlock()).
            setBlockToken(blockToken).
            setStartOffset(start).
            setVerifyChecksum(verifyChecksum).
            setClientName(dfsClient.clientName).
            setLength(len).
            setCachingStrategy(curCachingStrategy).
            setAllowShortCircuitLocalReads(allowShortCircuitLocalReads).
            setClientCacheContext(dfsClient.getClientContext()).
            setUserGroupInformation(dfsClient.ugi).
            setConfiguration(dfsClient.getConfiguration()).
            build();
        int nread = reader.readAll(buf, offset, len);
        if (nread != len) {
          throw new IOException("truncated return from reader.read(): " +
                                "expected " + len + ", got " + nread);
        }
        return;
      } catch (ChecksumException e) {
        String msg = "fetchBlockByteRange(). Got a checksum exception for "
            + src + " at " + block.getBlock() + ":" + e.getPos() + " from "
            + chosenNode;
        DFSClient.LOG.warn(msg);
        // we want to remember what we have tried
        addIntoCorruptedBlockMap(block.getBlock(), chosenNode,
            corruptedBlockMap);
        addToDeadNodes(chosenNode);
        throw new IOException(msg);
      }
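
For reference, the hedged read path works roughly like this: dispatch the
first read, wait up to dfs.client.hedged.read.threshold.millis, and only
then start a second read against a different datanode, taking whichever
finishes first. The sketch below is a minimal model of that pattern, not
the actual DFSInputStream code; HedgedReadSketch, readFrom(), and the node
list are hypothetical stand-ins. It shows where the single-datanode case
gets delicate: once the lone attempt fails, everything depends on how the
caller handles the rethrown exception.

  import java.nio.ByteBuffer;
  import java.util.List;
  import java.util.concurrent.*;

  // Illustrative model of a hedged pread; NOT the real DFSInputStream
  // code. readFrom() and the node names are hypothetical stand-ins.
  public class HedgedReadSketch {
    static ByteBuffer hedgedRead(ExecutorService pool, List<String> nodes,
        long thresholdMillis) throws Exception {
      CompletionService<ByteBuffer> cs =
          new ExecutorCompletionService<ByteBuffer>(pool);
      final String first = nodes.get(0);
      cs.submit(new Callable<ByteBuffer>() {
        public ByteBuffer call() throws Exception { return readFrom(first); }
      });
      // Give the first attempt one threshold's worth of time to win.
      Future<ByteBuffer> done = cs.poll(thresholdMillis, TimeUnit.MILLISECONDS);
      if (done == null && nodes.size() > 1) {
        // Too slow: hedge with a second datanode and take whichever read
        // completes first.
        final String second = nodes.get(1);
        cs.submit(new Callable<ByteBuffer>() {
          public ByteBuffer call() throws Exception { return readFrom(second); }
        });
      }
      while (done == null) {
        // Block until some attempt completes, successfully or not. With a
        // single live datanode there is no second attempt to fall back on.
        done = cs.poll(thresholdMillis, TimeUnit.MILLISECONDS);
      }
      // get() rethrows the attempt's exception; the real client retries at
      // this point, which is where the reported hang or spin can arise.
      return done.get();
    }

    static ByteBuffer readFrom(String node) throws Exception {
      return ByteBuffer.allocate(0);  // network read elided in this sketch
    }
  }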

Re: hedged read bug

Posted by lei liu <li...@gmail.com>.
Hi Chris,
I wrote a test case based on the HDFS-6231 patch. You are right: no future
is canceled. But I found that the while loop in the
hedgedFetchBlockByteRange method executes 30,000 times. Please see
hedged-read-test-case.patch in the HDFS-6494 jira. I think that may be a
problem.
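
To make the symptom concrete, here is a hypothetical reduction of the loop
behavior I am describing (the real hedgedFetchBlockByteRange is more
involved; "pool" and "readTask" are stand-ins): when each resubmitted read
fails and nothing bounds the retries, the loop spins rather than hanging
outright.

      // Hypothetical reduction, NOT the actual hedgedFetchBlockByteRange
      // code; "pool" and "readTask" are stand-ins.
      while (true) {
        Future<ByteBuffer> attempt = pool.submit(readTask);  // fails fast
        try {
          return attempt.get();
        } catch (ExecutionException e) {
          // No backoff, no retry bound, and the failing node is not
          // excluded, so control goes straight back to submit(). That is
          // consistent with seeing ~30,000 iterations.
        }
      }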

Re: hedged read bug

Posted by Ted Yu <yu...@gmail.com>.
Lei:
If you can attach the test code from your first email to HDFS-6494, that
would help us understand the scenario you are referring to.

Cheers

Re: hedged read bug

Posted by Chris Nauroth <cn...@hortonworks.com>.
Hi Lei,

I just reviewed this code path on trunk again, and I couldn't find a
problem. It appears to me that if one future fails, the exception handling
logic allows the other future to proceed without being canceled. Also, I
haven't been able to reproduce the infinite loop that you reported with the
test case that you gave.
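
Roughly speaking, the trunk behavior I am describing follows this pattern
(an illustrative sketch, not the literal code; "completionService" and the
counts are stand-ins):

      // First future to complete fails: keep waiting on the rest instead
      // of cancelling them.
      int outstanding = 2;  // the original read plus one hedged read
      while (outstanding > 0) {
        Future<ByteBuffer> done = completionService.take();  // first done
        outstanding--;
        try {
          return done.get();  // success: return the winner's data
        } catch (ExecutionException e) {
          // This attempt failed, but the other future is left running;
          // loop and take() the next completion rather than cancel it.
        }
      }
      throw new IOException("all read attempts failed");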

However, if you're still seeing a bug on your side, then I recommend filing
a new jira issue with a full description.  We can continue troubleshooting
there.

Chris Nauroth
Hortonworks
http://hortonworks.com/

Re: hedged read bug

Posted by lei liu <li...@gmail.com>.
Hi Chris,

I reviewed the patch, and I think there is a problem in it.

For example, if there are two futures and the first future to complete is a
failure, then the second future will be canceled.
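
Illustrative sketch of what I mean, assuming two outstanding futures
("first" and "second" are stand-ins, not the patch's actual variables):

      Future<ByteBuffer> first = completionService.take();  // first done
      try {
        return first.get();
      } catch (ExecutionException e) {
        // My concern: cancelling here kills the read that might still
        // succeed; the remaining future should be left running instead.
        second.cancel(true);
      }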

Re: hedged read bug

Posted by Chris Nauroth <cn...@hortonworks.com>.
Hello Lei,

There is a known bug in 2.4.0 that can cause hedged reads to hang.  I fixed
it in HDFS-6231:

https://issues.apache.org/jira/browse/HDFS-6231

This patch will be included in the forthcoming 2.4.1 release.  I'm curious
to see if applying this patch fixes the problem for you.  Can you try it
and let us know?  Thank you!

Chris Nauroth
Hortonworks
http://hortonworks.com/
