Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/05/04 19:57:51 UTC
[17/50] hadoop git commit: HDFS-7889 Subclass DFSOutputStream to support writing striping layout files. Contributed by Li Bo
HDFS-7889 Subclass DFSOutputStream to support writing striping layout files. Contributed by Li Bo
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/16d6f9ac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/16d6f9ac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/16d6f9ac
Branch: refs/heads/HDFS-7285
Commit: 16d6f9ac9dfc3c6292421ca45f3e9bc796b57299
Parents: 41128e9
Author: Kai Zheng <ka...@intel.com>
Authored: Sat Apr 11 01:03:37 2015 +0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon May 4 10:13:23 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 4 +-
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 13 +-
.../java/org/apache/hadoop/hdfs/DFSPacket.java | 26 +-
.../hadoop/hdfs/DFSStripedOutputStream.java | 439 +++++++++++++++++++
.../org/apache/hadoop/hdfs/DataStreamer.java | 11 +-
.../apache/hadoop/hdfs/StripedDataStreamer.java | 241 ++++++++++
.../hadoop/hdfs/TestDFSStripedOutputStream.java | 311 +++++++++++++
7 files changed, 1031 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16d6f9ac/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 1e695c4..753795a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -56,4 +56,6 @@
HDFS-8074. Define a system-wide default EC schema. (Kai Zheng)
- HDFS-8104. Make hard-coded values consistent with the system default schema first before remove them. (Kai Zheng)
\ No newline at end of file
+ HDFS-8104. Make hard-coded values consistent with the system default schema first before remove them. (Kai Zheng)
+
+ HDFS-7889. Subclass DFSOutputStream to support writing striping layout files. (Li Bo via Kai Zheng)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16d6f9ac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index ae5d3eb..0280d71 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -266,8 +266,14 @@ public class DFSOutputStream extends FSOutputSummer
}
}
Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
- final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
- flag, progress, checksum, favoredNodes);
+ final DFSOutputStream out;
+ if(stat.getReplication() == 0) {
+ out = new DFSStripedOutputStream(dfsClient, src, stat,
+ flag, progress, checksum, favoredNodes);
+ } else {
+ out = new DFSOutputStream(dfsClient, src, stat,
+ flag, progress, checksum, favoredNodes);
+ }
out.start();
return out;
} finally {
@@ -347,6 +353,9 @@ public class DFSOutputStream extends FSOutputSummer
String[] favoredNodes) throws IOException {
TraceScope scope =
dfsClient.getPathTraceScope("newStreamForAppend", src);
+ if(stat.getReplication() == 0) {
+ throw new IOException("Not support appending to a striping layout file yet.");
+ }
try {
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags,
progress, lastBlock, stat, checksum, favoredNodes);
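A note on the dispatch above: on this branch a replication factor of 0 is the marker for a file with the striped (erasure coded) layout, so the existing create path keeps returning a plain DFSOutputStream for replicated files. Below is a minimal sketch of how a client ends up on the striped path, modeled on the test added later in this commit; the single-argument createErasureCodingZone call is the one that test uses, and the node count is illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class StripedCreateSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // 6 data + 3 parity + slack, mirroring the test's dataBlocks + parityBlocks + 2
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(11).build();
    try {
      DistributedFileSystem fs = cluster.getFileSystem();
      // Files under an EC zone report replication == 0, so newStreamForCreate
      // above instantiates DFSStripedOutputStream instead of DFSOutputStream.
      fs.getClient().createErasureCodingZone("/");
      try (FSDataOutputStream out = fs.create(new Path("/striped"))) {
        out.write(new byte[4096]);  // routed through the striped write path
      }
    } finally {
      cluster.shutdown();
    }
  }
}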
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16d6f9ac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
index 22055c3..9cd1ec1 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
@@ -113,6 +114,19 @@ class DFSPacket {
dataPos += len;
}
+ synchronized void writeData(ByteBuffer inBuffer, int len)
+ throws ClosedChannelException {
+ checkBuffer();
+ len = len > inBuffer.remaining() ? inBuffer.remaining() : len;
+ if (dataPos + len > buf.length) {
+ throw new BufferOverflowException();
+ }
+ for (int i = 0; i < len; i++) {
+ buf[dataPos + i] = inBuffer.get();
+ }
+ dataPos += len;
+ }
+
/**
* Write checksums to this packet
*
@@ -222,7 +236,7 @@ class DFSPacket {
*
* @return true if the packet is the last packet
*/
- boolean isLastPacketInBlock(){
+ boolean isLastPacketInBlock() {
return lastPacketInBlock;
}
@@ -231,7 +245,7 @@ class DFSPacket {
*
* @return the sequence number of this packet
*/
- long getSeqno(){
+ long getSeqno() {
return seqno;
}
@@ -240,14 +254,14 @@ class DFSPacket {
*
* @return the number of chunks in this packet
*/
- synchronized int getNumChunks(){
+ synchronized int getNumChunks() {
return numChunks;
}
/**
* increase the number of chunks by one
*/
- synchronized void incNumChunks(){
+ synchronized void incNumChunks() {
numChunks++;
}
@@ -256,7 +270,7 @@ class DFSPacket {
*
* @return the maximum number of chunks
*/
- int getMaxChunks(){
+ int getMaxChunks() {
return maxChunks;
}
@@ -265,7 +279,7 @@ class DFSPacket {
*
* @param syncBlock if to sync block
*/
- synchronized void setSyncBlock(boolean syncBlock){
+ synchronized void setSyncBlock(boolean syncBlock) {
this.syncBlock = syncBlock;
}
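The new writeData(ByteBuffer, int) overload above clamps the requested length to the buffer's remaining bytes, throws BufferOverflowException if the packet body would overflow, and advances the ByteBuffer's position as it copies. A standalone sketch of those semantics, using a bulk get where the patch loops byte-by-byte (the two are equivalent):

import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

public class WriteDataSemantics {
  public static void main(String[] args) {
    byte[] buf = new byte[8];  // stand-in for the packet's body array
    int dataPos = 0;           // stand-in for the packet's dataPos

    ByteBuffer in = ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5});
    int len = 10;                         // the caller may over-ask...
    len = Math.min(len, in.remaining());  // ...so clamp to remaining()
    if (dataPos + len > buf.length) {
      throw new BufferOverflowException();  // packet body would overflow
    }
    in.get(buf, dataPos, len);  // bulk get; advances in's position like the loop
    dataPos += len;
    System.out.println("copied " + len + " bytes, dataPos=" + dataPos);
  }
}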
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16d6f9ac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
new file mode 100644
index 0000000..aded4fe
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -0,0 +1,439 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Progressable;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+/****************************************************************
+ * The DFSStripedOutputStream class supports writing files in striped
+ * layout. Each stripe contains a sequence of cells and multiple
+ * {@link StripedDataStreamer}s in DFSStripedOutputStream are responsible
+ * for writing the cells to different datanodes.
+ *
+ ****************************************************************/
+
+@InterfaceAudience.Private
+public class DFSStripedOutputStream extends DFSOutputStream {
+
+ private final List<StripedDataStreamer> streamers;
+ /**
+ * Size of each striping cell, must be a multiple of bytesPerChecksum
+ */
+ private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+ private ByteBuffer[] cellBuffers;
+ private final short blockGroupBlocks = HdfsConstants.NUM_DATA_BLOCKS
+ + HdfsConstants.NUM_PARITY_BLOCKS;
+ private final short blockGroupDataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+ private int curIdx = 0;
+ /* bytes written in current block group */
+ private long currentBlockGroupBytes = 0;
+
+ //TODO: Use ErasureCoder interface (HDFS-7781)
+ private RawErasureEncoder encoder;
+
+ private StripedDataStreamer getLeadingStreamer() {
+ return streamers.get(0);
+ }
+
+ private long getBlockGroupSize() {
+ return blockSize * HdfsConstants.NUM_DATA_BLOCKS;
+ }
+
+ /** Construct a new output stream for creating a file. */
+ DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
+ EnumSet<CreateFlag> flag, Progressable progress,
+ DataChecksum checksum, String[] favoredNodes)
+ throws IOException {
+ super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
+ DFSClient.LOG.info("Creating striped output stream");
+ if (blockGroupBlocks <= 1) {
+ throw new IOException("The block group must contain more than one block.");
+ }
+
+ cellBuffers = new ByteBuffer[blockGroupBlocks];
+ List<BlockingQueue<LocatedBlock>> stripeBlocks = new ArrayList<>();
+
+ for (int i = 0; i < blockGroupBlocks; i++) {
+ stripeBlocks.add(new LinkedBlockingQueue<LocatedBlock>(blockGroupBlocks));
+ try {
+ cellBuffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
+ } catch (InterruptedException ie) {
+ final InterruptedIOException iioe = new InterruptedIOException(
+ "create cell buffers");
+ iioe.initCause(ie);
+ throw iioe;
+ }
+ }
+ encoder = new RSRawEncoder();
+ encoder.initialize(blockGroupDataBlocks,
+ blockGroupBlocks - blockGroupDataBlocks, cellSize);
+
+ streamers = new ArrayList<>(blockGroupBlocks);
+ for (short i = 0; i < blockGroupBlocks; i++) {
+ StripedDataStreamer streamer = new StripedDataStreamer(stat, null,
+ dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
+ i, stripeBlocks);
+ if (favoredNodes != null && favoredNodes.length != 0) {
+ streamer.setFavoredNodes(favoredNodes);
+ }
+ streamers.add(streamer);
+ }
+
+ refreshStreamer();
+ }
+
+ private void refreshStreamer() {
+ streamer = streamers.get(curIdx);
+ }
+
+ private void moveToNextStreamer() {
+ curIdx = (curIdx + 1) % blockGroupBlocks;
+ refreshStreamer();
+ }
+
+ /**
+ * Encode the buffers, i.e. compute the parity cells from the data cells.
+ * Callers flip the data buffers before invoking this method.
+ *
+ * @param buffers data buffers + parity buffers
+ */
+ private void encode(ByteBuffer[] buffers) {
+ ByteBuffer[] dataBuffers = new ByteBuffer[blockGroupDataBlocks];
+ ByteBuffer[] parityBuffers = new ByteBuffer[blockGroupBlocks - blockGroupDataBlocks];
+ for (int i = 0; i < blockGroupBlocks; i++) {
+ if (i < blockGroupDataBlocks) {
+ dataBuffers[i] = buffers[i];
+ } else {
+ parityBuffers[i - blockGroupDataBlocks] = buffers[i];
+ }
+ }
+ encoder.encode(dataBuffers, parityBuffers);
+ }
+
+ /**
+ * Generate packets from a given buffer
+ *
+ * @param byteBuffer the buffer to generate packets from
+ * @return packets generated
+ * @throws IOException
+ */
+ private List<DFSPacket> generatePackets(ByteBuffer byteBuffer)
+ throws IOException {
+ List<DFSPacket> packets = new ArrayList<>();
+ while (byteBuffer.remaining() > 0) {
+ DFSPacket p = createPacket(packetSize, chunksPerPacket,
+ streamer.getBytesCurBlock(),
+ streamer.getAndIncCurrentSeqno(), false);
+ int maxBytesToPacket = p.getMaxChunks() * bytesPerChecksum;
+ int toWrite = byteBuffer.remaining() > maxBytesToPacket ?
+ maxBytesToPacket: byteBuffer.remaining();
+ p.writeData(byteBuffer, toWrite);
+ streamer.incBytesCurBlock(toWrite);
+ packets.add(p);
+ }
+ return packets;
+ }
+
+ @Override
+ protected synchronized void writeChunk(byte[] b, int offset, int len,
+ byte[] checksum, int ckoff, int cklen) throws IOException {
+ super.writeChunk(b, offset, len, checksum, ckoff, cklen);
+
+ if (getSizeOfCellnBuffer(curIdx) <= cellSize) {
+ addToCellBuffer(b, offset, len);
+ } else {
+ String msg = "Writing a chunk should not overflow the cell buffer.";
+ DFSClient.LOG.info(msg);
+ throw new IOException(msg);
+ }
+
+ // If current packet has not been enqueued for transmission,
+ // but the cell buffer is full, we need to enqueue the packet
+ if (currentPacket != null && getSizeOfCellnBuffer(curIdx) == cellSize) {
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("DFSClient writeChunk cell buffer full seqno=" +
+ currentPacket.getSeqno() +
+ ", curIdx=" + curIdx +
+ ", src=" + src +
+ ", bytesCurBlock=" + streamer.getBytesCurBlock() +
+ ", blockSize=" + blockSize +
+ ", appendChunk=" + streamer.getAppendChunk());
+ }
+ streamer.waitAndQueuePacket(currentPacket);
+ currentPacket = null;
+ adjustChunkBoundary();
+ endBlock();
+ }
+
+ // Two extra steps are needed when a striping cell is full:
+ // 1. Forward the current index pointer
+ // 2. Generate parity packets if a full stripe of data cells are present
+ if (getSizeOfCellnBuffer(curIdx) == cellSize) {
+ //move curIdx to next cell
+ moveToNextStreamer();
+ //When all data cells in a stripe are ready, we need to encode
+ //them and generate some parity cells. These cells will be
+ //converted to packets and put to their DataStreamer's queue.
+ if (curIdx == blockGroupDataBlocks) {
+ //encode the data cells
+ for (int k = 0; k < blockGroupDataBlocks; k++) {
+ cellBuffers[k].flip();
+ }
+ encode(cellBuffers);
+ for (int i = blockGroupDataBlocks; i < blockGroupBlocks; i++) {
+ ByteBuffer parityBuffer = cellBuffers[i];
+ List<DFSPacket> packets = generatePackets(parityBuffer);
+ for (DFSPacket p : packets) {
+ currentPacket = p;
+ streamer.waitAndQueuePacket(currentPacket);
+ currentPacket = null;
+ }
+ endBlock();
+ moveToNextStreamer();
+ }
+ //read next stripe to cellBuffers
+ clearCellBuffers();
+ }
+ }
+ }
+
+ private void addToCellBuffer(byte[] b, int off, int len) {
+ cellBuffers[curIdx].put(b, off, len);
+ }
+
+ private int getSizeOfCellnBuffer(int cellIndex) {
+ return cellBuffers[cellIndex].position();
+ }
+
+ private void clearCellBuffers() {
+ for (int i = 0; i< blockGroupBlocks; i++) {
+ cellBuffers[i].clear();
+ }
+ }
+
+ private int stripeDataSize() {
+ return blockGroupDataBlocks * cellSize;
+ }
+
+ private void notSupported(String headMsg)
+ throws IOException {
+ throw new IOException(
+ headMsg + " is not supported for the striped layout yet.");
+ }
+
+ @Override
+ public void hflush() throws IOException {
+ notSupported("hflush");
+ }
+
+ @Override
+ public void hsync() throws IOException {
+ notSupported("hsync");
+ }
+
+ @Override
+ protected synchronized void start() {
+ for (StripedDataStreamer streamer : streamers) {
+ streamer.start();
+ }
+ }
+
+ @Override
+ synchronized void abort() throws IOException {
+ if (isClosed()) {
+ return;
+ }
+ for (StripedDataStreamer streamer : streamers) {
+ streamer.setLastException(new IOException("Lease timeout of "
+ + (dfsClient.getHdfsTimeout()/1000) + " seconds expired."));
+ }
+ closeThreads(true);
+ dfsClient.endFileLease(fileId);
+ }
+
+ //TODO: Handle slow writers (HDFS-7786)
+ // Currently only checks whether the leading streamer is terminated
+ boolean isClosed() {
+ return closed || getLeadingStreamer().streamerClosed();
+ }
+
+ // shutdown datastreamer and responseprocessor threads.
+ // interrupt datastreamer if force is true
+ @Override
+ protected void closeThreads(boolean force) throws IOException {
+ StripedDataStreamer leadingStreamer = null;
+ for (StripedDataStreamer streamer : streamers) {
+ try {
+ streamer.close(force);
+ streamer.join();
+ streamer.closeSocket();
+ if (streamer.isLeadingStreamer()) {
+ leadingStreamer = streamer;
+ } else {
+ streamer.countTailingBlockGroupBytes();
+ }
+
+ } catch (InterruptedException e) {
+ throw new IOException("Failed to shutdown streamer");
+ } finally {
+ streamer.setSocketToNull();
+ setClosed();
+ }
+ }
+ leadingStreamer.countTailingBlockGroupBytes();
+ }
+
+ @Override
+ public synchronized void write(int b) throws IOException {
+ super.write(b);
+ currentBlockGroupBytes = (currentBlockGroupBytes + 1) % getBlockGroupSize();
+ }
+
+ @Override
+ public synchronized void write(byte b[], int off, int len)
+ throws IOException {
+ super.write(b, off, len);
+ currentBlockGroupBytes = (currentBlockGroupBytes + len) % getBlockGroupSize();
+ }
+
+ private void writeParityCellsForLastStripe() throws IOException {
+ if (currentBlockGroupBytes == 0 ||
+ currentBlockGroupBytes % stripeDataSize() == 0)
+ return;
+ int lastStripeLen = (int) (currentBlockGroupBytes % stripeDataSize());
+ // Size of parity cells should equal the size of the first cell, if it
+ // is not full.
+ int parityCellSize = cellSize;
+ int index = lastStripeLen / cellSize;
+ if (lastStripeLen < cellSize) {
+ parityCellSize = lastStripeLen;
+ index++;
+ }
+ for (int i = 0; i < blockGroupBlocks; i++) {
+ if (i >= index) {
+ int position = cellBuffers[i].position();
+ for (int j = 0; j < parityCellSize - position; j++) {
+ cellBuffers[i].put((byte)0);
+ }
+ }
+ cellBuffers[i].flip();
+ }
+ encode(cellBuffers);
+
+ //write parity cells
+ curIdx = blockGroupDataBlocks;
+ refreshStreamer();
+ for (int i = blockGroupDataBlocks; i < blockGroupBlocks; i++) {
+ ByteBuffer parityBuffer = cellBuffers[i];
+ List<DFSPacket> packets = generatePackets(parityBuffer);
+ for (DFSPacket p : packets) {
+ currentPacket = p;
+ streamer.waitAndQueuePacket(currentPacket);
+ currentPacket = null;
+ }
+ endBlock();
+ moveToNextStreamer();
+ }
+
+ clearCellBuffers();
+ }
+
+ @Override
+ void setClosed() {
+ super.setClosed();
+ for (int i = 0; i < blockGroupBlocks; i++) {
+ byteArrayManager.release(cellBuffers[i].array());
+ streamers.get(i).release();
+ }
+ }
+
+ @Override
+ protected synchronized void closeImpl() throws IOException {
+ if (isClosed()) {
+ IOException e = getLeadingStreamer().getLastException().getAndSet(null);
+ if (e == null)
+ return;
+ else
+ throw e;
+ }
+
+ try {
+ // flush from all upper layers
+ flushBuffer();
+ if (currentPacket != null) {
+ streamer.waitAndQueuePacket(currentPacket);
+ currentPacket = null;
+ }
+ //if the last stripe is incomplete, generate and write parity cells
+ writeParityCellsForLastStripe();
+
+ for (int i = 0; i < blockGroupBlocks; i++) {
+ curIdx = i;
+ refreshStreamer();
+ if (streamer.getBytesCurBlock() != 0 ||
+ currentBlockGroupBytes < getBlockGroupSize()) {
+ // send an empty packet to mark the end of the block
+ currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
+ streamer.getAndIncCurrentSeqno(), true);
+ currentPacket.setSyncBlock(shouldSyncBlock);
+ }
+ // flush all data to Datanode
+ flushInternal();
+ }
+
+ // get last block before destroying the streamer
+ ExtendedBlock lastBlock = streamers.get(0).getBlock();
+ closeThreads(false);
+ TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
+ try {
+ completeFile(lastBlock);
+ } finally {
+ scope.close();
+ }
+ dfsClient.endFileLease(fileId);
+ } catch (ClosedChannelException e) {
+ } finally {
+ setClosed();
+ }
+ }
+
+}
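To summarize the write path in DFSStripedOutputStream: writeChunk appends to the current cell buffer; when a cell fills, curIdx advances round-robin to the next streamer; once all data cells of a stripe are buffered, they are encoded and the resulting parity cells are packetized onto the parity streamers' queues. The self-contained sketch below models that rotation with toy sizes, with a single XOR parity standing in for RSRawEncoder; it illustrates the control flow only, not the Reed-Solomon math.

import java.nio.ByteBuffer;

// Simplified model of the write path: cells rotate round-robin over the data
// streamers; when a stripe of data cells is full, parity cells are computed
// and flushed, then the buffers are cleared for the next stripe.
public class StripeModel {
  static final int DATA = 6, PARITY = 3, CELL = 4;  // toy sizes, not HDFS's
  static final ByteBuffer[] cells = new ByteBuffer[DATA + PARITY];
  static int curIdx = 0;

  public static void main(String[] args) {
    for (int i = 0; i < cells.length; i++) cells[i] = ByteBuffer.allocate(CELL);
    for (int b = 0; b < DATA * CELL * 2; b++) write((byte) b);  // two stripes
  }

  static void write(byte b) {
    cells[curIdx].put(b);
    if (!cells[curIdx].hasRemaining()) {  // cell full: advance to next streamer
      curIdx = (curIdx + 1) % (DATA + PARITY);
      if (curIdx == DATA) {               // a full data stripe is buffered
        encodeAndFlushParity();
        for (ByteBuffer c : cells) c.clear();  // start the next stripe
        curIdx = 0;
      }
    }
  }

  static void encodeAndFlushParity() {
    for (int p = DATA; p < DATA + PARITY; p++) {  // toy parity: XOR of data cells
      for (int j = 0; j < CELL; j++) {
        byte x = 0;
        for (int d = 0; d < DATA; d++) x ^= cells[d].get(j);
        cells[p].put(j, x);
      }
      System.out.println("parity cell " + (p - DATA) + " queued");
    }
  }
}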
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16d6f9ac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 43787ab..0f5c54e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -210,7 +210,7 @@ class DataStreamer extends Daemon {
}
private volatile boolean streamerClosed = false;
- private ExtendedBlock block; // its length is number of bytes acked
+ protected ExtendedBlock block; // its length is number of bytes acked
private Token<BlockTokenIdentifier> accessToken;
private DataOutputStream blockStream;
private DataInputStream blockReplyStream;
@@ -218,6 +218,7 @@ class DataStreamer extends Daemon {
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
private volatile StorageType[] storageTypes = null;
private volatile String[] storageIDs = null;
+ protected String[] favoredNodes;
volatile boolean hasError = false;
volatile int errorIndex = -1;
// Restarting node index
@@ -244,12 +245,12 @@ class DataStreamer extends Daemon {
private final LastExceptionInStreamer lastException = new LastExceptionInStreamer();
private Socket s;
- private final DFSClient dfsClient;
- private final String src;
+ protected final DFSClient dfsClient;
+ protected final String src;
/** Only for DataTransferProtocol.writeBlock(..) */
private final DataChecksum checksum4WriteBlock;
private final Progressable progress;
- private final HdfsFileStatus stat;
+ protected final HdfsFileStatus stat;
// appending to existing partial block
private volatile boolean appendChunk = false;
// both dataQueue and ackQueue are protected by dataQueue lock
@@ -365,7 +366,7 @@ class DataStreamer extends Daemon {
stage = BlockConstructionStage.DATA_STREAMING;
}
- private void endBlock() {
+ protected void endBlock() {
if(LOG.isDebugEnabled()) {
LOG.debug("Closing old block " + block);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16d6f9ac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
new file mode 100644
index 0000000..710d92d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.List;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.util.ByteArrayManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/****************************************************************************
+ * The StripedDataStreamer class is used by {@link DFSStripedOutputStream}.
+ * There are two kinds of StripedDataStreamer: the leading streamer and the
+ * ordinary streamers. The leading streamer requests a block group from the
+ * NameNode, unwraps it into located blocks, and transfers each located block
+ * to its corresponding ordinary streamer via a blocking queue.
+ *
+ ****************************************************************************/
+public class StripedDataStreamer extends DataStreamer {
+ private final short index;
+ private final List<BlockingQueue<LocatedBlock>> stripedBlocks;
+ private static short blockGroupSize = HdfsConstants.NUM_DATA_BLOCKS
+ + HdfsConstants.NUM_PARITY_BLOCKS;
+ private boolean hasCommittedBlock = false;
+
+ StripedDataStreamer(HdfsFileStatus stat, ExtendedBlock block,
+ DFSClient dfsClient, String src,
+ Progressable progress, DataChecksum checksum,
+ AtomicReference<CachingStrategy> cachingStrategy,
+ ByteArrayManager byteArrayManage, short index,
+ List<BlockingQueue<LocatedBlock>> stripedBlocks) {
+ super(stat, block, dfsClient, src, progress, checksum, cachingStrategy,
+ byteArrayManage);
+ this.index = index;
+ this.stripedBlocks = stripedBlocks;
+ }
+
+ /**
+ * Construct a data streamer for appending to the last partial block
+ * @param lastBlock last block of the file to be appended
+ * @param stat status of the file to be appended
+ * @throws IOException if error occurs
+ */
+ StripedDataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
+ DFSClient dfsClient, String src,
+ Progressable progress, DataChecksum checksum,
+ AtomicReference<CachingStrategy> cachingStrategy,
+ ByteArrayManager byteArrayManage, short index,
+ List<BlockingQueue<LocatedBlock>> stripedBlocks)
+ throws IOException {
+ super(lastBlock, stat, dfsClient, src, progress, checksum, cachingStrategy,
+ byteArrayManage);
+ this.index = index;
+ this.stripedBlocks = stripedBlocks;
+ }
+
+ public boolean isLeadingStreamer() {
+ return index == 0;
+ }
+
+ private boolean isParityStreamer() {
+ return index >= HdfsConstants.NUM_DATA_BLOCKS;
+ }
+
+ @Override
+ protected void endBlock() {
+ if (!isLeadingStreamer() && !isParityStreamer()) {
+ //before retrieving a new block, transfer the finished block to
+ //leading streamer
+ LocatedBlock finishedBlock = new LocatedBlock(
+ new ExtendedBlock(block.getBlockPoolId(), block.getBlockId(),
+ block.getNumBytes(), block.getGenerationStamp()), null);
+ try {
+ boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30,
+ TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ //TODO: Handle InterruptedException (HDFS-7786)
+ }
+ }
+ super.endBlock();
+ }
+
+ /**
+ * This function is called after the streamer is closed.
+ */
+ void countTailingBlockGroupBytes() throws IOException {
+ if (isLeadingStreamer()) {
+ //when committing a block group, the leading streamer has to adjust
+ // {@link block} to include the size of the whole block group
+ for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) {
+ try {
+ LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30,
+ TimeUnit.SECONDS);
+ if (finishedLocatedBlock == null) {
+ throw new IOException("Fail to get finished LocatedBlock " +
+ "from streamer, i=" + i);
+ }
+ ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock();
+ long bytes = finishedBlock == null ? 0 : finishedBlock.getNumBytes();
+ if (block != null) {
+ block.setNumBytes(block.getNumBytes() + bytes);
+ }
+ } catch (InterruptedException ie) {
+ DFSClient.LOG.info("InterruptedException received when " +
+ "putting a block to stripeBlocks, ie = " + ie);
+ }
+ }
+ } else if (!isParityStreamer()) {
+ if (block == null || block.getNumBytes() == 0) {
+ LocatedBlock finishedBlock = new LocatedBlock(null, null);
+ try {
+ boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30,
+ TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ //TODO: Handle InterruptedException (HDFS-7786)
+ ie.printStackTrace();
+ }
+ }
+ }
+
+ }
+
+ @Override
+ protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
+ throws IOException {
+ LocatedBlock lb = null;
+ if (isLeadingStreamer()) {
+ if (hasCommittedBlock) {
+ //when committing a block group, the leading streamer has to adjust
+ // {@link block} to include the size of the whole block group
+ for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) {
+ try {
+ LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30,
+ TimeUnit.SECONDS);
+ if (finishedLocatedBlock == null) {
+ throw new IOException("Fail to get finished LocatedBlock " +
+ "from streamer, i=" + i);
+ }
+ ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock();
+ long bytes = finishedBlock == null ? 0 : finishedBlock.getNumBytes();
+ if(block != null) {
+ block.setNumBytes(block.getNumBytes() + bytes);
+ }
+ } catch (InterruptedException ie) {
+ DFSClient.LOG.info("InterruptedException received when putting" +
+ " a block to stripeBlocks, ie = " + ie);
+ }
+ }
+ }
+
+ lb = super.locateFollowingBlock(excludedNodes);
+ hasCommittedBlock = true;
+ LocatedBlock[] blocks = unwrapBlockGroup(lb);
+ assert blocks.length == blockGroupSize :
+ "Fail to get block group from namenode: blockGroupSize: " +
+ blockGroupSize + ", blocks.length: " + blocks.length;
+ lb = blocks[0];
+ for (int i = 1; i < blocks.length; i++) {
+ try {
+ boolean offSuccess = stripedBlocks.get(i).offer(blocks[i],
+ 90, TimeUnit.SECONDS);
+ if (!offSuccess) {
+ String msg = "Failed to put block to stripedBlocks, i = " + i;
+ DFSClient.LOG.info(msg);
+ throw new IOException(msg);
+ } else {
+ DFSClient.LOG.info("Allocate a new block to a streamer. i = " + i
+ + ", block: " + blocks[i]);
+ }
+ } catch (InterruptedException ie) {
+ DFSClient.LOG.info("InterruptedException received when putting" +
+ " a block to stripeBlocks, ie = " + ie);
+ }
+ }
+ } else {
+ try {
+ //wait 90 seconds to get a block from the queue
+ lb = stripedBlocks.get(index).poll(90, TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ DFSClient.LOG.info("InterruptedException received when retrieving " +
+ "a block from stripeBlocks, ie = " + ie);
+ }
+ }
+ return lb;
+ }
+
+ /**
+ * Generate other blocks in a block group according to the first one.
+ *
+ * @param firstBlockInGroup the first block in a block group
+ * @return other blocks in this group
+ */
+ public static LocatedBlock[] unwrapBlockGroup(
+ final LocatedBlock firstBlockInGroup) {
+ ExtendedBlock eb = firstBlockInGroup.getBlock();
+ DatanodeInfo[] locs = firstBlockInGroup.getLocations();
+ String[] storageIDs = firstBlockInGroup.getStorageIDs();
+ StorageType[] storageTypes = firstBlockInGroup.getStorageTypes();
+ Token<BlockTokenIdentifier> blockToken = firstBlockInGroup.getBlockToken();
+ LocatedBlock[] blocksInGroup = new LocatedBlock[locs.length];
+ for (int i = 0; i < blocksInGroup.length; i++) {
+ //each block in a group has the same number of bytes and timestamp
+ ExtendedBlock extendedBlock = new ExtendedBlock(eb.getBlockPoolId(),
+ eb.getBlockId() + i, eb.getNumBytes(), eb.getGenerationStamp());
+ blocksInGroup[i] = new LocatedBlock(extendedBlock,
+ new DatanodeInfo[] {locs[i]}, new String[]{storageIDs[i]},
+ new StorageType[] {storageTypes[i]});
+ blocksInGroup[i].setBlockToken(blockToken);
+ }
+ return blocksInGroup;
+ }
+}
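The coordination pattern in StripedDataStreamer is worth spelling out: only the leading streamer (index 0) talks to the NameNode; it unwraps the returned block group with unwrapBlockGroup, keeps blocks[0] for itself, and offers blocks[i] into the i-th blocking queue, while each ordinary streamer polls its own queue in locateFollowingBlock instead of contacting the NameNode. A condensed, runnable model of that hand-off follows; Strings stand in for LocatedBlocks, and the 90-second timeout mirrors the patch.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Condensed model of the block hand-off: streamer 0 (leading) produces one
// item per follower; each follower polls only its own queue.
public class BlockHandOff {
  public static void main(String[] args) throws Exception {
    final int streamers = 9;  // 6 data + 3 parity in the real layout
    final List<BlockingQueue<String>> queues = new ArrayList<>();
    for (int i = 0; i < streamers; i++) {
      queues.add(new LinkedBlockingQueue<String>(streamers));
    }

    // Leading streamer: "locate" a block group, keep block 0, hand the rest out.
    for (int i = 1; i < streamers; i++) {
      if (!queues.get(i).offer("block-" + i, 90, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Failed to put block, i = " + i);
      }
    }

    // An ordinary streamer: block on its own queue instead of asking the NN.
    int index = 3;
    String myBlock = queues.get(index).poll(90, TimeUnit.SECONDS);
    System.out.println("streamer " + index + " got " + myBlock);
  }
}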
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16d6f9ac/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
new file mode 100644
index 0000000..f5a37f3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -0,0 +1,311 @@
+package org.apache.hadoop.hdfs;
+
+import java.util.Arrays;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestDFSStripedOutputStream {
+ private int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+ private int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+
+ private MiniDFSCluster cluster;
+ private Configuration conf = new Configuration();
+ private DistributedFileSystem fs;
+ int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+ int blockSize = 8 * 1024 * 1024;
+ int cellsInBlock = blockSize / cellSize;
+ private int mod = 29;
+
+ @Before
+ public void setup() throws IOException {
+ int numDNs = dataBlocks + parityBlocks + 2;
+ Configuration conf = new Configuration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, cellsInBlock * cellSize);
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+ cluster.getFileSystem().getClient().createErasureCodingZone("/");
+ fs = cluster.getFileSystem();
+ }
+
+ @After
+ public void tearDown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void TestFileEmpty() throws IOException {
+ testOneFile("/EmptyFile", 0);
+ }
+
+ @Test
+ public void TestFileSmallerThanOneCell1() throws IOException {
+ testOneFile("/SmallerThanOneCell", 1);
+ }
+
+ @Test
+ public void TestFileSmallerThanOneCell2() throws IOException {
+ testOneFile("/SmallerThanOneCell", cellSize - 1);
+ }
+
+ @Test
+ public void TestFileEqualsWithOneCell() throws IOException {
+ testOneFile("/EqualsWithOneCell", cellSize);
+ }
+
+ @Test
+ public void TestFileSmallerThanOneStripe1() throws IOException {
+ testOneFile("/SmallerThanOneStripe", cellSize * dataBlocks - 1);
+ }
+
+ @Test
+ public void TestFileSmallerThanOneStripe2() throws IOException {
+ testOneFile("/SmallerThanOneStripe", cellSize + 123);
+ }
+
+ @Test
+ public void TestFileEqualsWithOneStripe() throws IOException {
+ testOneFile("/EqualsWithOneStripe", cellSize * dataBlocks);
+ }
+
+ @Test
+ public void TestFileMoreThanOneStripe1() throws IOException {
+ testOneFile("/MoreThanOneStripe1", cellSize * dataBlocks + 123);
+ }
+
+ @Test
+ public void TestFileMoreThanOneStripe2() throws IOException {
+ testOneFile("/MoreThanOneStripe2",
+ cellSize * dataBlocks * (cellsInBlock >= 2 ? cellsInBlock / 2 : 1)
+ + cellSize * dataBlocks + 123);
+ }
+
+ @Test
+ public void TestFileFullBlockGroup() throws IOException {
+ testOneFile("/FullBlockGroup", blockSize * dataBlocks);
+ }
+
+ //TODO: The following tests will pass after HDFS-8121 is fixed
+// @Test
+ public void TestFileMoreThanABlockGroup1() throws IOException {
+ testOneFile("/MoreThanABlockGroup1", blockSize * dataBlocks + 123);
+ }
+
+ // @Test
+ public void TestFileMoreThanABlockGroup2() throws IOException {
+ testOneFile("/MoreThanABlockGroup2",
+ blockSize * dataBlocks * 3
+ + (cellsInBlock >= 2 ? cellsInBlock / 2 : 1) * cellSize * dataBlocks
+ + 123);
+ }
+
+ private int stripeDataSize() {
+ return cellSize * dataBlocks;
+ }
+
+ private byte[] generateBytes(int cnt) {
+ byte[] bytes = new byte[cnt];
+ for (int i = 0; i < cnt; i++) {
+ bytes[i] = getByte(i);
+ }
+ return bytes;
+ }
+
+ private byte getByte(long pos) {
+ return (byte) (pos % mod + 1);
+ }
+
+ private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
+ throws IOException {
+ Path TestPath = new Path(src);
+ byte[] bytes = generateBytes(writeBytes);
+ DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
+
+ //check file length
+ FileStatus status = fs.getFileStatus(TestPath);
+ long fileLength = status.getLen();
+ if (fileLength != writeBytes) {
+ Assert.fail("File Length error: expect=" + writeBytes
+ + ", actual=" + fileLength);
+ }
+
+ DFSStripedInputStream dis = new DFSStripedInputStream(
+ fs.getClient(), src, true);
+ byte[] buf = new byte[writeBytes + 100];
+ int readLen = dis.read(0, buf, 0, buf.length);
+ readLen = readLen >= 0 ? readLen : 0;
+ if (readLen != writeBytes) {
+ Assert.fail("The length of file is not correct.");
+ }
+
+ for (int i = 0; i < writeBytes; i++) {
+ if (getByte(i) != buf[i]) {
+ Assert.fail("Byte at i = " + i + " is wrongly written.");
+ }
+ }
+ }
+
+ private void testOneFile(String src, int writeBytes)
+ throws IOException {
+ Path TestPath = new Path(src);
+
+ int allBlocks = dataBlocks + parityBlocks;
+ byte[] bytes = generateBytes(writeBytes);
+ DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
+
+ //check file length
+ FileStatus status = fs.getFileStatus(TestPath);
+ long fileLength = status.getLen();
+ if (fileLength != writeBytes) {
+ Assert.fail("File Length error: expect=" + writeBytes
+ + ", actual=" + fileLength);
+ }
+
+ List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
+ LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L);
+
+ for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
+ LocatedBlock[] blocks = StripedDataStreamer.unwrapBlockGroup(firstBlock);
+ List<LocatedBlock> oneGroup = Arrays.asList(blocks);
+ blockGroupList.add(oneGroup);
+ }
+
+ //test each block group
+ for (int group = 0; group < blockGroupList.size(); group++) {
+ //get the data of this block
+ List<LocatedBlock> blockList = blockGroupList.get(group);
+ byte[][] dataBlockBytes = new byte[dataBlocks][];
+ byte[][] parityBlockBytes = new byte[allBlocks - dataBlocks][];
+
+ //calculate the size of this block group
+ int lenOfBlockGroup = group < blockGroupList.size() - 1 ?
+ blockSize * dataBlocks :
+ writeBytes - blockSize * (blockGroupList.size() - 1) * dataBlocks;
+ int intactStripes = lenOfBlockGroup / stripeDataSize();
+ int lastStripeLen = lenOfBlockGroup % stripeDataSize();
+
+ //for each block, use BlockReader to read data
+ for (int i = 0; i < blockList.size(); i++) {
+ LocatedBlock lblock = blockList.get(i);
+ if (lblock == null) {
+ continue;
+ }
+ DatanodeInfo[] nodes = lblock.getLocations();
+ ExtendedBlock block = lblock.getBlock();
+ InetSocketAddress targetAddr = NetUtils.createSocketAddr(
+ nodes[0].getXferAddr());
+
+ int lenOfCell = cellSize;
+ if (i == lastStripeLen / cellSize) {
+ lenOfCell = lastStripeLen % cellSize;
+ } else if (i > lastStripeLen / cellSize) {
+ lenOfCell = 0;
+ }
+ int lenOfBlock = cellSize * intactStripes + lenOfCell;
+ byte[] blockBytes = new byte[lenOfBlock];
+ if (i < dataBlocks) {
+ dataBlockBytes[i] = blockBytes;
+ } else {
+ parityBlockBytes[i - dataBlocks] = blockBytes;
+ }
+
+ if (lenOfBlock == 0) {
+ continue;
+ }
+
+ block.setNumBytes(lenOfBlock);
+ BlockReader blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)).
+ setFileName(src).
+ setBlock(block).
+ setBlockToken(lblock.getBlockToken()).
+ setInetSocketAddress(targetAddr).
+ setStartOffset(0).
+ setLength(block.getNumBytes()).
+ setVerifyChecksum(true).
+ setClientName("TestStripeLayoutWrite").
+ setDatanodeInfo(nodes[0]).
+ setCachingStrategy(CachingStrategy.newDefaultStrategy()).
+ setClientCacheContext(ClientContext.getFromConf(conf)).
+ setConfiguration(conf).
+ setRemotePeerFactory(new RemotePeerFactory() {
+ @Override
+ public Peer newConnectedPeer(InetSocketAddress addr,
+ Token<BlockTokenIdentifier> blockToken,
+ DatanodeID datanodeId)
+ throws IOException {
+ Peer peer = null;
+ Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+ try {
+ sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
+ sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
+ peer = TcpPeerServer.peerFromSocket(sock);
+ } finally {
+ if (peer == null) {
+ IOUtils.closeSocket(sock);
+ }
+ }
+ return peer;
+ }
+ }).build();
+
+ blockReader.readAll(blockBytes, 0, lenOfBlock);
+ blockReader.close();
+ }
+
+ //check if we write the data correctly
+ for (int i = 0; i < dataBlockBytes.length; i++) {
+ byte[] cells = dataBlockBytes[i];
+ if (cells == null) {
+ continue;
+ }
+ for (int j = 0; j < cells.length; j++) {
+ byte expected;
+ //calculate the position of this byte in the file
+ long pos = group * dataBlocks * blockSize
+ + (i * cellSize + j / cellSize * cellSize * dataBlocks)
+ + j % cellSize;
+ if (pos >= writeBytes) {
+ expected = 0;
+ } else {
+ expected = getByte(pos);
+ }
+
+ if (expected != cells[j]) {
+ Assert.fail("Unexpected byte " + cells[j] + ", expect " + expected
+ + ". Block group index is " + group +
+ ", stripe index is " + j / cellSize +
+ ", cell index is " + i + ", byte index is " + j % cellSize);
+ }
+ }
+ }
+ }
+ }
+
+}
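The verification loop's offset arithmetic deserves a closer look: byte j of data block i in block group "group" maps to file position group * dataBlocks * blockSize + (j / cellSize) * cellSize * dataBlocks + i * cellSize + j % cellSize, i.e. whole cells stride round-robin across all data blocks of the group. A small standalone restatement of that mapping, with toy sizes in place of the HdfsConstants values:

// Sketch of the offset arithmetic the verification loop relies on: byte j of
// data block i in block group g lives at this logical file offset.
public class StripeOffset {
  static final int DATA_BLOCKS = 6, CELL_SIZE = 4, BLOCK_SIZE = 8;  // toy values

  static long fileOffset(int group, int i, int j) {
    long groupBase = (long) group * DATA_BLOCKS * BLOCK_SIZE;  // earlier groups
    long stripe = j / CELL_SIZE;  // which stripe of this group the byte is in
    long inCell = j % CELL_SIZE;  // offset within the cell
    return groupBase
        + stripe * CELL_SIZE * DATA_BLOCKS  // full stripes before this one
        + (long) i * CELL_SIZE              // earlier cells in this stripe
        + inCell;
  }

  public static void main(String[] args) {
    System.out.println(fileOffset(0, 0, 0));          // 0: first byte of the file
    System.out.println(fileOffset(0, 1, 0));          // 4: second cell of stripe 0
    System.out.println(fileOffset(0, 0, CELL_SIZE));  // 24: block 0, stripe 1
  }
}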