Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/11/03 01:35:24 UTC

svn commit: r1196888 - in /hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/test/java/org/apache/hadoop/hdfs/ src/test/java/org/apache...

Author: todd
Date: Thu Nov  3 00:35:23 2011
New Revision: 1196888

URL: http://svn.apache.org/viewvc?rev=1196888&view=rev
Log:
HDFS-2130. Switch default checksum to CRC32C. Contributed by Todd Lipcon.

Added:
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendDifferentChecksum.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Nov  3 00:35:23 2011
@@ -789,6 +789,8 @@ Release 0.23.0 - 2011-11-01 
 
     HDFS-2465. Add HDFS support for fadvise readahead and drop-behind. (todd)
 
+    HDFS-2130. Switch default checksum to CRC32C. (todd)
+
   BUG FIXES
 
     HDFS-2344. Fix the TestOfflineEditsViewer test failure in 0.23 branch.

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Thu Nov  3 00:35:23 2011
@@ -97,6 +97,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenRenewer;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 
 /********************************************************
@@ -139,6 +140,7 @@ public class DFSClient implements java.i
     final int maxBlockAcquireFailures;
     final int confTime;
     final int ioBufferSize;
+    final int checksumType;
     final int bytesPerChecksum;
     final int writePacketSize;
     final int socketTimeout;
@@ -163,6 +165,7 @@ public class DFSClient implements java.i
       ioBufferSize = conf.getInt(
           CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
           CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+      checksumType = getChecksumType(conf);
       bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
           DFS_BYTES_PER_CHECKSUM_DEFAULT);
       socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
@@ -190,6 +193,26 @@ public class DFSClient implements java.i
               DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
       uMask = FsPermission.getUMask(conf);
     }
+
+    private int getChecksumType(Configuration conf) {
+      String checksum = conf.get(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY,
+          DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
+      if ("CRC32".equals(checksum)) {
+        return DataChecksum.CHECKSUM_CRC32;
+      } else if ("CRC32C".equals(checksum)) {
+        return DataChecksum.CHECKSUM_CRC32C;
+      } else if ("NULL".equals(checksum)) {
+        return DataChecksum.CHECKSUM_NULL;
+      } else {
+        LOG.warn("Bad checksum type: " + checksum + ". Using default.");
+        return DataChecksum.CHECKSUM_CRC32C;
+      }
+    }
+
+    private DataChecksum createChecksum() {
+      return DataChecksum.newDataChecksum(
+          checksumType, bytesPerChecksum);
+    }
   }
  
   Conf getConf() {
@@ -755,7 +778,7 @@ public class DFSClient implements java.i
     }
     final DFSOutputStream result = new DFSOutputStream(this, src, masked, flag,
         createParent, replication, blockSize, progress, buffersize,
-        dfsClientConf.bytesPerChecksum);
+        dfsClientConf.createChecksum());
     leaserenewer.put(src, result, this);
     return result;
   }
@@ -799,9 +822,12 @@ public class DFSClient implements java.i
     CreateFlag.validate(flag);
     DFSOutputStream result = primitiveAppend(src, flag, buffersize, progress);
     if (result == null) {
+      DataChecksum checksum = DataChecksum.newDataChecksum(
+          dfsClientConf.checksumType,
+          bytesPerChecksum);
       result = new DFSOutputStream(this, src, absPermission,
           flag, createParent, replication, blockSize, progress, buffersize,
-          bytesPerChecksum);
+          checksum);
     }
     leaserenewer.put(src, result, this);
     return result;
@@ -859,7 +885,7 @@ public class DFSClient implements java.i
                                      UnresolvedPathException.class);
     }
     return new DFSOutputStream(this, src, buffersize, progress,
-        lastBlock, stat, dfsClientConf.bytesPerChecksum);
+        lastBlock, stat, dfsClientConf.createChecksum());
   }
   
   /**

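The DFSClient changes above replace the raw bytesPerChecksum int with a DataChecksum object that bundles the algorithm and the chunk size, and the DFSOutputStream changes below consume that object instead of hard-coding CRC32. A minimal sketch of what such an object carries, using only methods that appear in this patch (newDataChecksum, getBytesPerChecksum, getChecksumSize); the wrapper class and the printed values are illustrative, not part of the change:

    import org.apache.hadoop.util.DataChecksum;

    public class DataChecksumSketch {
      public static void main(String[] args) {
        // CRC32C over 512-byte chunks, matching the new client defaults
        // (dfs.checksum.type = CRC32C, dfs.bytes-per-checksum = 512).
        DataChecksum sum = DataChecksum.newDataChecksum(
            DataChecksum.CHECKSUM_CRC32C, 512);

        System.out.println(sum.getBytesPerChecksum()); // 512 data bytes per chunk
        System.out.println(sum.getChecksumSize());     // 4 bytes of CRC per chunk
      }
    }
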
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Thu Nov  3 00:35:23 2011
@@ -38,6 +38,8 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_STREAM_BUFFER_SIZE_DEFAULT = 4096;
   public static final String  DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum";
   public static final int     DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
+  public static final String  DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
+  public static final String  DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
   public static final String  DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
   public static final int     DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
   public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable";

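With dfs.checksum.type in place, a client picks its write checksum through the Configuration it builds its FileSystem from, the same way createFsWithChecksum does in the new test further down. A hedged usage sketch (it assumes the Configuration's default filesystem already points at an HDFS cluster; the class name and path are placeholders; per getChecksumType above, the accepted values are CRC32, CRC32C and NULL, and anything else falls back to CRC32C with a warning):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class ChecksumTypeExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Keep the old polynomial for this client's writes; the default is now CRC32C.
        conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32");
        conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);

        FileSystem fs = FileSystem.get(conf);
        fs.create(new Path("/tmp/crc32-example")).close(); // stored with CRC32 checksums
      }
    }
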
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu Nov  3 00:35:23 2011
@@ -74,7 +74,6 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.PureJavaCrc32;
 
 
 /****************************************************************
@@ -1206,8 +1205,9 @@ class DFSOutputStream extends FSOutputSu
   }
 
   private DFSOutputStream(DFSClient dfsClient, String src, long blockSize, Progressable progress,
-      int bytesPerChecksum, short replication) throws IOException {
-    super(new PureJavaCrc32(), bytesPerChecksum, 4);
+      DataChecksum checksum, short replication) throws IOException {
+    super(checksum, checksum.getBytesPerChecksum(), checksum.getChecksumSize());
+    int bytesPerChecksum = checksum.getBytesPerChecksum();
     this.dfsClient = dfsClient;
     this.src = src;
     this.blockSize = blockSize;
@@ -1225,8 +1225,7 @@ class DFSOutputStream extends FSOutputSu
                             "multiple of io.bytes.per.checksum");
                             
     }
-    checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
-                                            bytesPerChecksum);
+    this.checksum = checksum;
   }
 
   /**
@@ -1235,11 +1234,12 @@ class DFSOutputStream extends FSOutputSu
    */
   DFSOutputStream(DFSClient dfsClient, String src, FsPermission masked, EnumSet<CreateFlag> flag,
       boolean createParent, short replication, long blockSize, Progressable progress,
-      int buffersize, int bytesPerChecksum) 
+      int buffersize, DataChecksum checksum) 
       throws IOException {
-    this(dfsClient, src, blockSize, progress, bytesPerChecksum, replication);
+    this(dfsClient, src, blockSize, progress, checksum, replication);
 
-    computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
+    computePacketChunkSize(dfsClient.getConf().writePacketSize,
+        checksum.getBytesPerChecksum());
 
     try {
       dfsClient.namenode.create(
@@ -1264,8 +1264,8 @@ class DFSOutputStream extends FSOutputSu
    */
   DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress,
       LocatedBlock lastBlock, HdfsFileStatus stat,
-      int bytesPerChecksum) throws IOException {
-    this(dfsClient, src, stat.getBlockSize(), progress, bytesPerChecksum, stat.getReplication());
+      DataChecksum checksum) throws IOException {
+    this(dfsClient, src, stat.getBlockSize(), progress, checksum, stat.getReplication());
     initialFileSize = stat.getLen(); // length of file when opened
 
     //
@@ -1274,9 +1274,10 @@ class DFSOutputStream extends FSOutputSu
     if (lastBlock != null) {
       // indicate that we are appending to an existing block
       bytesCurBlock = lastBlock.getBlockSize();
-      streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
+      streamer = new DataStreamer(lastBlock, stat, checksum.getBytesPerChecksum());
     } else {
-      computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
+      computePacketChunkSize(dfsClient.getConf().writePacketSize,
+          checksum.getBytesPerChecksum());
       streamer = new DataStreamer();
     }
     streamer.start();

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Thu Nov  3 00:35:23 2011
@@ -18,12 +18,14 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -88,6 +91,18 @@ class BlockMetadataHeader {
     }
   }
   
+  /**
+   * Read the header at the beginning of the given block meta file.
+   * The current file position will be altered by this method.
+   * If an error occurs, the file is <em>not</em> closed.
+   */
+  static BlockMetadataHeader readHeader(RandomAccessFile raf) throws IOException {
+    byte[] buf = new byte[getHeaderSize()];
+    raf.seek(0);
+    raf.readFully(buf, 0, buf.length);
+    return readHeader(new DataInputStream(new ByteArrayInputStream(buf)));
+  }
+  
   // Version is already read.
   private static BlockMetadataHeader readHeader(short version, DataInputStream in) 
                                    throws IOException {

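The new readHeader(RandomAccessFile) overload exists so that a caller which already holds the meta file open for writing can discover the on-disk checksum and then keep using the same descriptor; since the method seeks to the start and never closes the file, the caller stays responsible for it. A rough sketch of that calling pattern (it assumes code living in the same org.apache.hadoop.hdfs.server.datanode package, as both the class and the method are package-private; the helper name is illustrative and the error handling is simplified from ReplicaInPipeline below):

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.util.DataChecksum;

    class MetaHeaderPeek {
      /** Open a meta file for append and report which checksum it was written with. */
      static RandomAccessFile openForAppend(File metaFile) throws IOException {
        RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
        boolean ok = false;
        try {
          DataChecksum onDisk =
              BlockMetadataHeader.readHeader(metaRAF).getChecksum();
          DataNode.LOG.info("Existing replica uses checksum " + onDisk);
          ok = true;
          return metaRAF; // position has moved; the caller owns and must close it
        } finally {
          if (!ok) {
            IOUtils.closeStream(metaRAF); // readHeader does not close the file on error
          }
        }
      }
    }
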
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu Nov  3 00:35:23 2011
@@ -63,7 +63,15 @@ class BlockReceiver implements Closeable
   private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
   
   private DataInputStream in = null; // from where data are read
-  private DataChecksum checksum; // from where chunks of a block can be read
+  private DataChecksum clientChecksum; // checksum used by client
+  private DataChecksum diskChecksum; // checksum we write to disk
+  
+  /**
+   * In the case that the client is writing with a different
+   * checksum polynomial than the block is stored with on disk,
+   * the DataNode needs to recalculate checksums before writing.
+   */
+  private boolean needsChecksumTranslation;
   private OutputStream out = null; // to block file at local disk
   private FileDescriptor outFd;
  private OutputStream cout = null; // output stream for checksum file
@@ -177,33 +185,35 @@ class BlockReceiver implements Closeable
               " while receiving block " + block + " from " + inAddr);
         }
       }
-      // read checksum meta information
-      this.checksum = requestedChecksum;
-      this.bytesPerChecksum = checksum.getBytesPerChecksum();
-      this.checksumSize = checksum.getChecksumSize();
       this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites();
       this.syncBehindWrites = datanode.shouldSyncBehindWrites();
       
       final boolean isCreate = isDatanode || isTransfer 
           || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
-      streams = replicaInfo.createStreams(isCreate,
-          this.bytesPerChecksum, this.checksumSize);
-      if (streams != null) {
-        this.out = streams.dataOut;
-        if (out instanceof FileOutputStream) {
-          this.outFd = ((FileOutputStream)out).getFD();
-        } else {
-          LOG.warn("Could not get file descriptor for outputstream of class " +
-              out.getClass());
-        }
-        this.cout = streams.checksumOut;
-        this.checksumOut = new DataOutputStream(new BufferedOutputStream(
-            streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE));
-        // write data chunk header if creating a new replica
-        if (isCreate) {
-          BlockMetadataHeader.writeHeader(checksumOut, checksum);
-        } 
+      streams = replicaInfo.createStreams(isCreate, requestedChecksum);
+      assert streams != null : "null streams!";
+
+      // read checksum meta information
+      this.clientChecksum = requestedChecksum;
+      this.diskChecksum = streams.getChecksum();
+      this.needsChecksumTranslation = !clientChecksum.equals(diskChecksum);
+      this.bytesPerChecksum = diskChecksum.getBytesPerChecksum();
+      this.checksumSize = diskChecksum.getChecksumSize();
+
+      this.out = streams.dataOut;
+      if (out instanceof FileOutputStream) {
+        this.outFd = ((FileOutputStream)out).getFD();
+      } else {
+        LOG.warn("Could not get file descriptor for outputstream of class " +
+            out.getClass());
       }
+      this.cout = streams.checksumOut;
+      this.checksumOut = new DataOutputStream(new BufferedOutputStream(
+          streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE));
+      // write data chunk header if creating a new replica
+      if (isCreate) {
+        BlockMetadataHeader.writeHeader(checksumOut, diskChecksum);
+      } 
     } catch (ReplicaAlreadyExistsException bae) {
       throw bae;
     } catch (ReplicaNotFoundException bne) {
@@ -315,9 +325,9 @@ class BlockReceiver implements Closeable
     while (len > 0) {
       int chunkLen = Math.min(len, bytesPerChecksum);
       
-      checksum.update(dataBuf, dataOff, chunkLen);
+      clientChecksum.update(dataBuf, dataOff, chunkLen);
 
-      if (!checksum.compare(checksumBuf, checksumOff)) {
+      if (!clientChecksum.compare(checksumBuf, checksumOff)) {
         if (srcDataNode != null) {
           try {
             LOG.info("report corrupt block " + block + " from datanode " +
@@ -334,12 +344,32 @@ class BlockReceiver implements Closeable
                               "while writing " + block + " from " + inAddr);
       }
 
-      checksum.reset();
+      clientChecksum.reset();
       dataOff += chunkLen;
       checksumOff += checksumSize;
       len -= chunkLen;
     }
   }
+  
+    
+  /**
+   * Translate CRC chunks from the client's checksum implementation
+   * to the disk checksum implementation.
+   * 
+   * This does not verify the original checksums, under the assumption
+   * that they have already been validated.
+   */
+  private void translateChunks( byte[] dataBuf, int dataOff, int len, 
+                             byte[] checksumBuf, int checksumOff ) 
+                             throws IOException {
+    if (len == 0) return;
+    
+    int numChunks = (len - 1)/bytesPerChecksum + 1;
+    
+    diskChecksum.calculateChunkedSums(
+        ByteBuffer.wrap(dataBuf, dataOff, len),
+        ByteBuffer.wrap(checksumBuf, checksumOff, numChunks * checksumSize));
+  }
 
   /**
    * Makes sure buf.position() is zero without modifying buf.remaining().
@@ -583,9 +613,16 @@ class BlockReceiver implements Closeable
        * protocol includes acks and only the last datanode needs to verify 
        * checksum.
        */
-      if (mirrorOut == null || isDatanode) {
+      if (mirrorOut == null || isDatanode || needsChecksumTranslation) {
         verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+        if (needsChecksumTranslation) {
+          // overwrite the checksums in the packet buffer with the
+          // appropriate polynomial for the disk storage.
+          translateChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+        }
       }
+      
+      // by this point, the data in the buffer uses the disk checksum
 
       byte[] lastChunkChecksum;
       
@@ -807,7 +844,7 @@ class BlockReceiver implements Closeable
     // find offset of the beginning of partial chunk.
     //
     int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
-    int checksumSize = checksum.getChecksumSize();
+    int checksumSize = diskChecksum.getChecksumSize();
     blkoff = blkoff - sizePartialChunk;
     LOG.info("computePartialChunkCrc sizePartialChunk " + 
               sizePartialChunk +
@@ -832,7 +869,8 @@ class BlockReceiver implements Closeable
     }
 
     // compute crc of partial chunk from data read in the block file.
-    partialCrc = new PureJavaCrc32();
+    partialCrc = DataChecksum.newDataChecksum(
+        diskChecksum.getChecksumType(), diskChecksum.getBytesPerChecksum());
     partialCrc.update(buf, 0, sizePartialChunk);
     LOG.info("Read in partial CRC chunk from disk for block " + block);
 

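The clientChecksum/diskChecksum split means that, when the two differ (for example a CRC32 client appending to a CRC32C replica), each packet is first verified with the client's algorithm and then its checksum bytes are recomputed with the disk's algorithm before anything reaches the block and meta files. Below is a standalone restatement of that verify-then-translate step, using only DataChecksum calls that appear in this patch; it assumes both algorithms use 4-byte checksums (true for CRC32 and CRC32C), and the class name, method name, and buffer layout are illustrative:

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.util.DataChecksum;

    class ChecksumTranslationSketch {
      static void verifyAndTranslate(DataChecksum clientChecksum, DataChecksum diskChecksum,
          byte[] dataBuf, int dataOff, int dataLen,
          byte[] checksumBuf, int checksumOff) throws IOException {
        int bytesPerChecksum = clientChecksum.getBytesPerChecksum();
        int checksumSize = clientChecksum.getChecksumSize();

        // 1. Verify every chunk against the checksum the client computed.
        int off = dataOff, sumOff = checksumOff, remaining = dataLen;
        while (remaining > 0) {
          int chunkLen = Math.min(remaining, bytesPerChecksum);
          clientChecksum.update(dataBuf, off, chunkLen);
          if (!clientChecksum.compare(checksumBuf, sumOff)) {
            throw new IOException("Checksum mismatch at offset " + off);
          }
          clientChecksum.reset();
          off += chunkLen;
          sumOff += checksumSize;
          remaining -= chunkLen;
        }

        // 2. If the on-disk algorithm differs, overwrite the checksums in place
        //    with the disk polynomial; the data bytes themselves are untouched.
        if (!clientChecksum.equals(diskChecksum)) {
          int numChunks = (dataLen - 1) / bytesPerChecksum + 1;
          diskChecksum.calculateChunkedSums(
              ByteBuffer.wrap(dataBuf, dataOff, dataLen),
              ByteBuffer.wrap(checksumBuf, checksumOff,
                  numChunks * diskChecksum.getChecksumSize()));
        }
      }
    }

In the real BlockReceiver the comparison is precomputed once as needsChecksumTranslation, and intermediate datanodes in a pipeline still skip verification unless translation forces them to verify first.
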
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Thu Nov  3 00:35:23 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 /**
@@ -158,15 +159,23 @@ public interface FSDatasetInterface exte
      static class BlockWriteStreams {
       OutputStream dataOut;
       OutputStream checksumOut;
-      BlockWriteStreams(OutputStream dOut, OutputStream cOut) {
+      DataChecksum checksum;
+      
+      BlockWriteStreams(OutputStream dOut, OutputStream cOut,
+          DataChecksum checksum) {
         dataOut = dOut;
         checksumOut = cOut;
+        this.checksum = checksum;
       }
       
       void close() throws IOException {
         IOUtils.closeStream(dataOut);
         IOUtils.closeStream(checksumOut);
       }
+      
+      DataChecksum getChecksum() {
+        return checksum;
+      }
     }
 
   /**

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java Thu Nov  3 00:35:23 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -169,7 +170,7 @@ class ReplicaInPipeline extends ReplicaI
   
   @Override // ReplicaInPipelineInterface
   public BlockWriteStreams createStreams(boolean isCreate, 
-      int bytesPerChunk, int checksumSize) throws IOException {
+      DataChecksum requestedChecksum) throws IOException {
     File blockFile = getBlockFile();
     File metaFile = getMetaFile();
     if (DataNode.LOG.isDebugEnabled()) {
@@ -180,30 +181,64 @@ class ReplicaInPipeline extends ReplicaI
     }
     long blockDiskSize = 0L;
     long crcDiskSize = 0L;
-    if (!isCreate) { // check on disk file
-      blockDiskSize = bytesOnDisk;
-      crcDiskSize = BlockMetadataHeader.getHeaderSize() +
-      (blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize;
-      if (blockDiskSize>0 && 
-          (blockDiskSize>blockFile.length() || crcDiskSize>metaFile.length())) {
-        throw new IOException("Corrupted block: " + this);
+    
+    // the checksum that should actually be used -- this
+    // may differ from requestedChecksum for appends.
+    DataChecksum checksum;
+    
+    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
+    
+    if (!isCreate) {
+      // For append or recovery, we must enforce the existing checksum.
+      // Also, verify that the file has correct lengths, etc.
+      boolean checkedMeta = false;
+      try {
+        BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
+        checksum = header.getChecksum();
+        
+        if (checksum.getBytesPerChecksum() !=
+            requestedChecksum.getBytesPerChecksum()) {
+          throw new IOException("Client requested checksum " +
+              requestedChecksum + " when appending to an existing block " +
+              "with different chunk size: " + checksum);
+        }
+        
+        int bytesPerChunk = checksum.getBytesPerChecksum();
+        int checksumSize = checksum.getChecksumSize();
+        
+        blockDiskSize = bytesOnDisk;
+        crcDiskSize = BlockMetadataHeader.getHeaderSize() +
+          (blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize;
+        if (blockDiskSize>0 && 
+            (blockDiskSize>blockFile.length() || crcDiskSize>metaFile.length())) {
+          throw new IOException("Corrupted block: " + this);
+        }
+        checkedMeta = true;
+      } finally {
+        if (!checkedMeta) {
+          // clean up in case of exceptions.
+          IOUtils.closeStream(metaRAF);
+        }
       }
+    } else {
+      // for create, we can use the requested checksum
+      checksum = requestedChecksum;
     }
+    
     FileOutputStream blockOut = null;
     FileOutputStream crcOut = null;
     try {
       blockOut = new FileOutputStream(
           new RandomAccessFile( blockFile, "rw" ).getFD() );
-      crcOut = new FileOutputStream(
-          new RandomAccessFile( metaFile, "rw" ).getFD() );
+      crcOut = new FileOutputStream(metaRAF.getFD() );
       if (!isCreate) {
         blockOut.getChannel().position(blockDiskSize);
         crcOut.getChannel().position(crcDiskSize);
       }
-      return new BlockWriteStreams(blockOut, crcOut);
+      return new BlockWriteStreams(blockOut, crcOut, checksum);
     } catch (IOException e) {
       IOUtils.closeStream(blockOut);
-      IOUtils.closeStream(crcOut);
+      IOUtils.closeStream(metaRAF);
       throw e;
     }
   }

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java Thu Nov  3 00:35:23 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.da
 import java.io.IOException;
 
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
+import org.apache.hadoop.util.DataChecksum;
 
 /** 
  * This defines the interface of a replica in Pipeline that's being written to
@@ -61,11 +62,10 @@ interface ReplicaInPipelineInterface ext
    * one for block file and one for CRC file
    * 
    * @param isCreate if it is for creation
-   * @param bytePerChunk number of bytes per CRC chunk
-   * @param checksumSize number of bytes per checksum
+   * @param requestedChecksum the checksum the writer would prefer to use
    * @return output streams for writing
    * @throws IOException if any error occurs
    */
   public BlockWriteStreams createStreams(boolean isCreate,
-      int bytesPerChunk, int checksumSize) throws IOException;
+      DataChecksum requestedChecksum) throws IOException;
 }

Added: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendDifferentChecksum.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendDifferentChecksum.java?rev=1196888&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendDifferentChecksum.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendDifferentChecksum.java Thu Nov  3 00:35:23 2011
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Test cases for trying to append to a file with a different
+ * checksum than the file was originally written with.
+ */
+public class TestAppendDifferentChecksum {
+  private static final int SEGMENT_LENGTH = 1500;
+
+  // run the randomized test for 5 seconds
+  private static final long RANDOM_TEST_RUNTIME = 5000;
+  private static MiniDFSCluster cluster;
+  private static FileSystem fs; 
+  
+
+  @BeforeClass
+  public static void setupCluster() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
+    
+    // disable block scanner, since otherwise this test can trigger
+    // HDFS-2525, which is a different bug than we're trying to unit test
+    // here! When HDFS-2525 is fixed, this can be removed.
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+
+    conf.set("fs.hdfs.impl.disable.cache", "true");
+    cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(1)
+      .build();
+    fs = cluster.getFileSystem();
+  }
+  
+  @AfterClass
+  public static void teardown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * This test does not run, since switching chunksize with append
+   * is not implemented. Please see HDFS-2130 for a discussion of the
+   * difficulties in doing so.
+   */
+  @Test
+  @Ignore("this is not implemented! See HDFS-2130")
+  public void testSwitchChunkSize() throws IOException {
+    FileSystem fsWithSmallChunk = createFsWithChecksum("CRC32", 512);
+    FileSystem fsWithBigChunk = createFsWithChecksum("CRC32", 1024);
+    Path p = new Path("/testSwitchChunkSize");
+    appendWithTwoFs(p, fsWithSmallChunk, fsWithBigChunk);
+    AppendTestUtil.check(fsWithSmallChunk, p, SEGMENT_LENGTH * 2);
+    AppendTestUtil.check(fsWithBigChunk, p, SEGMENT_LENGTH * 2);
+  }
+  
+  /**
+   * Simple unit test which writes some data with one algorithm,
+   * then appends with another.
+   */
+  @Test
+  public void testSwitchAlgorithms() throws IOException {
+    FileSystem fsWithCrc32 = createFsWithChecksum("CRC32", 512);
+    FileSystem fsWithCrc32C = createFsWithChecksum("CRC32C", 512);
+    
+    Path p = new Path("/testSwitchAlgorithms");
+    appendWithTwoFs(p, fsWithCrc32, fsWithCrc32C);
+    // Regardless of which FS is used to read, it should pick up
+    // the on-disk checksum!
+    AppendTestUtil.check(fsWithCrc32C, p, SEGMENT_LENGTH * 2);
+    AppendTestUtil.check(fsWithCrc32, p, SEGMENT_LENGTH * 2);
+  }
+  
+  /**
+   * Test which randomly alternates between appending with
+   * CRC32 and with CRC32C, crossing several block boundaries.
+ * Then, checks that all of the data can be read back correctly.
+   */
+  @Test(timeout=RANDOM_TEST_RUNTIME*2)
+  public void testAlgoSwitchRandomized() throws IOException {
+    FileSystem fsWithCrc32 = createFsWithChecksum("CRC32", 512);
+    FileSystem fsWithCrc32C = createFsWithChecksum("CRC32C", 512);
+
+    Path p = new Path("/testAlgoSwitchRandomized");
+    long seed = System.currentTimeMillis();
+    System.out.println("seed: " + seed);
+    Random r = new Random(seed);
+    
+    // Create empty to start
+    IOUtils.closeStream(fsWithCrc32.create(p));
+    
+    long st = System.currentTimeMillis();
+    int len = 0;
+    while (System.currentTimeMillis() - st < RANDOM_TEST_RUNTIME) {
+      int thisLen = r.nextInt(500);
+      FileSystem fs = (r.nextBoolean() ? fsWithCrc32 : fsWithCrc32C);
+      FSDataOutputStream stm = fs.append(p);
+      try {
+        AppendTestUtil.write(stm, len, thisLen);
+      } finally {
+        stm.close();
+      }
+      len += thisLen;
+    }
+    
+    AppendTestUtil.check(fsWithCrc32, p, len);
+    AppendTestUtil.check(fsWithCrc32C, p, len);
+  }
+  
+  private FileSystem createFsWithChecksum(String type, int bytes)
+      throws IOException {
+    Configuration conf = new Configuration(fs.getConf());
+    conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, type);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytes);
+    return FileSystem.get(conf);
+  }
+
+
+  private void appendWithTwoFs(Path p, FileSystem fs1, FileSystem fs2)
+      throws IOException {
+    FSDataOutputStream stm = fs1.create(p);
+    try {
+      AppendTestUtil.write(stm, 0, SEGMENT_LENGTH);
+    } finally {
+      stm.close();
+    }
+    
+    stm = fs2.append(p);
+    try {
+      AppendTestUtil.write(stm, SEGMENT_LENGTH, SEGMENT_LENGTH);
+    } finally {
+      stm.close();
+    }    
+  }
+
+}

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Thu Nov  3 00:35:23 2011
@@ -74,7 +74,7 @@ public class TestDataTransferProtocol ex
                     "org.apache.hadoop.hdfs.TestDataTransferProtocol");
 
   private static final DataChecksum DEFAULT_CHECKSUM =
-    DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512);
+    DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32C, 512);
   
   DatanodeID datanode;
   InetSocketAddress dnAddr;

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Thu Nov  3 00:35:23 2011
@@ -204,13 +204,13 @@ public class SimulatedFSDataset  impleme
 
     @Override
     synchronized public BlockWriteStreams createStreams(boolean isCreate, 
-        int bytesPerChunk, int checksumSize) throws IOException {
+        DataChecksum requestedChecksum) throws IOException {
       if (finalized) {
         throw new IOException("Trying to write to a finalized replica "
             + theBlock);
       } else {
         SimulatedOutputStream crcStream = new SimulatedOutputStream();
-        return new BlockWriteStreams(oStream, crcStream);
+        return new BlockWriteStreams(oStream, crcStream, requestedChecksum);
       }
     }
 

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java?rev=1196888&r1=1196887&r2=1196888&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java Thu Nov  3 00:35:23 2011
@@ -64,7 +64,8 @@ public class TestSimulatedFSDataset exte
      // we pass expected len as zero - fsdataset should use the size of the actual
      // data written
       ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b);
-      BlockWriteStreams out = bInfo.createStreams(true, 512, 4);
+      BlockWriteStreams out = bInfo.createStreams(true,
+          DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512));
       try {
         OutputStream dataOut  = out.dataOut;
         assertEquals(0, fsdataset.getLength(b));