You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/17 01:59:00 UTC

[32/50] hadoop git commit: HDFS-8203. Erasure Coding: Seek and other Ops in DFSStripedInputStream. Contributed by Yi Liu.

HDFS-8203. Erasure Coding: Seek and other Ops in DFSStripedInputStream. Contributed by Yi Liu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6f6ac390
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6f6ac390
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6f6ac390

Branch: refs/heads/HDFS-7285
Commit: 6f6ac390abe9549996d97f583883be91fc5b04b6
Parents: dd2144c
Author: Jing Zhao <ji...@apache.org>
Authored: Thu May 7 11:06:40 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Sat May 16 15:16:06 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |  3 +
 .../hadoop/hdfs/DFSStripedInputStream.java      | 88 +++++++++++++++++---
 .../hadoop/hdfs/TestWriteReadStripedFile.java   | 83 +++++++++++++++---
 3 files changed, 151 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f6ac390/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 11e8376..fed08e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -186,3 +186,6 @@
 
     HDFS-8129. Erasure Coding: Maintain consistent naming for Erasure Coding related classes - EC/ErasureCoding
     (umamahesh)
+
+    HDFS-8203. Erasure Coding: Seek and other Ops in DFSStripedInputStream.
+    (Yi Liu via jing9)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f6ac390/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 7cb7b6d..9011192 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -19,10 +19,13 @@ package org.apache.hadoop.hdfs;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.ByteBufferPool;
+
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.planReadPortions;
 
@@ -31,9 +34,11 @@ import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 import java.util.Set;
 import java.util.Map;
 import java.util.HashMap;
@@ -263,6 +268,10 @@ public class DFSStripedInputStream extends DFSInputStream {
   }
 
   private long getOffsetInBlockGroup() {
+    return getOffsetInBlockGroup(pos);
+  }
+
+  private long getOffsetInBlockGroup(long pos) {
     return pos - currentLocatedBlock.getStartOffset();
   }
 
@@ -278,18 +287,22 @@ public class DFSStripedInputStream extends DFSInputStream {
     // compute stripe range based on pos
     final long offsetInBlockGroup = getOffsetInBlockGroup();
     final long stripeLen = cellSize * dataBlkNum;
-    int stripeIndex = (int) (offsetInBlockGroup / stripeLen);
-    curStripeRange = new StripeRange(stripeIndex * stripeLen,
-        Math.min(currentLocatedBlock.getBlockSize() - (stripeIndex * stripeLen),
-            stripeLen));
-    final int numCell = (int) ((curStripeRange.length - 1) / cellSize + 1);
+    final int stripeIndex = (int) (offsetInBlockGroup / stripeLen);
+    final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen);
+    final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize()
+        - (stripeIndex * stripeLen), stripeLen);
+    curStripeRange = new StripeRange(offsetInBlockGroup,
+        stripeLimit - stripeBufOffset);
+
+    final int startCell = stripeBufOffset / cellSize;
+    final int numCell = (stripeLimit - 1) / cellSize + 1;
 
     // read the whole stripe in parallel
     Map<Future<Integer>, Integer> futures = new HashMap<>();
-    for (int i = 0; i < numCell; i++) {
-      curStripeBuf.position(cellSize * i);
-      curStripeBuf.limit((int) Math.min(cellSize * (i + 1),
-          curStripeRange.length));
+    for (int i = startCell; i < numCell; i++) {
+      int bufPos = i == startCell ? stripeBufOffset : cellSize * i;
+      curStripeBuf.position(bufPos);
+      curStripeBuf.limit(Math.min(cellSize * (i + 1), stripeLimit));
       ByteBuffer buf = curStripeBuf.slice();
       ByteBufferStrategy strategy = new ByteBufferStrategy(buf);
       final int targetLength = buf.remaining();
@@ -329,6 +342,39 @@ public class DFSStripedInputStream extends DFSInputStream {
     };
   }
 
+  /**
+   * Seek to a new arbitrary location
+   */
+  @Override
+  public synchronized void seek(long targetPos) throws IOException {
+    if (targetPos > getFileLength()) {
+      throw new EOFException("Cannot seek after EOF");
+    }
+    if (targetPos < 0) {
+      throw new EOFException("Cannot seek to negative offset");
+    }
+    if (closed.get()) {
+      throw new IOException("Stream is closed!");
+    }
+    if (targetPos <= blockEnd) {
+      final long targetOffsetInBlk = getOffsetInBlockGroup(targetPos);
+      if (curStripeRange.include(targetOffsetInBlk)) {
+        int bufOffset = getStripedBufOffset(targetOffsetInBlk);
+        curStripeBuf.position(bufOffset);
+        pos = targetPos;
+        return;
+      }
+    }
+    pos = targetPos;
+    blockEnd = -1;
+  }
+
+  private int getStripedBufOffset(long offsetInBlockGroup) {
+    final long stripeLen = cellSize * dataBlkNum;
+    // compute the position in the curStripeBuf for the given offset
+    return (int) (offsetInBlockGroup % stripeLen);
+  }
+
   @Override
   protected synchronized int readWithStrategy(ReaderStrategy strategy,
       int off, int len) throws IOException {
@@ -405,10 +451,8 @@ public class DFSStripedInputStream extends DFSInputStream {
    * @return number of bytes copied
    */
   private int copy(ReaderStrategy strategy, int offset, int length) {
-    final long stripeLen = cellSize * dataBlkNum;
-    final long offsetInBlk = pos - currentLocatedBlock.getStartOffset();
-    // compute the position in the curStripeBuf based on "pos"
-    int bufOffset = (int) (offsetInBlk % stripeLen);
+    final long offsetInBlk = getOffsetInBlockGroup();
+    int bufOffset = getStripedBufOffset(offsetInBlk);
     curStripeBuf.position(bufOffset);
     return strategy.copyFrom(curStripeBuf, offset,
         Math.min(length, curStripeBuf.remaining()));
@@ -546,4 +590,22 @@ public class DFSStripedInputStream extends DFSInputStream {
     }
     throw new InterruptedException("let's retry");
   }
+
+  /**
+   * Online read recovery may be needed, so zero-copy read doesn't make
+   * sense here and is not supported.
+   */
+  @Override
+  public synchronized ByteBuffer read(ByteBufferPool bufferPool,
+      int maxLength, EnumSet<ReadOption> opts)
+          throws IOException, UnsupportedOperationException {
+    throw new UnsupportedOperationException(
+        "Not support enhanced byte buffer access.");
+  }
+
+  @Override
+  public synchronized void releaseBuffer(ByteBuffer buffer) {
+    throw new UnsupportedOperationException(
+        "Not support enhanced byte buffer access.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f6ac390/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
index eacc6ed..5c6f449 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
@@ -22,12 +22,12 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -150,11 +150,35 @@ public class TestWriteReadStripedFile {
     return bytes;
   }
 
+  private int readAll(FSDataInputStream in, byte[] buf) throws IOException {
+    int readLen = 0;
+    int ret;
+    do {
+      ret = in.read(buf, readLen, buf.length - readLen);
+      if (ret > 0) {
+        readLen += ret;
+      }
+    } while (ret >= 0 && readLen < buf.length);
+    return readLen;
+  }
+
   private byte getByte(long pos) {
     final int mod = 29;
     return (byte) (pos % mod + 1);
   }
 
+  private void assertSeekAndRead(FSDataInputStream fsdis, int pos,
+      int writeBytes) throws IOException {
+    fsdis.seek(pos);
+    byte[] buf = new byte[writeBytes];
+    int readLen = readAll(fsdis, buf);
+    Assert.assertEquals(readLen, writeBytes - pos);
+    for (int i = 0; i < readLen; i++) {
+      Assert.assertEquals("Byte at " + i + " should be the same",
+          getByte(pos + i), buf[i]);
+    }
+  }
+
   private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
       throws IOException {
     Path testPath = new Path(src);
@@ -183,15 +207,7 @@ public class TestWriteReadStripedFile {
     // stateful read with byte array
     try (FSDataInputStream fsdis = fs.open(new Path(src))) {
       byte[] buf = new byte[writeBytes + 100];
-      int readLen = 0;
-      int ret;
-      do {
-        ret = fsdis.read(buf, readLen, buf.length - readLen);
-        if (ret > 0) {
-          readLen += ret;
-        }
-      } while (ret >= 0);
-      readLen = readLen >= 0 ? readLen : 0;
+      int readLen = readAll(fsdis, buf);
       Assert.assertEquals("The length of file should be the same to write size",
           writeBytes, readLen);
       for (int i = 0; i < writeBytes; i++) {
@@ -200,6 +216,53 @@ public class TestWriteReadStripedFile {
       }
     }
 
+    // seek and stateful read
+    try (FSDataInputStream fsdis = fs.open(new Path(src))) {
+      // seek to 1/2 of content
+      int pos = writeBytes/2;
+      assertSeekAndRead(fsdis, pos, writeBytes);
+
+      // seek to 1/3 of content
+      pos = writeBytes/3;
+      assertSeekAndRead(fsdis, pos, writeBytes);
+
+      // seek to 0 pos
+      pos = 0;
+      assertSeekAndRead(fsdis, pos, writeBytes);
+
+      if (writeBytes > cellSize) {
+        // seek to cellSize boundary
+        pos = cellSize -1;
+        assertSeekAndRead(fsdis, pos, writeBytes);
+      }
+
+      if (writeBytes > cellSize * dataBlocks) {
+        // seek to striped cell group boundary
+        pos = cellSize * dataBlocks - 1;
+        assertSeekAndRead(fsdis, pos, writeBytes);
+      }
+
+      if (writeBytes > blockSize * dataBlocks) {
+        // seek to striped block group boundary
+        pos = blockSize * dataBlocks - 1;
+        assertSeekAndRead(fsdis, pos, writeBytes);
+      }
+
+      try {
+        fsdis.seek(-1);
+        Assert.fail("Should be failed if seek to negative offset");
+      } catch (EOFException e) {
+        // expected
+      }
+
+      try {
+        fsdis.seek(writeBytes + 1);
+        Assert.fail("Should be failed if seek after EOF");
+      } catch (EOFException e) {
+        // expected
+      }
+    }
+
     // stateful read with ByteBuffer
     try (FSDataInputStream fsdis = fs.open(new Path(src))) {
       ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100);