Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/17 01:58:39 UTC

[11/50] hadoop git commit: HDFS-8136. Client gets and uses EC schema when reading and writing a striped file. Contributed by Kai Sasaki

HDFS-8136. Client gets and uses EC schema when reading and writing a striped file. Contributed by Kai Sasaki


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/11a17e66
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/11a17e66
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/11a17e66

Branch: refs/heads/HDFS-7285
Commit: 11a17e66e5ffa7592320248870bf3bf82a1d063f
Parents: d24efdf
Author: Kai Zheng <ka...@intel.com>
Authored: Fri Apr 24 00:19:12 2015 +0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Sat May 16 15:15:22 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../hadoop/hdfs/DFSStripedInputStream.java      |  17 +-
 .../hadoop/hdfs/DFSStripedOutputStream.java     |  24 ++-
 .../hadoop/hdfs/TestDFSStripedInputStream.java  | 175 +++++++++++++++++++
 .../hadoop/hdfs/TestDFSStripedOutputStream.java |   4 +-
 .../apache/hadoop/hdfs/TestReadStripedFile.java |   1 -
 6 files changed, 209 insertions(+), 15 deletions(-)
----------------------------------------------------------------------

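In outline: both striped streams now ask the NameNode for the file's erasure
coding schema at construction time, instead of relying on the hard-coded
HdfsConstants defaults. A minimal sketch of the pattern, distilled from the
diffs below (ECInfo and its accessors are the ones this patch uses; error
handling omitted):

    // Sketch only: schema-driven layout parameters, as used in the diffs below.
    ECInfo ecInfo = dfsClient.getErasureCodingInfo(src);  // RPC to the NameNode
    int cellSize = ecInfo.getSchema().getChunkSize();
    short dataBlkNum = (short) ecInfo.getSchema().getNumDataUnits();
    short parityBlkNum = (short) ecInfo.getSchema().getNumParityUnits();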

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11a17e66/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index b2faac0..8977c46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -119,3 +119,6 @@
 
     HDFS-8156. Add/implement necessary APIs even if we just have the system default
     schema. (Kai Zheng via Zhe Zhang)
+
+    HDFS-8136. Client gets and uses EC schema when reading and writing a
+    striped file. (Kai Sasaki via Kai Zheng)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11a17e66/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index d597407..d0e2b68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -21,9 +21,9 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
 import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.net.NetUtils;
@@ -125,13 +125,19 @@ public class DFSStripedInputStream extends DFSInputStream {
     return results;
   }
 
-  private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
-  private final short dataBlkNum = HdfsConstants.NUM_DATA_BLOCKS;
-  private final short parityBlkNum = HdfsConstants.NUM_PARITY_BLOCKS;
+  private final int cellSize;
+  private final short dataBlkNum;
+  private final short parityBlkNum;
+  private final ECInfo ecInfo;
 
   DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum)
       throws IOException {
     super(dfsClient, src, verifyChecksum);
+    // ECInfo is fetched from the NameNode just before reading a striped file.
+    ecInfo = dfsClient.getErasureCodingInfo(src);
+    cellSize = ecInfo.getSchema().getChunkSize();
+    dataBlkNum = (short)ecInfo.getSchema().getNumDataUnits();
+    parityBlkNum = (short)ecInfo.getSchema().getNumParityUnits();
     DFSClient.LOG.debug("Creating an striped input stream for file " + src);
   }
 
@@ -279,9 +285,6 @@ public class DFSStripedInputStream extends DFSInputStream {
     throw new InterruptedException("let's retry");
   }
 
-  public void setCellSize(int cellSize) {
-    this.cellSize = cellSize;
-  }
 
   /**
    * This class represents the portion of I/O associated with each block in the

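Why the input stream needs cellSize and dataBlkNum: every positional read must
map a logical file offset onto a (data block, cell) coordinate inside the block
group. A hedged sketch of that round-robin mapping (the real logic lives in
StripedBlockUtil; the variable names here are illustrative):

    // Hedged sketch of the round-robin striping math, not the exact
    // StripedBlockUtil code.
    long cellIdx = offset / cellSize;                  // which cell, file-wide
    int blkIdxInGroup = (int) (cellIdx % dataBlkNum);  // which data block
    long stripeIdx = cellIdx / dataBlkNum;             // which stripe in block
    long offsetInBlk = stripeIdx * cellSize + offset % cellSize;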
http://git-wip-us.apache.org/repos/asf/hadoop/blob/11a17e66/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 7dc0091..eeb9d7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -32,8 +32,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
@@ -61,11 +61,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   /**
    * Size of each striping cell, must be a multiple of bytesPerChecksum
    */
-  private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final ECInfo ecInfo;
+  private final int cellSize;
   private ByteBuffer[] cellBuffers;
-  private final short numAllBlocks = HdfsConstants.NUM_DATA_BLOCKS
-      + HdfsConstants.NUM_PARITY_BLOCKS;
-  private final short numDataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+
+  private final short numAllBlocks;
+  private final short numDataBlocks;
+
   private int curIdx = 0;
   /* bytes written in current block group */
   //private long currentBlockGroupBytes = 0;
@@ -77,6 +79,10 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     return streamers.get(0);
   }
 
+  private long getBlockGroupSize() {
+    return blockSize * numDataBlocks;
+  }
+
   /** Construct a new output stream for creating a file. */
   DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
                          EnumSet<CreateFlag> flag, Progressable progress,
@@ -84,6 +90,14 @@ public class DFSStripedOutputStream extends DFSOutputStream {
                          throws IOException {
     super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
     DFSClient.LOG.info("Creating striped output stream");
+
+    // ECInfo is fetched from the NameNode just before writing a striped file.
+    ecInfo = dfsClient.getErasureCodingInfo(src);
+    cellSize = ecInfo.getSchema().getChunkSize();
+    numAllBlocks = (short)(ecInfo.getSchema().getNumDataUnits()
+        + ecInfo.getSchema().getNumParityUnits());
+    numDataBlocks = (short)ecInfo.getSchema().getNumDataUnits();
+
     checkConfiguration();
 
     cellBuffers = new ByteBuffer[numAllBlocks];

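The new getBlockGroupSize() helper makes the user-visible capacity of one
block group explicit: numDataBlocks data blocks of blockSize bytes each, with
the parity blocks adding redundancy rather than capacity. An illustrative
calculation (the numbers are hypothetical; real values come from the schema
and configuration):

    // Hypothetical 6-data/3-parity schema with 128 MB blocks.
    short numDataBlocks = 6;
    long blockSize = 128L * 1024 * 1024;
    long blockGroupSize = blockSize * numDataBlocks;  // 768 MB of user data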
http://git-wip-us.apache.org/repos/asf/hadoop/blob/11a17e66/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
new file mode 100644
index 0000000..6af4a7f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestDFSStripedInputStream {
+  private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+  private static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+
+
+  private static DistributedFileSystem fs;
+  private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final static int stripesPerBlock = 4;
+  static int blockSize = cellSize * stripesPerBlock;
+  private int mod = 29;
+  static int numDNs = dataBlocks + parityBlocks + 2;
+
+  private static MiniDFSCluster cluster;
+  private static Configuration conf;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
+    fs = cluster.getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testFileEmpty() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/EmptyFile", 0);
+  }
+
+  @Test
+  public void testFileSmallerThanOneCell1() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", 1);
+  }
+
+  @Test
+  public void testFileSmallerThanOneCell2() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", cellSize - 1);
+  }
+
+  @Test
+  public void testFileEqualsWithOneCell() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell", cellSize);
+  }
+
+  @Test
+  public void testFileSmallerThanOneStripe1() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe", cellSize * dataBlocks - 1);
+  }
+
+  @Test
+  public void testFileSmallerThanOneStripe2() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe", cellSize + 123);
+  }
+
+  @Test
+  public void testFileEqualsWithOneStripe() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe", cellSize * dataBlocks);
+  }
+
+  @Test
+  public void testFileMoreThanOneStripe1() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe1", cellSize * dataBlocks + 123);
+  }
+
+  @Test
+  public void testFileMoreThanOneStripe2() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe2", cellSize * dataBlocks
+        + cellSize * dataBlocks + 123);
+  }
+
+  @Test
+  public void testFileFullBlockGroup() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/FullBlockGroup", blockSize * dataBlocks);
+  }
+
+  @Test
+  public void testFileMoreThanABlockGroup1() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup1", blockSize * dataBlocks + 123);
+  }
+
+  @Test
+  public void testFileMoreThanABlockGroup2() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2", blockSize * dataBlocks + cellSize+ 123);
+  }
+
+
+  @Test
+  public void testFileMoreThanABlockGroup3() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup3",
+        blockSize * dataBlocks * 3 + cellSize * dataBlocks
+            + cellSize + 123);
+  }
+
+  private byte[] generateBytes(int cnt) {
+    byte[] bytes = new byte[cnt];
+    for (int i = 0; i < cnt; i++) {
+      bytes[i] = getByte(i);
+    }
+    return bytes;
+  }
+
+  private byte getByte(long pos) {
+    return (byte) (pos % mod + 1);
+  }
+
+  private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
+      throws IOException {
+    Path testPath = new Path(src);
+    byte[] bytes = generateBytes(writeBytes);
+    DFSTestUtil.writeFile(fs, testPath, new String(bytes));
+
+    // check file length
+    FileStatus status = fs.getFileStatus(testPath);
+    long fileLength = status.getLen();
+    Assert.assertEquals("File length should be the same",
+        writeBytes, fileLength);
+
+    DFSStripedInputStream dis = new DFSStripedInputStream(
+        fs.getClient(), src, true);
+    try {
+      byte[] buf = new byte[writeBytes + 100];
+      int readLen = dis.read(0, buf, 0, buf.length);
+      readLen = readLen >= 0 ? readLen : 0;
+      Assert.assertEquals("The length of file should be the same to write size",
+          writeBytes, readLen);
+      for (int i = 0; i < writeBytes; i++) {
+        Assert.assertEquals("Byte at i should be the same",
+            getByte(i), buf[i]);
+      }
+    } finally {
+      dis.close();
+    }
+  }
+}

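The write sizes in the tests above are chosen to straddle each layout
boundary. As a reading aid, derived only from the constants this test already
defines:

    // One stripe holds cellSize * dataBlocks bytes of user data; one block
    // group holds blockSize * dataBlocks. For example, "/MoreThanABlockGroup2"
    // writes blockSize * dataBlocks + cellSize + 123 bytes: a full block
    // group, then one full cell, then a partial cell.
    int stripeSize = cellSize * dataBlocks;
    int blockGroupSize = blockSize * dataBlocks;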
http://git-wip-us.apache.org/repos/asf/hadoop/blob/11a17e66/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index c213183..26f6d2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -159,7 +159,7 @@ public class TestDFSStripedOutputStream {
     // check file length
     FileStatus status = fs.getFileStatus(testPath);
     Assert.assertEquals(writeBytes, status.getLen());
-    
+
     checkData(src, writeBytes);
   }
 
@@ -236,7 +236,7 @@ public class TestDFSStripedOutputStream {
           cellSize, dataBlockBytes, parityBlockBytes);
     }
   }
-    
+
   static void verifyParity(final long size, final int cellSize,
       byte[][] dataBytes, byte[][] parityBytes) {
     // verify the parity blocks

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11a17e66/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
index 90488c1..b0631ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
@@ -121,7 +121,6 @@ public class TestReadStripedFile {
     }
     DFSStripedInputStream in =
         new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
-    in.setCellSize(CELLSIZE);
     int readSize = BLOCKSIZE;
     byte[] readBuffer = new byte[readSize];
     int ret = in.read(0, readBuffer, 0, readSize);