You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/08/21 05:29:47 UTC

svn commit: r1375380 - in /hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src: main/java/org/apache/hadoop/fs/ main/java/org/apache/hadoop/hdfs/ main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/ main/java/org/apache/ha...

Author: szetszwo
Date: Tue Aug 21 03:29:45 2012
New Revision: 1375380

URL: http://svn.apache.org/viewvc?rev=1375380&view=rev
Log:
HADOOP-8240. Add a new API to allow users to specify a checksum type on FileSystem.create(..).  Contributed by Kihwal Lee

Modified:
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java?rev=1375380&r1=1375379&r2=1375380&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java Tue Aug 21 03:29:45 2012
@@ -32,6 +32,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.hdfs.CorruptFileBlockIterator;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -93,10 +94,10 @@ public class Hdfs extends AbstractFileSy
   public FSDataOutputStream createInternal(Path f,
       EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
       int bufferSize, short replication, long blockSize, Progressable progress,
-      int bytesPerChecksum, boolean createParent) throws IOException {
+      ChecksumOpt checksumOpt, boolean createParent) throws IOException {
     return new FSDataOutputStream(dfs.primitiveCreate(getUriPath(f),
         absolutePermission, createFlag, createParent, replication, blockSize,
-        progress, bufferSize, bytesPerChecksum), getStatistics());
+        progress, bufferSize, checksumOpt), getStatistics());
   }
 
   @Override

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1375380&r1=1375379&r2=1375380&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Aug 21 03:29:45 2012
@@ -79,6 +79,7 @@ import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
@@ -171,8 +172,7 @@ public class DFSClient implements java.i
     final int maxBlockAcquireFailures;
     final int confTime;
     final int ioBufferSize;
-    final DataChecksum.Type checksumType;
-    final int bytesPerChecksum;
+    final ChecksumOpt defaultChecksumOpt;
     final int writePacketSize;
     final int socketTimeout;
     final int socketCacheCapacity;
@@ -197,9 +197,7 @@ public class DFSClient implements java.i
       ioBufferSize = conf.getInt(
           CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
           CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
-      checksumType = getChecksumType(conf);
-      bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
-          DFS_BYTES_PER_CHECKSUM_DEFAULT);
+      defaultChecksumOpt = getChecksumOptFromConf(conf);
       socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
           HdfsServerConstants.READ_TIMEOUT);
       /** dfs.write.packet.size is an internal config variable */
@@ -243,9 +241,32 @@ public class DFSClient implements java.i
       }
     }
 
-    private DataChecksum createChecksum() {
-      return DataChecksum.newDataChecksum(
-          checksumType, bytesPerChecksum);
+    // Construct a checksum option from conf
+    private ChecksumOpt getChecksumOptFromConf(Configuration conf) {
+      DataChecksum.Type type = getChecksumType(conf);
+      int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
+          DFS_BYTES_PER_CHECKSUM_DEFAULT);
+      return new ChecksumOpt(type, bytesPerChecksum);
+    }
+
+    // create a DataChecksum with the default option.
+    private DataChecksum createChecksum() throws IOException {
+      return createChecksum(null);
+    }
+
+    private DataChecksum createChecksum(ChecksumOpt userOpt) 
+        throws IOException {
+      // Fill in any missing field with the default.
+      ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt(
+          defaultChecksumOpt, userOpt);
+      DataChecksum dataChecksum = DataChecksum.newDataChecksum(
+          myOpt.getChecksumType(),
+          myOpt.getBytesPerChecksum());
+      if (dataChecksum == null) {
+        throw new IOException("Invalid checksum type specified: "
+            + myOpt.getChecksumType().name());
+      }
+      return dataChecksum;
     }
   }
  
@@ -926,12 +947,13 @@ public class DFSClient implements java.i
     return create(src, FsPermission.getDefault(),
         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
             : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
-        buffersize);
+        buffersize, null);
   }
 
   /**
    * Call {@link #create(String, FsPermission, EnumSet, boolean, short, 
-   * long, Progressable, int)} with <code>createParent</code> set to true.
+   * long, Progressable, int, ChecksumOpt)} with <code>createParent</code>
+   *  set to true.
    */
   public OutputStream create(String src, 
                              FsPermission permission,
@@ -939,10 +961,11 @@ public class DFSClient implements java.i
                              short replication,
                              long blockSize,
                              Progressable progress,
-                             int buffersize)
+                             int buffersize,
+                             ChecksumOpt checksumOpt)
       throws IOException {
     return create(src, permission, flag, true,
-        replication, blockSize, progress, buffersize);
+        replication, blockSize, progress, buffersize, checksumOpt);
   }
 
   /**
@@ -960,6 +983,7 @@ public class DFSClient implements java.i
    * @param blockSize maximum block size
    * @param progress interface for reporting client progress
    * @param buffersize underlying buffer size 
+   * @param checksumOpts checksum options
    * 
    * @return output stream
    * 
@@ -973,8 +997,8 @@ public class DFSClient implements java.i
                              short replication,
                              long blockSize,
                              Progressable progress,
-                             int buffersize)
-    throws IOException {
+                             int buffersize,
+                             ChecksumOpt checksumOpt) throws IOException {
     checkOpen();
     if (permission == null) {
       permission = FsPermission.getDefault();
@@ -985,7 +1009,7 @@ public class DFSClient implements java.i
     }
     final DFSOutputStream result = new DFSOutputStream(this, src, masked, flag,
         createParent, replication, blockSize, progress, buffersize,
-        dfsClientConf.createChecksum());
+        dfsClientConf.createChecksum(checksumOpt));
     beginFileLease(src, result);
     return result;
   }
@@ -1023,15 +1047,13 @@ public class DFSClient implements java.i
                              long blockSize,
                              Progressable progress,
                              int buffersize,
-                             int bytesPerChecksum)
+                             ChecksumOpt checksumOpt)
       throws IOException, UnresolvedLinkException {
     checkOpen();
     CreateFlag.validate(flag);
     DFSOutputStream result = primitiveAppend(src, flag, buffersize, progress);
     if (result == null) {
-      DataChecksum checksum = DataChecksum.newDataChecksum(
-          dfsClientConf.checksumType,
-          bytesPerChecksum);
+      DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt);
       result = new DFSOutputStream(this, src, absPermission,
           flag, createParent, replication, blockSize, progress, buffersize,
           checksum);

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1375380&r1=1375379&r2=1375380&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Tue Aug 21 03:29:45 2012
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -212,11 +213,19 @@ public class DistributedFileSystem exten
   public FSDataOutputStream create(Path f, FsPermission permission,
     boolean overwrite, int bufferSize, short replication, long blockSize,
     Progressable progress) throws IOException {
-    statistics.incrementWriteOps(1);
-    return new FSDataOutputStream(dfs.create(getPathName(f), permission,
+    return this.create(f, permission,
         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
-            : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
-        bufferSize), statistics);
+            : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
+        blockSize, progress, null);
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission,
+    EnumSet<CreateFlag> cflags, int bufferSize, short replication, long blockSize,
+    Progressable progress, ChecksumOpt checksumOpt) throws IOException {
+    statistics.incrementWriteOps(1);
+    return new FSDataOutputStream(dfs.create(getPathName(f), permission, cflags,
+        replication, blockSize, progress, bufferSize, checksumOpt), statistics);
   }
   
   @SuppressWarnings("deprecation")
@@ -224,11 +233,11 @@ public class DistributedFileSystem exten
   protected FSDataOutputStream primitiveCreate(Path f,
     FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
     short replication, long blockSize, Progressable progress,
-    int bytesPerChecksum) throws IOException {
+    ChecksumOpt checksumOpt) throws IOException {
     statistics.incrementReadOps(1);
     return new FSDataOutputStream(dfs.primitiveCreate(getPathName(f),
         absolutePermission, flag, true, replication, blockSize,
-        progress, bufferSize, bytesPerChecksum),statistics);
+        progress, bufferSize, checksumOpt), statistics);
    } 
 
   /**
@@ -242,7 +251,8 @@ public class DistributedFileSystem exten
       flag.add(CreateFlag.CREATE);
     }
     return new FSDataOutputStream(dfs.create(getPathName(f), permission, flag,
-        false, replication, blockSize, progress, bufferSize), statistics);
+        false, replication, blockSize, progress,
+         bufferSize, null), statistics);
   }
 
   @Override

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1375380&r1=1375379&r2=1375380&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Tue Aug 21 03:29:45 2012
@@ -194,7 +194,7 @@ public class DatanodeWebHdfsMethods {
             fullpath, permission.getFsPermission(), 
             overwrite.getValue() ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
                 : EnumSet.of(CreateFlag.CREATE),
-            replication.getValue(conf), blockSize.getValue(conf), null, b), null);
+            replication.getValue(conf), blockSize.getValue(conf), null, b, null), null);
         IOUtils.copyBytes(in, out, b);
         out.close();
         out = null;

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1375380&r1=1375379&r2=1375380&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Aug 21 03:29:45 2012
@@ -23,6 +23,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
@@ -169,6 +171,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
@@ -485,13 +488,24 @@ public class FSNamesystem implements Nam
                                               DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT);
     this.defaultPermission = PermissionStatus.createImmutable(
         fsOwner.getShortUserName(), supergroup, new FsPermission(filePermission));
+
+    // Get the checksum type from config
+    String checksumTypeStr = conf.get(DFS_CHECKSUM_TYPE_KEY, DFS_CHECKSUM_TYPE_DEFAULT);
+    DataChecksum.Type checksumType;
+    try {
+       checksumType = DataChecksum.Type.valueOf(checksumTypeStr);
+    } catch (IllegalArgumentException iae) {
+       throw new IOException("Invalid checksum type in "
+          + DFS_CHECKSUM_TYPE_KEY + ": " + checksumTypeStr);
+    }
     
     this.serverDefaults = new FsServerDefaults(
         conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
         conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
         conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
         (short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
-        conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT));
+        conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
+        checksumType);
     
     this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY, 
                                      DFS_NAMENODE_MAX_OBJECTS_DEFAULT);

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1375380&r1=1375379&r2=1375380&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Tue Aug 21 03:29:45 2012
@@ -27,18 +27,22 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.EnumSet;
 import java.util.Random;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Level;
 import org.junit.Test;
@@ -568,4 +572,54 @@ public class TestDistributedFileSystem {
     testDFSClient();
     testFileChecksum();
   }
+
+  @Test
+  public void testCreateWithCustomChecksum() throws Exception {
+    Configuration conf = getTestConfiguration();
+    final long grace = 1000L;
+    MiniDFSCluster cluster = null;
+    Path testBasePath = new Path("/test/csum");
+    // create args 
+    Path path1 = new Path(testBasePath, "file_wtih_crc1");
+    Path path2 = new Path(testBasePath, "file_with_crc2");
+    ChecksumOpt opt1 = new ChecksumOpt(DataChecksum.Type.CRC32C, 512);
+    ChecksumOpt opt2 = new ChecksumOpt(DataChecksum.Type.CRC32, 512);
+
+    // common args
+    FsPermission perm = FsPermission.getDefault().applyUMask(
+        FsPermission.getUMask(conf));
+    EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.OVERWRITE,
+        CreateFlag.CREATE);
+    short repl = 1;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      FileSystem dfs = cluster.getFileSystem();
+
+      dfs.mkdirs(testBasePath);
+
+      // create two files with different checksum types
+      FSDataOutputStream out1 = dfs.create(path1, perm, flags, 4096, repl,
+          131072L, null, opt1);
+      FSDataOutputStream out2 = dfs.create(path2, perm, flags, 4096, repl,
+          131072L, null, opt2);
+
+      for (int i = 0; i < 1024; i++) {
+        out1.write(i);
+        out2.write(i);
+      }
+      out1.close();
+      out2.close();
+
+      // the two checksums must be different.
+      FileChecksum sum1 = dfs.getFileChecksum(path1);
+      FileChecksum sum2 = dfs.getFileChecksum(path2);
+      assertFalse(sum1.equals(sum2));
+    } finally {
+      if (cluster != null) {
+        cluster.getFileSystem().delete(testBasePath, true);
+        cluster.shutdown();
+      }
+    }
+  }
 }