You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/08/21 05:29:47 UTC
svn commit: r1375380 - in
/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src:
main/java/org/apache/hadoop/fs/ main/java/org/apache/hadoop/hdfs/
main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/
main/java/org/apache/hadoop/hdfs/server/namenode/ test/java/org/apache/hadoop/hdfs/
Author: szetszwo
Date: Tue Aug 21 03:29:45 2012
New Revision: 1375380
URL: http://svn.apache.org/viewvc?rev=1375380&view=rev
Log:
HADOOP-8240. Add a new API to allow users to specify a checksum type on FileSystem.create(..). Contributed by Kihwal Lee
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java?rev=1375380&r1=1375379&r2=1375380&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java Tue Aug 21 03:29:45 2012
@@ -32,6 +32,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.hdfs.CorruptFileBlockIterator;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSUtil;
@@ -93,10 +94,10 @@ public class Hdfs extends AbstractFileSy
public FSDataOutputStream createInternal(Path f,
EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
int bufferSize, short replication, long blockSize, Progressable progress,
- int bytesPerChecksum, boolean createParent) throws IOException {
+ ChecksumOpt checksumOpt, boolean createParent) throws IOException {
return new FSDataOutputStream(dfs.primitiveCreate(getUriPath(f),
absolutePermission, createFlag, createParent, replication, blockSize,
- progress, bufferSize, bytesPerChecksum), getStatistics());
+ progress, bufferSize, checksumOpt), getStatistics());
}
@Override
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1375380&r1=1375379&r2=1375380&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Aug 21 03:29:45 2012
@@ -79,6 +79,7 @@ import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
@@ -171,8 +172,7 @@ public class DFSClient implements java.i
final int maxBlockAcquireFailures;
final int confTime;
final int ioBufferSize;
- final DataChecksum.Type checksumType;
- final int bytesPerChecksum;
+ final ChecksumOpt defaultChecksumOpt;
final int writePacketSize;
final int socketTimeout;
final int socketCacheCapacity;
@@ -197,9 +197,7 @@ public class DFSClient implements java.i
ioBufferSize = conf.getInt(
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
- checksumType = getChecksumType(conf);
- bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
- DFS_BYTES_PER_CHECKSUM_DEFAULT);
+ defaultChecksumOpt = getChecksumOptFromConf(conf);
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsServerConstants.READ_TIMEOUT);
/** dfs.write.packet.size is an internal config variable */
@@ -243,9 +241,32 @@ public class DFSClient implements java.i
}
}
- private DataChecksum createChecksum() {
- return DataChecksum.newDataChecksum(
- checksumType, bytesPerChecksum);
+ // Construct a checksum option from conf
+ private ChecksumOpt getChecksumOptFromConf(Configuration conf) {
+ DataChecksum.Type type = getChecksumType(conf);
+ int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
+ DFS_BYTES_PER_CHECKSUM_DEFAULT);
+ return new ChecksumOpt(type, bytesPerChecksum);
+ }
+
+ // create a DataChecksum with the default option.
+ private DataChecksum createChecksum() throws IOException {
+ return createChecksum(null);
+ }
+
+ private DataChecksum createChecksum(ChecksumOpt userOpt)
+ throws IOException {
+ // Fill in any missing field with the default.
+ ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt(
+ defaultChecksumOpt, userOpt);
+ DataChecksum dataChecksum = DataChecksum.newDataChecksum(
+ myOpt.getChecksumType(),
+ myOpt.getBytesPerChecksum());
+ if (dataChecksum == null) {
+ throw new IOException("Invalid checksum type specified: "
+ + myOpt.getChecksumType().name());
+ }
+ return dataChecksum;
}
}
@@ -926,12 +947,13 @@ public class DFSClient implements java.i
return create(src, FsPermission.getDefault(),
overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
: EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
- buffersize);
+ buffersize, null);
}
/**
* Call {@link #create(String, FsPermission, EnumSet, boolean, short,
- * long, Progressable, int)} with <code>createParent</code> set to true.
+ * long, Progressable, int, ChecksumOpt)} with <code>createParent</code>
+ * set to true.
*/
public OutputStream create(String src,
FsPermission permission,
@@ -939,10 +961,11 @@ public class DFSClient implements java.i
short replication,
long blockSize,
Progressable progress,
- int buffersize)
+ int buffersize,
+ ChecksumOpt checksumOpt)
throws IOException {
return create(src, permission, flag, true,
- replication, blockSize, progress, buffersize);
+ replication, blockSize, progress, buffersize, checksumOpt);
}
/**
@@ -960,6 +983,7 @@ public class DFSClient implements java.i
* @param blockSize maximum block size
* @param progress interface for reporting client progress
* @param buffersize underlying buffer size
+ * @param checksumOpt checksum options (null means use the configured defaults)
*
* @return output stream
*
@@ -973,8 +997,8 @@ public class DFSClient implements java.i
short replication,
long blockSize,
Progressable progress,
- int buffersize)
- throws IOException {
+ int buffersize,
+ ChecksumOpt checksumOpt) throws IOException {
checkOpen();
if (permission == null) {
permission = FsPermission.getDefault();
@@ -985,7 +1009,7 @@ public class DFSClient implements java.i
}
final DFSOutputStream result = new DFSOutputStream(this, src, masked, flag,
createParent, replication, blockSize, progress, buffersize,
- dfsClientConf.createChecksum());
+ dfsClientConf.createChecksum(checksumOpt));
beginFileLease(src, result);
return result;
}
@@ -1023,15 +1047,13 @@ public class DFSClient implements java.i
long blockSize,
Progressable progress,
int buffersize,
- int bytesPerChecksum)
+ ChecksumOpt checksumOpt)
throws IOException, UnresolvedLinkException {
checkOpen();
CreateFlag.validate(flag);
DFSOutputStream result = primitiveAppend(src, flag, buffersize, progress);
if (result == null) {
- DataChecksum checksum = DataChecksum.newDataChecksum(
- dfsClientConf.checksumType,
- bytesPerChecksum);
+ DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt);
result = new DFSOutputStream(this, src, absPermission,
flag, createParent, replication, blockSize, progress, buffersize,
checksum);
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1375380&r1=1375379&r2=1375380&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Tue Aug 21 03:29:45 2012
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
@@ -212,11 +213,19 @@ public class DistributedFileSystem exten
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
- statistics.incrementWriteOps(1);
- return new FSDataOutputStream(dfs.create(getPathName(f), permission,
+ return this.create(f, permission,
overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
- : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
- bufferSize), statistics);
+ : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
+ blockSize, progress, null);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, FsPermission permission,
+ EnumSet<CreateFlag> cflags, int bufferSize, short replication, long blockSize,
+ Progressable progress, ChecksumOpt checksumOpt) throws IOException {
+ statistics.incrementWriteOps(1);
+ return new FSDataOutputStream(dfs.create(getPathName(f), permission, cflags,
+ replication, blockSize, progress, bufferSize, checksumOpt), statistics);
}
@SuppressWarnings("deprecation")
@@ -224,11 +233,11 @@ public class DistributedFileSystem exten
protected FSDataOutputStream primitiveCreate(Path f,
FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
short replication, long blockSize, Progressable progress,
- int bytesPerChecksum) throws IOException {
+ ChecksumOpt checksumOpt) throws IOException {
statistics.incrementReadOps(1);
return new FSDataOutputStream(dfs.primitiveCreate(getPathName(f),
absolutePermission, flag, true, replication, blockSize,
- progress, bufferSize, bytesPerChecksum),statistics);
+ progress, bufferSize, checksumOpt), statistics);
}
/**
@@ -242,7 +251,8 @@ public class DistributedFileSystem exten
flag.add(CreateFlag.CREATE);
}
return new FSDataOutputStream(dfs.create(getPathName(f), permission, flag,
- false, replication, blockSize, progress, bufferSize), statistics);
+ false, replication, blockSize, progress,
+ bufferSize, null), statistics);
}
@Override
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1375380&r1=1375379&r2=1375380&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Tue Aug 21 03:29:45 2012
@@ -194,7 +194,7 @@ public class DatanodeWebHdfsMethods {
fullpath, permission.getFsPermission(),
overwrite.getValue() ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
: EnumSet.of(CreateFlag.CREATE),
- replication.getValue(conf), blockSize.getValue(conf), null, b), null);
+ replication.getValue(conf), blockSize.getValue(conf), null, b, null), null);
IOUtils.copyBytes(in, out, b);
out.close();
out = null;
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1375380&r1=1375379&r2=1375380&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Aug 21 03:29:45 2012
@@ -23,6 +23,8 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
@@ -169,6 +171,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
@@ -485,13 +488,24 @@ public class FSNamesystem implements Nam
DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT);
this.defaultPermission = PermissionStatus.createImmutable(
fsOwner.getShortUserName(), supergroup, new FsPermission(filePermission));
+
+ // Get the checksum type from config
+ String checksumTypeStr = conf.get(DFS_CHECKSUM_TYPE_KEY, DFS_CHECKSUM_TYPE_DEFAULT);
+ DataChecksum.Type checksumType;
+ try {
+ checksumType = DataChecksum.Type.valueOf(checksumTypeStr);
+ } catch (IllegalArgumentException iae) {
+ throw new IOException("Invalid checksum type in "
+ + DFS_CHECKSUM_TYPE_KEY + ": " + checksumTypeStr);
+ }
this.serverDefaults = new FsServerDefaults(
conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
(short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
- conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT));
+ conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
+ checksumType);
this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY,
DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1375380&r1=1375379&r2=1375380&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Tue Aug 21 03:29:45 2012
@@ -27,18 +27,22 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
+import java.util.EnumSet;
import java.util.Random;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
+import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Level;
import org.junit.Test;
@@ -568,4 +572,54 @@ public class TestDistributedFileSystem {
testDFSClient();
testFileChecksum();
}
+
+ @Test
+ public void testCreateWithCustomChecksum() throws Exception {
+ Configuration conf = getTestConfiguration();
+ final long grace = 1000L;
+ MiniDFSCluster cluster = null;
+ Path testBasePath = new Path("/test/csum");
+ // create args
+ Path path1 = new Path(testBasePath, "file_wtih_crc1");
+ Path path2 = new Path(testBasePath, "file_with_crc2");
+ ChecksumOpt opt1 = new ChecksumOpt(DataChecksum.Type.CRC32C, 512);
+ ChecksumOpt opt2 = new ChecksumOpt(DataChecksum.Type.CRC32, 512);
+
+ // common args
+ FsPermission perm = FsPermission.getDefault().applyUMask(
+ FsPermission.getUMask(conf));
+ EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.OVERWRITE,
+ CreateFlag.CREATE);
+ short repl = 1;
+
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ FileSystem dfs = cluster.getFileSystem();
+
+ dfs.mkdirs(testBasePath);
+
+ // create two files with different checksum types
+ FSDataOutputStream out1 = dfs.create(path1, perm, flags, 4096, repl,
+ 131072L, null, opt1);
+ FSDataOutputStream out2 = dfs.create(path2, perm, flags, 4096, repl,
+ 131072L, null, opt2);
+
+ for (int i = 0; i < 1024; i++) {
+ out1.write(i);
+ out2.write(i);
+ }
+ out1.close();
+ out2.close();
+
+ // the two checksums must be different.
+ FileChecksum sum1 = dfs.getFileChecksum(path1);
+ FileChecksum sum2 = dfs.getFileChecksum(path2);
+ assertFalse(sum1.equals(sum2));
+ } finally {
+ if (cluster != null) {
+ cluster.getFileSystem().delete(testBasePath, true);
+ cluster.shutdown();
+ }
+ }
+ }
}