You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/17 01:58:39 UTC
[11/50] hadoop git commit: HDFS-8136. Client gets and uses EC schema
when reading and writing a striped file. Contributed by Kai Sasaki
HDFS-8136. Client gets and uses EC schema when reading and writing a striped file. Contributed by Kai Sasaki
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/11a17e66
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/11a17e66
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/11a17e66
Branch: refs/heads/HDFS-7285
Commit: 11a17e66e5ffa7592320248870bf3bf82a1d063f
Parents: d24efdf
Author: Kai Zheng <ka...@intel.com>
Authored: Fri Apr 24 00:19:12 2015 +0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Sat May 16 15:15:22 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 +
.../hadoop/hdfs/DFSStripedInputStream.java | 17 +-
.../hadoop/hdfs/DFSStripedOutputStream.java | 24 ++-
.../hadoop/hdfs/TestDFSStripedInputStream.java | 175 +++++++++++++++++++
.../hadoop/hdfs/TestDFSStripedOutputStream.java | 4 +-
.../apache/hadoop/hdfs/TestReadStripedFile.java | 1 -
6 files changed, 209 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/11a17e66/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index b2faac0..8977c46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -119,3 +119,6 @@
HDFS-8156. Add/implement necessary APIs even we just have the system default
schema. (Kai Zheng via Zhe Zhang)
+
+ HDFS-8136. Client gets and uses EC schema when reads and writes a stripping
+ file. (Kai Sasaki via Kai Zheng)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/11a17e66/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index d597407..d0e2b68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -21,9 +21,9 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.net.NetUtils;
@@ -125,13 +125,19 @@ public class DFSStripedInputStream extends DFSInputStream {
return results;
}
- private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
- private final short dataBlkNum = HdfsConstants.NUM_DATA_BLOCKS;
- private final short parityBlkNum = HdfsConstants.NUM_PARITY_BLOCKS;
+ private final int cellSize;
+ private final short dataBlkNum;
+ private final short parityBlkNum;
+ private final ECInfo ecInfo;
DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum)
throws IOException {
super(dfsClient, src, verifyChecksum);
+ // ECInfo is restored from NN just before reading striped file.
+ ecInfo = dfsClient.getErasureCodingInfo(src);
+ cellSize = ecInfo.getSchema().getChunkSize();
+ dataBlkNum = (short)ecInfo.getSchema().getNumDataUnits();
+ parityBlkNum = (short)ecInfo.getSchema().getNumParityUnits();
DFSClient.LOG.debug("Creating an striped input stream for file " + src);
}
@@ -279,9 +285,6 @@ public class DFSStripedInputStream extends DFSInputStream {
throw new InterruptedException("let's retry");
}
- public void setCellSize(int cellSize) {
- this.cellSize = cellSize;
- }
/**
* This class represents the portion of I/O associated with each block in the
http://git-wip-us.apache.org/repos/asf/hadoop/blob/11a17e66/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 7dc0091..eeb9d7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -32,8 +32,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
@@ -61,11 +61,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
/**
* Size of each striping cell, must be a multiple of bytesPerChecksum
*/
- private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+ private final ECInfo ecInfo;
+ private final int cellSize;
private ByteBuffer[] cellBuffers;
- private final short numAllBlocks = HdfsConstants.NUM_DATA_BLOCKS
- + HdfsConstants.NUM_PARITY_BLOCKS;
- private final short numDataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+
+ private final short numAllBlocks;
+ private final short numDataBlocks;
+
private int curIdx = 0;
/* bytes written in current block group */
//private long currentBlockGroupBytes = 0;
@@ -77,6 +79,10 @@ public class DFSStripedOutputStream extends DFSOutputStream {
return streamers.get(0);
}
+ private long getBlockGroupSize() {
+ return blockSize * numDataBlocks;
+ }
+
/** Construct a new output stream for creating a file. */
DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
EnumSet<CreateFlag> flag, Progressable progress,
@@ -84,6 +90,14 @@ public class DFSStripedOutputStream extends DFSOutputStream {
throws IOException {
super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
DFSClient.LOG.info("Creating striped output stream");
+
+ // ECInfo is restored from NN just before writing striped files.
+ ecInfo = dfsClient.getErasureCodingInfo(src);
+ cellSize = ecInfo.getSchema().getChunkSize();
+ numAllBlocks = (short)(ecInfo.getSchema().getNumDataUnits()
+ + ecInfo.getSchema().getNumParityUnits());
+ numDataBlocks = (short)ecInfo.getSchema().getNumDataUnits();
+
checkConfiguration();
cellBuffers = new ByteBuffer[numAllBlocks];
http://git-wip-us.apache.org/repos/asf/hadoop/blob/11a17e66/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
new file mode 100644
index 0000000..6af4a7f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestDFSStripedInputStream {
+ private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+ private static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+
+
+ private static DistributedFileSystem fs;
+ private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+ private final static int stripesPerBlock = 4;
+ static int blockSize = cellSize * stripesPerBlock;
+ private int mod = 29;
+ static int numDNs = dataBlocks + parityBlocks + 2;
+
+ private static MiniDFSCluster cluster;
+ private static Configuration conf;
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ conf = new Configuration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+ cluster
+ = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();;
+ cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
+ fs = cluster.getFileSystem();
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testFileEmpty() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/EmptyFile", 0);
+ }
+
+ @Test
+ public void testFileSmallerThanOneCell1() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", 1);
+ }
+
+ @Test
+ public void testFileSmallerThanOneCell2() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", cellSize - 1);
+ }
+
+ @Test
+ public void testFileEqualsWithOneCell() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell", cellSize);
+ }
+
+ @Test
+ public void testFileSmallerThanOneStripe1() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe", cellSize * dataBlocks - 1);
+ }
+
+ @Test
+ public void testFileSmallerThanOneStripe2() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe", cellSize + 123);
+ }
+
+ @Test
+ public void testFileEqualsWithOneStripe() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe", cellSize * dataBlocks);
+ }
+
+ @Test
+ public void testFileMoreThanOneStripe1() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe1", cellSize * dataBlocks + 123);
+ }
+
+ @Test
+ public void testFileMoreThanOneStripe2() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe2", cellSize * dataBlocks
+ + cellSize * dataBlocks + 123);
+ }
+
+ @Test
+ public void testFileFullBlockGroup() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/FullBlockGroup", blockSize * dataBlocks);
+ }
+
+ @Test
+ public void testFileMoreThanABlockGroup1() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup1", blockSize * dataBlocks + 123);
+ }
+
+ @Test
+ public void testFileMoreThanABlockGroup2() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2", blockSize * dataBlocks + cellSize+ 123);
+ }
+
+
+ @Test
+ public void testFileMoreThanABlockGroup3() throws IOException {
+ testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup3",
+ blockSize * dataBlocks * 3 + cellSize * dataBlocks
+ + cellSize + 123);
+ }
+
+ private byte[] generateBytes(int cnt) {
+ byte[] bytes = new byte[cnt];
+ for (int i = 0; i < cnt; i++) {
+ bytes[i] = getByte(i);
+ }
+ return bytes;
+ }
+
+ private byte getByte(long pos) {
+ return (byte) (pos % mod + 1);
+ }
+
+ private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
+ throws IOException {
+ Path TestPath = new Path(src);
+ byte[] bytes = generateBytes(writeBytes);
+ DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
+
+ //check file length
+ FileStatus status = fs.getFileStatus(TestPath);
+ long fileLength = status.getLen();
+ Assert.assertEquals("File length should be the same",
+ writeBytes, fileLength);
+
+ DFSStripedInputStream dis = new DFSStripedInputStream(
+ fs.getClient(), src, true);
+ try {
+ byte[] buf = new byte[writeBytes + 100];
+ int readLen = dis.read(0, buf, 0, buf.length);
+ readLen = readLen >= 0 ? readLen : 0;
+ Assert.assertEquals("The length of file should be the same to write size",
+ writeBytes, readLen);
+ for (int i = 0; i < writeBytes; i++) {
+ Assert.assertEquals("Byte at i should be the same",
+ getByte(i), buf[i]);
+ }
+ } finally {
+ dis.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/11a17e66/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index c213183..26f6d2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -159,7 +159,7 @@ public class TestDFSStripedOutputStream {
// check file length
FileStatus status = fs.getFileStatus(testPath);
Assert.assertEquals(writeBytes, status.getLen());
-
+
checkData(src, writeBytes);
}
@@ -236,7 +236,7 @@ public class TestDFSStripedOutputStream {
cellSize, dataBlockBytes, parityBlockBytes);
}
}
-
+
static void verifyParity(final long size, final int cellSize,
byte[][] dataBytes, byte[][] parityBytes) {
// verify the parity blocks
http://git-wip-us.apache.org/repos/asf/hadoop/blob/11a17e66/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
index 90488c1..b0631ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
@@ -121,7 +121,6 @@ public class TestReadStripedFile {
}
DFSStripedInputStream in =
new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
- in.setCellSize(CELLSIZE);
int readSize = BLOCKSIZE;
byte[] readBuffer = new byte[readSize];
int ret = in.read(0, readBuffer, 0, readSize);