Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/11/03 01:35:24 UTC
svn commit: r1196888 - in
/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/server/datanode/
src/test/java/org/apache/hadoop/hdfs/ src/test/java/org/apache...
Author: todd
Date: Thu Nov 3 00:35:23 2011
New Revision: 1196888
URL: http://svn.apache.org/viewvc?rev=1196888&view=rev
Log:
HDFS-2130. Switch default checksum to CRC32C. Contributed by Todd Lipcon.
Added:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendDifferentChecksum.java
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Nov 3 00:35:23 2011
@@ -789,6 +789,8 @@ Release 0.23.0 - 2011-11-01
HDFS-2465. Add HDFS support for fadvise readahead and drop-behind. (todd)
+ HDFS-2130. Switch default checksum to CRC32C. (todd)
+
BUG FIXES
HDFS-2344. Fix the TestOfflineEditsViewer test failure in 0.23 branch.
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Thu Nov 3 00:35:23 2011
@@ -97,6 +97,7 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
+import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
/********************************************************
@@ -139,6 +140,7 @@ public class DFSClient implements java.i
final int maxBlockAcquireFailures;
final int confTime;
final int ioBufferSize;
+ final int checksumType;
final int bytesPerChecksum;
final int writePacketSize;
final int socketTimeout;
@@ -163,6 +165,7 @@ public class DFSClient implements java.i
ioBufferSize = conf.getInt(
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+ checksumType = getChecksumType(conf);
bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
DFS_BYTES_PER_CHECKSUM_DEFAULT);
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
@@ -190,6 +193,26 @@ public class DFSClient implements java.i
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
uMask = FsPermission.getUMask(conf);
}
+
+ private int getChecksumType(Configuration conf) {
+ String checksum = conf.get(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY,
+ DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
+ if ("CRC32".equals(checksum)) {
+ return DataChecksum.CHECKSUM_CRC32;
+ } else if ("CRC32C".equals(checksum)) {
+ return DataChecksum.CHECKSUM_CRC32C;
+ } else if ("NULL".equals(checksum)) {
+ return DataChecksum.CHECKSUM_NULL;
+ } else {
+ LOG.warn("Bad checksum type: " + checksum + ". Using default.");
+ return DataChecksum.CHECKSUM_CRC32C;
+ }
+ }
+
+ private DataChecksum createChecksum() {
+ return DataChecksum.newDataChecksum(
+ checksumType, bytesPerChecksum);
+ }
}
Conf getConf() {
@@ -755,7 +778,7 @@ public class DFSClient implements java.i
}
final DFSOutputStream result = new DFSOutputStream(this, src, masked, flag,
createParent, replication, blockSize, progress, buffersize,
- dfsClientConf.bytesPerChecksum);
+ dfsClientConf.createChecksum());
leaserenewer.put(src, result, this);
return result;
}
@@ -799,9 +822,12 @@ public class DFSClient implements java.i
CreateFlag.validate(flag);
DFSOutputStream result = primitiveAppend(src, flag, buffersize, progress);
if (result == null) {
+ DataChecksum checksum = DataChecksum.newDataChecksum(
+ dfsClientConf.checksumType,
+ bytesPerChecksum);
result = new DFSOutputStream(this, src, absPermission,
flag, createParent, replication, blockSize, progress, buffersize,
- bytesPerChecksum);
+ checksum);
}
leaserenewer.put(src, result, this);
return result;
@@ -859,7 +885,7 @@ public class DFSClient implements java.i
UnresolvedPathException.class);
}
return new DFSOutputStream(this, src, buffersize, progress,
- lastBlock, stat, dfsClientConf.bytesPerChecksum);
+ lastBlock, stat, dfsClientConf.createChecksum());
}
/**
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Thu Nov 3 00:35:23 2011
@@ -38,6 +38,8 @@ public class DFSConfigKeys extends Commo
public static final int DFS_STREAM_BUFFER_SIZE_DEFAULT = 4096;
public static final String DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum";
public static final int DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
+ public static final String DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
+ public static final String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable";
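For illustration only: the new dfs.checksum.type key is read on the client side, so an application can opt back into CRC32 (or disable client checksums with NULL) without any server-side change. A minimal sketch, assuming fs.defaultFS points at an HDFS cluster; the class name and path below are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class ChecksumTypeExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Valid values per DFSClient.getChecksumType(): "CRC32", "CRC32C", "NULL".
    // Anything else logs a warning and falls back to the CRC32C default.
    conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32");
    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);

    FileSystem fs = FileSystem.get(conf);
    fs.create(new Path("/tmp/crc32-example")).close();  // hypothetical path
  }
}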
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu Nov 3 00:35:23 2011
@@ -74,7 +74,6 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.PureJavaCrc32;
/****************************************************************
@@ -1206,8 +1205,9 @@ class DFSOutputStream extends FSOutputSu
}
private DFSOutputStream(DFSClient dfsClient, String src, long blockSize, Progressable progress,
- int bytesPerChecksum, short replication) throws IOException {
- super(new PureJavaCrc32(), bytesPerChecksum, 4);
+ DataChecksum checksum, short replication) throws IOException {
+ super(checksum, checksum.getBytesPerChecksum(), checksum.getChecksumSize());
+ int bytesPerChecksum = checksum.getBytesPerChecksum();
this.dfsClient = dfsClient;
this.src = src;
this.blockSize = blockSize;
@@ -1225,8 +1225,7 @@ class DFSOutputStream extends FSOutputSu
"multiple of io.bytes.per.checksum");
}
- checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
- bytesPerChecksum);
+ this.checksum = checksum;
}
/**
@@ -1235,11 +1234,12 @@ class DFSOutputStream extends FSOutputSu
*/
DFSOutputStream(DFSClient dfsClient, String src, FsPermission masked, EnumSet<CreateFlag> flag,
boolean createParent, short replication, long blockSize, Progressable progress,
- int buffersize, int bytesPerChecksum)
+ int buffersize, DataChecksum checksum)
throws IOException {
- this(dfsClient, src, blockSize, progress, bytesPerChecksum, replication);
+ this(dfsClient, src, blockSize, progress, checksum, replication);
- computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
+ computePacketChunkSize(dfsClient.getConf().writePacketSize,
+ checksum.getBytesPerChecksum());
try {
dfsClient.namenode.create(
@@ -1264,8 +1264,8 @@ class DFSOutputStream extends FSOutputSu
*/
DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress,
LocatedBlock lastBlock, HdfsFileStatus stat,
- int bytesPerChecksum) throws IOException {
- this(dfsClient, src, stat.getBlockSize(), progress, bytesPerChecksum, stat.getReplication());
+ DataChecksum checksum) throws IOException {
+ this(dfsClient, src, stat.getBlockSize(), progress, checksum, stat.getReplication());
initialFileSize = stat.getLen(); // length of file when opened
//
@@ -1274,9 +1274,10 @@ class DFSOutputStream extends FSOutputSu
if (lastBlock != null) {
// indicate that we are appending to an existing block
bytesCurBlock = lastBlock.getBlockSize();
- streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
+ streamer = new DataStreamer(lastBlock, stat, checksum.getBytesPerChecksum());
} else {
- computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
+ computePacketChunkSize(dfsClient.getConf().writePacketSize,
+ checksum.getBytesPerChecksum());
streamer = new DataStreamer();
}
streamer.start();
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Thu Nov 3 00:35:23 2011
@@ -18,12 +18,15 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.io.RandomAccessFile;
+import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
@@ -88,6 +91,18 @@ class BlockMetadataHeader {
}
}
+ /**
+ * Read the header at the beginning of the given block meta file.
+ * The current file position will be altered by this method.
+ * If an error occurs, the file is <em>not</em> closed.
+ */
+ static BlockMetadataHeader readHeader(RandomAccessFile raf) throws IOException {
+ byte[] buf = new byte[getHeaderSize()];
+ raf.seek(0);
+ raf.readFully(buf, 0, buf.length);
+ return readHeader(new DataInputStream(new ByteArrayInputStream(buf)));
+ }
+
// Version is already read.
private static BlockMetadataHeader readHeader(short version, DataInputStream in)
throws IOException {
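The new readHeader(RandomAccessFile) overload is what lets the datanode discover which checksum an existing replica was written with. A sketch of a caller, package-private like the class itself; the helper class, method name, and metaPath parameter are illustrative, not part of the patch:

package org.apache.hadoop.hdfs.server.datanode;

import java.io.IOException;
import java.io.RandomAccessFile;

import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;

class MetaChecksumProbe {
  // Returns the checksum algorithm and chunk size recorded at the start of
  // an existing block meta file.
  static DataChecksum readDiskChecksum(String metaPath) throws IOException {
    RandomAccessFile metaRAF = new RandomAccessFile(metaPath, "r");
    try {
      // readHeader(RandomAccessFile) seeks to offset 0 itself and, as its
      // javadoc notes, leaves closing the file to the caller.
      BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
      return header.getChecksum();
    } finally {
      IOUtils.closeStream(metaRAF);
    }
  }
}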
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu Nov 3 00:35:23 2011
@@ -63,7 +63,15 @@ class BlockReceiver implements Closeable
private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
private DataInputStream in = null; // from where data are read
- private DataChecksum checksum; // from where chunks of a block can be read
+ private DataChecksum clientChecksum; // checksum used by client
+ private DataChecksum diskChecksum; // checksum we write to disk
+
+ /**
+ * In the case that the client is writing with a different
+ * checksum polynomial than the block is stored with on disk,
+ * the DataNode needs to recalculate checksums before writing.
+ */
+ private boolean needsChecksumTranslation;
private OutputStream out = null; // to block file at local disk
private FileDescriptor outFd;
private OutputStream cout = null; // output stream for checksum file
@@ -177,33 +185,35 @@ class BlockReceiver implements Closeable
" while receiving block " + block + " from " + inAddr);
}
}
- // read checksum meta information
- this.checksum = requestedChecksum;
- this.bytesPerChecksum = checksum.getBytesPerChecksum();
- this.checksumSize = checksum.getChecksumSize();
this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites();
this.syncBehindWrites = datanode.shouldSyncBehindWrites();
final boolean isCreate = isDatanode || isTransfer
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
- streams = replicaInfo.createStreams(isCreate,
- this.bytesPerChecksum, this.checksumSize);
- if (streams != null) {
- this.out = streams.dataOut;
- if (out instanceof FileOutputStream) {
- this.outFd = ((FileOutputStream)out).getFD();
- } else {
- LOG.warn("Could not get file descriptor for outputstream of class " +
- out.getClass());
- }
- this.cout = streams.checksumOut;
- this.checksumOut = new DataOutputStream(new BufferedOutputStream(
- streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE));
- // write data chunk header if creating a new replica
- if (isCreate) {
- BlockMetadataHeader.writeHeader(checksumOut, checksum);
- }
+ streams = replicaInfo.createStreams(isCreate, requestedChecksum);
+ assert streams != null : "null streams!";
+
+ // read checksum meta information
+ this.clientChecksum = requestedChecksum;
+ this.diskChecksum = streams.getChecksum();
+ this.needsChecksumTranslation = !clientChecksum.equals(diskChecksum);
+ this.bytesPerChecksum = diskChecksum.getBytesPerChecksum();
+ this.checksumSize = diskChecksum.getChecksumSize();
+
+ this.out = streams.dataOut;
+ if (out instanceof FileOutputStream) {
+ this.outFd = ((FileOutputStream)out).getFD();
+ } else {
+ LOG.warn("Could not get file descriptor for outputstream of class " +
+ out.getClass());
}
+ this.cout = streams.checksumOut;
+ this.checksumOut = new DataOutputStream(new BufferedOutputStream(
+ streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE));
+ // write data chunk header if creating a new replica
+ if (isCreate) {
+ BlockMetadataHeader.writeHeader(checksumOut, diskChecksum);
+ }
} catch (ReplicaAlreadyExistsException bae) {
throw bae;
} catch (ReplicaNotFoundException bne) {
@@ -315,9 +325,9 @@ class BlockReceiver implements Closeable
while (len > 0) {
int chunkLen = Math.min(len, bytesPerChecksum);
- checksum.update(dataBuf, dataOff, chunkLen);
+ clientChecksum.update(dataBuf, dataOff, chunkLen);
- if (!checksum.compare(checksumBuf, checksumOff)) {
+ if (!clientChecksum.compare(checksumBuf, checksumOff)) {
if (srcDataNode != null) {
try {
LOG.info("report corrupt block " + block + " from datanode " +
@@ -334,12 +344,32 @@ class BlockReceiver implements Closeable
"while writing " + block + " from " + inAddr);
}
- checksum.reset();
+ clientChecksum.reset();
dataOff += chunkLen;
checksumOff += checksumSize;
len -= chunkLen;
}
}
+
+
+ /**
+ * Translate CRC chunks from the client's checksum implementation
+ * to the disk checksum implementation.
+ *
+ * This does not verify the original checksums, under the assumption
+ * that they have already been validated.
+ */
+ private void translateChunks( byte[] dataBuf, int dataOff, int len,
+ byte[] checksumBuf, int checksumOff )
+ throws IOException {
+ if (len == 0) return;
+
+ int numChunks = (len - 1)/bytesPerChecksum + 1;
+
+ diskChecksum.calculateChunkedSums(
+ ByteBuffer.wrap(dataBuf, dataOff, len),
+ ByteBuffer.wrap(checksumBuf, checksumOff, numChunks * checksumSize));
+ }
/**
* Makes sure buf.position() is zero without modifying buf.remaining().
@@ -583,9 +613,16 @@ class BlockReceiver implements Closeable
* protocol includes acks and only the last datanode needs to verify
* checksum.
*/
- if (mirrorOut == null || isDatanode) {
+ if (mirrorOut == null || isDatanode || needsChecksumTranslation) {
verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+ if (needsChecksumTranslation) {
+ // overwrite the checksums in the packet buffer with the
+ // appropriate polynomial for the disk storage.
+ translateChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+ }
}
+
+ // by this point, the data in the buffer uses the disk checksum
byte[] lastChunkChecksum;
@@ -807,7 +844,7 @@ class BlockReceiver implements Closeable
// find offset of the beginning of partial chunk.
//
int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
- int checksumSize = checksum.getChecksumSize();
+ int checksumSize = diskChecksum.getChecksumSize();
blkoff = blkoff - sizePartialChunk;
LOG.info("computePartialChunkCrc sizePartialChunk " +
sizePartialChunk +
@@ -832,7 +869,8 @@ class BlockReceiver implements Closeable
}
// compute crc of partial chunk from data read in the block file.
- partialCrc = new PureJavaCrc32();
+ partialCrc = DataChecksum.newDataChecksum(
+ diskChecksum.getChecksumType(), diskChecksum.getBytesPerChecksum());
partialCrc.update(buf, 0, sizePartialChunk);
LOG.info("Read in partial CRC chunk from disk for block " + block);
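The verify-then-translate path added above can be exercised in isolation with the public DataChecksum API. A self-contained sketch; the class name, buffer size, and random seed are illustrative:

import java.nio.ByteBuffer;
import java.util.Random;

import org.apache.hadoop.util.DataChecksum;

public class ChecksumTranslationSketch {
  public static void main(String[] args) {
    final int bytesPerChecksum = 512;
    DataChecksum clientChecksum = DataChecksum.newDataChecksum(
        DataChecksum.CHECKSUM_CRC32, bytesPerChecksum);
    DataChecksum diskChecksum = DataChecksum.newDataChecksum(
        DataChecksum.CHECKSUM_CRC32C, bytesPerChecksum);

    byte[] data = new byte[1500];          // stand-in for a packet payload
    new Random(0).nextBytes(data);

    int checksumSize = clientChecksum.getChecksumSize();   // 4 for both CRCs
    int numChunks = (data.length + bytesPerChecksum - 1) / bytesPerChecksum;
    byte[] sums = new byte[numChunks * checksumSize];

    // What the writing client would send: one CRC32 per 512-byte chunk.
    clientChecksum.calculateChunkedSums(ByteBuffer.wrap(data), ByteBuffer.wrap(sums));

    // Step 1, as in verifyChunks(): validate the data against the client's sums.
    int dataOff = 0;
    int sumOff = 0;
    for (int len = data.length; len > 0; ) {
      int chunkLen = Math.min(len, bytesPerChecksum);
      clientChecksum.update(data, dataOff, chunkLen);
      if (!clientChecksum.compare(sums, sumOff)) {
        throw new IllegalStateException("corrupt chunk at offset " + dataOff);
      }
      clientChecksum.reset();
      dataOff += chunkLen;
      sumOff += checksumSize;
      len -= chunkLen;
    }

    // Step 2, as in translateChunks(): overwrite the sums in place with the
    // disk polynomial, so the meta file always stores CRC32C.
    diskChecksum.calculateChunkedSums(ByteBuffer.wrap(data), ByteBuffer.wrap(sums));
  }
}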
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Thu Nov 3 00:35:23 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
/**
@@ -158,15 +159,23 @@ public interface FSDatasetInterface exte
static class BlockWriteStreams {
OutputStream dataOut;
OutputStream checksumOut;
- BlockWriteStreams(OutputStream dOut, OutputStream cOut) {
+ DataChecksum checksum;
+
+ BlockWriteStreams(OutputStream dOut, OutputStream cOut,
+ DataChecksum checksum) {
dataOut = dOut;
checksumOut = cOut;
+ this.checksum = checksum;
}
void close() throws IOException {
IOUtils.closeStream(dataOut);
IOUtils.closeStream(checksumOut);
}
+
+ DataChecksum getChecksum() {
+ return checksum;
+ }
}
/**
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java Thu Nov 3 00:35:23 2011
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
+import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -169,7 +170,7 @@ class ReplicaInPipeline extends ReplicaI
@Override // ReplicaInPipelineInterface
public BlockWriteStreams createStreams(boolean isCreate,
- int bytesPerChunk, int checksumSize) throws IOException {
+ DataChecksum requestedChecksum) throws IOException {
File blockFile = getBlockFile();
File metaFile = getMetaFile();
if (DataNode.LOG.isDebugEnabled()) {
@@ -180,30 +181,64 @@ class ReplicaInPipeline extends ReplicaI
}
long blockDiskSize = 0L;
long crcDiskSize = 0L;
- if (!isCreate) { // check on disk file
- blockDiskSize = bytesOnDisk;
- crcDiskSize = BlockMetadataHeader.getHeaderSize() +
- (blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize;
- if (blockDiskSize>0 &&
- (blockDiskSize>blockFile.length() || crcDiskSize>metaFile.length())) {
- throw new IOException("Corrupted block: " + this);
+
+ // the checksum that should actually be used -- this
+ // may differ from requestedChecksum for appends.
+ DataChecksum checksum;
+
+ RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
+
+ if (!isCreate) {
+ // For append or recovery, we must enforce the existing checksum.
+ // Also, verify that the file has correct lengths, etc.
+ boolean checkedMeta = false;
+ try {
+ BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
+ checksum = header.getChecksum();
+
+ if (checksum.getBytesPerChecksum() !=
+ requestedChecksum.getBytesPerChecksum()) {
+ throw new IOException("Client requested checksum " +
+ requestedChecksum + " when appending to an existing block " +
+ "with different chunk size: " + checksum);
+ }
+
+ int bytesPerChunk = checksum.getBytesPerChecksum();
+ int checksumSize = checksum.getChecksumSize();
+
+ blockDiskSize = bytesOnDisk;
+ crcDiskSize = BlockMetadataHeader.getHeaderSize() +
+ (blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize;
+ if (blockDiskSize>0 &&
+ (blockDiskSize>blockFile.length() || crcDiskSize>metaFile.length())) {
+ throw new IOException("Corrupted block: " + this);
+ }
+ checkedMeta = true;
+ } finally {
+ if (!checkedMeta) {
+ // clean up in case of exceptions.
+ IOUtils.closeStream(metaRAF);
+ }
}
+ } else {
+ // for create, we can use the requested checksum
+ checksum = requestedChecksum;
}
+
FileOutputStream blockOut = null;
FileOutputStream crcOut = null;
try {
blockOut = new FileOutputStream(
new RandomAccessFile( blockFile, "rw" ).getFD() );
- crcOut = new FileOutputStream(
- new RandomAccessFile( metaFile, "rw" ).getFD() );
+ crcOut = new FileOutputStream(metaRAF.getFD() );
if (!isCreate) {
blockOut.getChannel().position(blockDiskSize);
crcOut.getChannel().position(crcDiskSize);
}
- return new BlockWriteStreams(blockOut, crcOut);
+ return new BlockWriteStreams(blockOut, crcOut, checksum);
} catch (IOException e) {
IOUtils.closeStream(blockOut);
- IOUtils.closeStream(crcOut);
+ IOUtils.closeStream(metaRAF);
throw e;
}
}
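Stripped of the file handling, the checksum selection in createStreams() reduces to the rule below. A condensed sketch; the class and method names are not in the patch:

import java.io.IOException;

import org.apache.hadoop.util.DataChecksum;

public class AppendChecksumPolicy {
  // A new replica uses the writer's requested checksum as-is; append and
  // recovery keep the checksum already recorded in the meta file, and only
  // the chunk size has to match.
  static DataChecksum choose(boolean isCreate, DataChecksum requestedChecksum,
      DataChecksum onDiskChecksum) throws IOException {
    if (isCreate) {
      return requestedChecksum;
    }
    if (onDiskChecksum.getBytesPerChecksum()
        != requestedChecksum.getBytesPerChecksum()) {
      throw new IOException("Client requested checksum " + requestedChecksum
          + " when appending to an existing block with different chunk size: "
          + onDiskChecksum);
    }
    // A different polynomial (CRC32 vs. CRC32C) is allowed: BlockReceiver
    // translates the client's checksums before they reach the meta file.
    return onDiskChecksum;
  }
}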
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java Thu Nov 3 00:35:23 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.da
import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
+import org.apache.hadoop.util.DataChecksum;
/**
* This defines the interface of a replica in Pipeline that's being written to
@@ -61,11 +62,10 @@ interface ReplicaInPipelineInterface ext
* one for block file and one for CRC file
*
* @param isCreate if it is for creation
- * @param bytePerChunk number of bytes per CRC chunk
- * @param checksumSize number of bytes per checksum
+ * @param requestedChecksum the checksum the writer would prefer to use
* @return output streams for writing
* @throws IOException if any error occurs
*/
public BlockWriteStreams createStreams(boolean isCreate,
- int bytesPerChunk, int checksumSize) throws IOException;
+ DataChecksum requestedChecksum) throws IOException;
}
Added: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendDifferentChecksum.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendDifferentChecksum.java?rev=1196888&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendDifferentChecksum.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendDifferentChecksum.java Thu Nov 3 00:35:23 2011
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Test cases for trying to append to a file with a different
+ * checksum than the file was originally written with.
+ */
+public class TestAppendDifferentChecksum {
+ private static final int SEGMENT_LENGTH = 1500;
+
+ // run the randomized test for 5 seconds
+ private static final long RANDOM_TEST_RUNTIME = 5000;
+ private static MiniDFSCluster cluster;
+ private static FileSystem fs;
+
+
+ @BeforeClass
+ public static void setupCluster() throws IOException {
+ Configuration conf = new HdfsConfiguration();
+ conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
+
+ // disable block scanner, since otherwise this test can trigger
+ // HDFS-2525, which is a different bug than we're trying to unit test
+ // here! When HDFS-2525 is fixed, this can be removed.
+ conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+
+ conf.set("fs.hdfs.impl.disable.cache", "true");
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1)
+ .build();
+ fs = cluster.getFileSystem();
+ }
+
+ @AfterClass
+ public static void teardown() throws IOException {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * This test does not run, since switching chunksize with append
+ * is not implemented. Please see HDFS-2130 for a discussion of the
+ * difficulties in doing so.
+ */
+ @Test
+ @Ignore("this is not implemented! See HDFS-2130")
+ public void testSwitchChunkSize() throws IOException {
+ FileSystem fsWithSmallChunk = createFsWithChecksum("CRC32", 512);
+ FileSystem fsWithBigChunk = createFsWithChecksum("CRC32", 1024);
+ Path p = new Path("/testSwitchChunkSize");
+ appendWithTwoFs(p, fsWithSmallChunk, fsWithBigChunk);
+ AppendTestUtil.check(fsWithSmallChunk, p, SEGMENT_LENGTH * 2);
+ AppendTestUtil.check(fsWithBigChunk, p, SEGMENT_LENGTH * 2);
+ }
+
+ /**
+ * Simple unit test which writes some data with one algorithm,
+ * then appends with another.
+ */
+ @Test
+ public void testSwitchAlgorithms() throws IOException {
+ FileSystem fsWithCrc32 = createFsWithChecksum("CRC32", 512);
+ FileSystem fsWithCrc32C = createFsWithChecksum("CRC32C", 512);
+
+ Path p = new Path("/testSwitchAlgorithms");
+ appendWithTwoFs(p, fsWithCrc32, fsWithCrc32C);
+ // Regardless of which FS is used to read, it should pick up
+ // the on-disk checksum!
+ AppendTestUtil.check(fsWithCrc32C, p, SEGMENT_LENGTH * 2);
+ AppendTestUtil.check(fsWithCrc32, p, SEGMENT_LENGTH * 2);
+ }
+
+ /**
+ * Test which randomly alternates between appending with
+ * CRC32 and with CRC32C, crossing several block boundaries.
+ * Then, checks that all of the data can be read back correctly.
+ */
+ @Test(timeout=RANDOM_TEST_RUNTIME*2)
+ public void testAlgoSwitchRandomized() throws IOException {
+ FileSystem fsWithCrc32 = createFsWithChecksum("CRC32", 512);
+ FileSystem fsWithCrc32C = createFsWithChecksum("CRC32C", 512);
+
+ Path p = new Path("/testAlgoSwitchRandomized");
+ long seed = System.currentTimeMillis();
+ System.out.println("seed: " + seed);
+ Random r = new Random(seed);
+
+ // Create empty to start
+ IOUtils.closeStream(fsWithCrc32.create(p));
+
+ long st = System.currentTimeMillis();
+ int len = 0;
+ while (System.currentTimeMillis() - st < RANDOM_TEST_RUNTIME) {
+ int thisLen = r.nextInt(500);
+ FileSystem fs = (r.nextBoolean() ? fsWithCrc32 : fsWithCrc32C);
+ FSDataOutputStream stm = fs.append(p);
+ try {
+ AppendTestUtil.write(stm, len, thisLen);
+ } finally {
+ stm.close();
+ }
+ len += thisLen;
+ }
+
+ AppendTestUtil.check(fsWithCrc32, p, len);
+ AppendTestUtil.check(fsWithCrc32C, p, len);
+ }
+
+ private FileSystem createFsWithChecksum(String type, int bytes)
+ throws IOException {
+ Configuration conf = new Configuration(fs.getConf());
+ conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, type);
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytes);
+ return FileSystem.get(conf);
+ }
+
+
+ private void appendWithTwoFs(Path p, FileSystem fs1, FileSystem fs2)
+ throws IOException {
+ FSDataOutputStream stm = fs1.create(p);
+ try {
+ AppendTestUtil.write(stm, 0, SEGMENT_LENGTH);
+ } finally {
+ stm.close();
+ }
+
+ stm = fs2.append(p);
+ try {
+ AppendTestUtil.write(stm, SEGMENT_LENGTH, SEGMENT_LENGTH);
+ } finally {
+ stm.close();
+ }
+ }
+
+}
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Thu Nov 3 00:35:23 2011
@@ -74,7 +74,7 @@ public class TestDataTransferProtocol ex
"org.apache.hadoop.hdfs.TestDataTransferProtocol");
private static final DataChecksum DEFAULT_CHECKSUM =
- DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512);
+ DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32C, 512);
DatanodeID datanode;
InetSocketAddress dnAddr;
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Thu Nov 3 00:35:23 2011
@@ -204,13 +204,13 @@ public class SimulatedFSDataset impleme
@Override
synchronized public BlockWriteStreams createStreams(boolean isCreate,
- int bytesPerChunk, int checksumSize) throws IOException {
+ DataChecksum requestedChecksum) throws IOException {
if (finalized) {
throw new IOException("Trying to write to a finalized replica "
+ theBlock);
} else {
SimulatedOutputStream crcStream = new SimulatedOutputStream();
- return new BlockWriteStreams(oStream, crcStream);
+ return new BlockWriteStreams(oStream, crcStream, requestedChecksum);
}
}
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java Thu Nov 3 00:35:23 2011
@@ -64,7 +64,8 @@ public class TestSimulatedFSDataset exte
// we pass expected len as zero - fsdataset should use the size of actual
// data written
ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b);
- BlockWriteStreams out = bInfo.createStreams(true, 512, 4);
+ BlockWriteStreams out = bInfo.createStreams(true,
+ DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512));
try {
OutputStream dataOut = out.dataOut;
assertEquals(0, fsdataset.getLength(b));