You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/01/29 19:05:53 UTC
[10/34] hadoop git commit: HDFS-3689. Add support for variable length
block. Contributed by Jing Zhao.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index a3ac455..38fc637 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -633,15 +633,16 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
@Override // ClientProtocol
- public LastBlockWithStatus append(String src, String clientName)
- throws IOException {
+ public LastBlockWithStatus append(String src, String clientName,
+ EnumSetWritable<CreateFlag> flag) throws IOException {
checkNNStartup();
String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.append: file "
+src+" for "+clientName+" at "+clientMachine);
}
- CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
+ CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+ null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (LastBlockWithStatus) cacheEntry.getPayload();
}
@@ -649,7 +650,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
LastBlockWithStatus info = null;
boolean success = false;
try {
- info = namesystem.appendFile(src, clientName, clientMachine,
+ info = namesystem.appendFile(src, clientName, clientMachine, flag.get(),
cacheEntry != null);
success = true;
} finally {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
index 5c9f752..34564d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
@@ -66,6 +66,7 @@ enum CreateFlagProto {
OVERWRITE = 0x02; // Truncate/overwrite a file. Same as POSIX O_TRUNC
APPEND = 0x04; // Append to a file
LAZY_PERSIST = 0x10; // File with reduced durability guarantees.
+ NEW_BLOCK = 0x20; // Write data to a new block when appending
}
message CreateRequestProto {
@@ -86,6 +87,7 @@ message CreateResponseProto {
message AppendRequestProto {
required string src = 1;
required string clientName = 2;
+ optional uint32 flag = 3; // bits set using CreateFlag
}
message AppendResponseProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
index e50f14b..5b78fe6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
@@ -89,6 +89,7 @@ message CloseEventProto {
message AppendEventProto {
required string path = 1;
+ optional bool newBlock = 2 [default = false];
}
message RenameEventProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
index eab44be..68a85b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
@@ -136,6 +136,22 @@ public class AppendTestUtil {
}
}
+ public static void check(DistributedFileSystem fs, Path p, int position,
+ int length) throws IOException {
+ byte[] buf = new byte[length];
+ int i = 0;
+ try {
+ FSDataInputStream in = fs.open(p);
+ in.read(position, buf, 0, buf.length);
+ for(i = position; i < length + position; i++) {
+ assertEquals((byte) i, buf[i - position]);
+ }
+ in.close();
+ } catch(IOException ioe) {
+ throw new IOException("p=" + p + ", length=" + length + ", i=" + i, ioe);
+ }
+ }
+
/**
* create a buffer that contains the entire test file data.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 0eef46f..126827a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1132,6 +1132,9 @@ public class DFSTestUtil {
FSDataOutputStream s = filesystem.create(pathFileCreate);
// OP_CLOSE 9
s.close();
+ // OP_APPEND 47
+ FSDataOutputStream s2 = filesystem.append(pathFileCreate, 4096, null);
+ s2.close();
// OP_SET_STORAGE_POLICY 45
filesystem.setStoragePolicy(pathFileCreate,
HdfsConstants.HOT_STORAGE_POLICY_NAME);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
index 75a4ad4..4f449d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.XAttrSetFlag;
@@ -71,7 +72,7 @@ public class TestDFSInotifyEventInputStream {
*/
@Test
public void testOpcodeCount() {
- Assert.assertEquals(48, FSEditLogOpCodes.values().length);
+ Assert.assertEquals(49, FSEditLogOpCodes.values().length);
}
@@ -109,7 +110,8 @@ public class TestDFSInotifyEventInputStream {
os.write(new byte[BLOCK_SIZE]);
os.close(); // CloseOp -> CloseEvent
// AddOp -> AppendEvent
- os = client.append("/file2", BLOCK_SIZE, null, null);
+ os = client.append("/file2", BLOCK_SIZE, EnumSet.of(CreateFlag.APPEND),
+ null, null);
os.write(new byte[BLOCK_SIZE]);
os.close(); // CloseOp -> CloseEvent
Thread.sleep(10); // so that the atime will get updated on the next line
@@ -182,13 +184,14 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(ce2.getFileSize() > 0);
Assert.assertTrue(ce2.getTimestamp() > 0);
- // AddOp
+ // AppendOp
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND);
Event.AppendEvent append2 = (Event.AppendEvent)batch.getEvents()[0];
Assert.assertEquals("/file2", append2.getPath());
+ Assert.assertFalse(append2.toNewBlock());
// CloseOp
batch = waitForNextEvents(eis);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index 34c701d..3cb72ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -25,10 +25,12 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HardLink;
@@ -344,7 +346,46 @@ public class TestFileAppend{
cluster.shutdown();
}
}
+
+ /** Test two consecutive appends on a file with a full block. */
+ @Test
+ public void testAppend2Twice() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+ final DistributedFileSystem fs1 = cluster.getFileSystem();
+ final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf);
+ try {
+ final Path p = new Path("/testAppendTwice/foo");
+ final int len = 1 << 16;
+ final byte[] fileContents = AppendTestUtil.initBuffer(len);
+
+ {
+ // create a new file with a full block.
+ FSDataOutputStream out = fs2.create(p, true, 4096, (short)1, len);
+ out.write(fileContents, 0, len);
+ out.close();
+ }
+ //1st append does not add any data so that the last block remains full
+ //and the last block in INodeFileUnderConstruction is a BlockInfo
+ //but not BlockInfoUnderConstruction.
+ ((DistributedFileSystem) fs2).append(p,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+
+ // 2nd append should get AlreadyBeingCreatedException
+ fs1.append(p);
+ Assert.fail();
+ } catch(RemoteException re) {
+ AppendTestUtil.LOG.info("Got an exception:", re);
+ Assert.assertEquals(AlreadyBeingCreatedException.class.getName(),
+ re.getClassName());
+ } finally {
+ fs2.close();
+ fs1.close();
+ cluster.shutdown();
+ }
+ }
+
/** Tests appending after soft-limit expires. */
@Test
public void testAppendAfterSoftLimit()
@@ -386,6 +427,54 @@ public class TestFileAppend{
}
}
+ /** Tests appending after soft-limit expires. */
+ @Test
+ public void testAppend2AfterSoftLimit() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+ //Set small soft-limit for lease
+ final long softLimit = 1L;
+ final long hardLimit = 9999999L;
+
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+ .build();
+ cluster.setLeasePeriod(softLimit, hardLimit);
+ cluster.waitActive();
+
+ DistributedFileSystem fs = cluster.getFileSystem();
+ DistributedFileSystem fs2 = new DistributedFileSystem();
+ fs2.initialize(fs.getUri(), conf);
+
+ final Path testPath = new Path("/testAppendAfterSoftLimit");
+ final byte[] fileContents = AppendTestUtil.initBuffer(32);
+
+ // create a new file without closing
+ FSDataOutputStream out = fs.create(testPath);
+ out.write(fileContents);
+
+ //Wait for > soft-limit
+ Thread.sleep(250);
+
+ try {
+ FSDataOutputStream appendStream2 = fs2.append(testPath,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+ appendStream2.write(fileContents);
+ appendStream2.close();
+ assertEquals(fileContents.length, fs.getFileStatus(testPath).getLen());
+ // make sure we now have 1 block since the first writer was revoked
+ LocatedBlocks blks = fs.getClient().getLocatedBlocks(testPath.toString(),
+ 0L);
+ assertEquals(1, blks.getLocatedBlocks().size());
+ for (LocatedBlock blk : blks.getLocatedBlocks()) {
+ assertEquals(fileContents.length, blk.getBlockSize());
+ }
+ } finally {
+ fs.close();
+ fs2.close();
+ cluster.shutdown();
+ }
+ }
+
/**
* Old replica of the block should not be accepted as valid for append/read
*/
@@ -439,4 +528,77 @@ public class TestFileAppend{
}
}
+ /**
+ * Old replica of the block should not be accepted as valid for append/read
+ */
+ @Test
+ public void testMultiAppend2() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.set("dfs.client.block.write.replace-datanode-on-failure.enable",
+ "false");
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+ .build();
+ DistributedFileSystem fs = null;
+ final String hello = "hello\n";
+ try {
+ fs = cluster.getFileSystem();
+ Path path = new Path("/test");
+ FSDataOutputStream out = fs.create(path);
+ out.writeBytes(hello);
+ out.close();
+
+ // stop one datanode
+ DataNodeProperties dnProp = cluster.stopDataNode(0);
+ String dnAddress = dnProp.datanode.getXferAddress().toString();
+ if (dnAddress.startsWith("/")) {
+ dnAddress = dnAddress.substring(1);
+ }
+
+ // append again to bump genstamps
+ for (int i = 0; i < 2; i++) {
+ out = fs.append(path,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+ out.writeBytes(hello);
+ out.close();
+ }
+
+ // re-open and make the block state as underconstruction
+ out = fs.append(path, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK),
+ 4096, null);
+ cluster.restartDataNode(dnProp, true);
+ // wait till the block report comes
+ Thread.sleep(2000);
+ out.writeBytes(hello);
+ out.close();
+ // check the block locations
+ LocatedBlocks blocks = fs.getClient().getLocatedBlocks(path.toString(), 0L);
+ // since we append the file 3 time, we should be 4 blocks
+ assertEquals(4, blocks.getLocatedBlocks().size());
+ for (LocatedBlock block : blocks.getLocatedBlocks()) {
+ assertEquals(hello.length(), block.getBlockSize());
+ }
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 4; i++) {
+ sb.append(hello);
+ }
+ final byte[] content = sb.toString().getBytes();
+ AppendTestUtil.checkFullFile(fs, path, content.length, content,
+ "Read /test");
+
+ // restart namenode to make sure the editlog can be properly applied
+ cluster.restartNameNode(true);
+ cluster.waitActive();
+ AppendTestUtil.checkFullFile(fs, path, content.length, content,
+ "Read /test");
+ blocks = fs.getClient().getLocatedBlocks(path.toString(), 0L);
+ // since we append the file 3 time, we should be 4 blocks
+ assertEquals(4, blocks.getLocatedBlocks().size());
+ for (LocatedBlock block : blocks.getLocatedBlocks()) {
+ assertEquals(hello.length(), block.getBlockSize());
+ }
+ } finally {
+ IOUtils.closeStream(fs);
+ cluster.shutdown();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java
index eecd23b..99d04dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -24,14 +25,18 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -67,11 +72,7 @@ public class TestFileAppend2 {
final int numberOfFiles = 50;
final int numThreads = 10;
final int numAppendsPerThread = 20;
-/***
- int numberOfFiles = 1;
- int numThreads = 1;
- int numAppendsPerThread = 2000;
-****/
+
Workload[] workload = null;
final ArrayList<Path> testFiles = new ArrayList<Path>();
volatile static boolean globalStatus = true;
@@ -229,16 +230,170 @@ public class TestFileAppend2 {
}
}
+ /**
+ * Creates one file, writes a few bytes to it and then closed it.
+ * Reopens the same file for appending using append2 API, write all blocks and
+ * then close. Verify that all data exists in file.
+ */
+ @Test
+ public void testSimpleAppend2() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ if (simulatedStorage) {
+ SimulatedFSDataset.setFactory(conf);
+ }
+ conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 50);
+ fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+ DistributedFileSystem fs = cluster.getFileSystem();
+ try {
+ { // test appending to a file.
+ // create a new file.
+ Path file1 = new Path("/simpleAppend.dat");
+ FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1);
+ System.out.println("Created file simpleAppend.dat");
+
+ // write to file
+ int mid = 186; // io.bytes.per.checksum bytes
+ System.out.println("Writing " + mid + " bytes to file " + file1);
+ stm.write(fileContents, 0, mid);
+ stm.close();
+ System.out.println("Wrote and Closed first part of file.");
+
+ // write to file
+ int mid2 = 607; // io.bytes.per.checksum bytes
+ System.out.println("Writing " + mid + " bytes to file " + file1);
+ stm = fs.append(file1,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+ stm.write(fileContents, mid, mid2-mid);
+ stm.close();
+ System.out.println("Wrote and Closed second part of file.");
+
+ // write the remainder of the file
+ stm = fs.append(file1,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+ // ensure getPos is set to reflect existing size of the file
+ assertTrue(stm.getPos() > 0);
+ System.out.println("Writing " + (AppendTestUtil.FILE_SIZE - mid2) +
+ " bytes to file " + file1);
+ stm.write(fileContents, mid2, AppendTestUtil.FILE_SIZE - mid2);
+ System.out.println("Written second part of file");
+ stm.close();
+ System.out.println("Wrote and Closed second part of file.");
+
+ // verify that entire file is good
+ AppendTestUtil.checkFullFile(fs, file1, AppendTestUtil.FILE_SIZE,
+ fileContents, "Read 2");
+ // also make sure there three different blocks for the file
+ List<LocatedBlock> blocks = fs.getClient().getLocatedBlocks(
+ file1.toString(), 0L).getLocatedBlocks();
+ assertEquals(12, blocks.size()); // the block size is 1024
+ assertEquals(mid, blocks.get(0).getBlockSize());
+ assertEquals(mid2 - mid, blocks.get(1).getBlockSize());
+ for (int i = 2; i < 11; i++) {
+ assertEquals(AppendTestUtil.BLOCK_SIZE, blocks.get(i).getBlockSize());
+ }
+ assertEquals((AppendTestUtil.FILE_SIZE - mid2)
+ % AppendTestUtil.BLOCK_SIZE, blocks.get(11).getBlockSize());
+ }
+
+ { // test appending to an non-existing file.
+ FSDataOutputStream out = null;
+ try {
+ out = fs.append(new Path("/non-existing.dat"),
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+ fail("Expected to have FileNotFoundException");
+ } catch(java.io.FileNotFoundException fnfe) {
+ System.out.println("Good: got " + fnfe);
+ fnfe.printStackTrace(System.out);
+ } finally {
+ IOUtils.closeStream(out);
+ }
+ }
+
+ { // test append permission.
+ // set root to all writable
+ Path root = new Path("/");
+ fs.setPermission(root, new FsPermission((short)0777));
+ fs.close();
+
+ // login as a different user
+ final UserGroupInformation superuser =
+ UserGroupInformation.getCurrentUser();
+ String username = "testappenduser";
+ String group = "testappendgroup";
+ assertFalse(superuser.getShortUserName().equals(username));
+ assertFalse(Arrays.asList(superuser.getGroupNames()).contains(group));
+ UserGroupInformation appenduser = UserGroupInformation
+ .createUserForTesting(username, new String[] { group });
+
+ fs = (DistributedFileSystem) DFSTestUtil.getFileSystemAs(appenduser,
+ conf);
+
+ // create a file
+ Path dir = new Path(root, getClass().getSimpleName());
+ Path foo = new Path(dir, "foo.dat");
+ FSDataOutputStream out = null;
+ int offset = 0;
+ try {
+ out = fs.create(foo);
+ int len = 10 + AppendTestUtil.nextInt(100);
+ out.write(fileContents, offset, len);
+ offset += len;
+ } finally {
+ IOUtils.closeStream(out);
+ }
+
+ // change dir and foo to minimal permissions.
+ fs.setPermission(dir, new FsPermission((short)0100));
+ fs.setPermission(foo, new FsPermission((short)0200));
+
+ // try append, should success
+ out = null;
+ try {
+ out = fs.append(foo,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+ int len = 10 + AppendTestUtil.nextInt(100);
+ out.write(fileContents, offset, len);
+ offset += len;
+ } finally {
+ IOUtils.closeStream(out);
+ }
+
+ // change dir and foo to all but no write on foo.
+ fs.setPermission(foo, new FsPermission((short)0577));
+ fs.setPermission(dir, new FsPermission((short)0777));
+
+ // try append, should fail
+ out = null;
+ try {
+ out = fs.append(foo,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+ fail("Expected to have AccessControlException");
+ } catch(AccessControlException ace) {
+ System.out.println("Good: got " + ace);
+ ace.printStackTrace(System.out);
+ } finally {
+ IOUtils.closeStream(out);
+ }
+ }
+ } finally {
+ fs.close();
+ cluster.shutdown();
+ }
+ }
+
//
// an object that does a bunch of appends to files
//
class Workload extends Thread {
private final int id;
private final MiniDFSCluster cluster;
+ private final boolean appendToNewBlock;
- Workload(MiniDFSCluster cluster, int threadIndex) {
+ Workload(MiniDFSCluster cluster, int threadIndex, boolean append2) {
id = threadIndex;
this.cluster = cluster;
+ this.appendToNewBlock = append2;
}
// create a bunch of files. Write to them and then verify.
@@ -261,7 +416,7 @@ public class TestFileAppend2 {
long len = 0;
int sizeToAppend = 0;
try {
- FileSystem fs = cluster.getFileSystem();
+ DistributedFileSystem fs = cluster.getFileSystem();
// add a random number of bytes to file
len = fs.getFileStatus(testfile).getLen();
@@ -285,7 +440,9 @@ public class TestFileAppend2 {
" appending " + sizeToAppend + " bytes " +
" to file " + testfile +
" of size " + len);
- FSDataOutputStream stm = fs.append(testfile);
+ FSDataOutputStream stm = appendToNewBlock ? fs.append(testfile,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null)
+ : fs.append(testfile);
stm.write(fileContents, (int)len, sizeToAppend);
stm.close();
@@ -298,7 +455,7 @@ public class TestFileAppend2 {
" expected size " + (len + sizeToAppend) +
" waiting for namenode metadata update.");
Thread.sleep(5000);
- } catch (InterruptedException e) {;}
+ } catch (InterruptedException e) {}
}
assertTrue("File " + testfile + " size is " +
@@ -306,7 +463,7 @@ public class TestFileAppend2 {
" but expected " + (len + sizeToAppend),
fs.getFileStatus(testfile).getLen() == (len + sizeToAppend));
- AppendTestUtil.checkFullFile(fs, testfile, (int)(len + sizeToAppend),
+ AppendTestUtil.checkFullFile(fs, testfile, (int) (len + sizeToAppend),
fileContents, "Read 2");
} catch (Throwable e) {
globalStatus = false;
@@ -331,10 +488,8 @@ public class TestFileAppend2 {
/**
* Test that appends to files at random offsets.
- * @throws IOException an exception might be thrown
*/
- @Test
- public void testComplexAppend() throws IOException {
+ private void testComplexAppend(boolean appendToNewBlock) throws IOException {
fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
@@ -366,7 +521,7 @@ public class TestFileAppend2 {
// Create threads and make them run workload concurrently.
workload = new Workload[numThreads];
for (int i = 0; i < numThreads; i++) {
- workload[i] = new Workload(cluster, i);
+ workload[i] = new Workload(cluster, i, appendToNewBlock);
workload[i].start();
}
@@ -390,4 +545,14 @@ public class TestFileAppend2 {
//
assertTrue("testComplexAppend Worker encountered exceptions.", globalStatus);
}
+
+ @Test
+ public void testComplexAppend() throws IOException {
+ testComplexAppend(false);
+ }
+
+ @Test
+ public void testComplexAppend2() throws IOException {
+ testComplexAppend(true);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java
index d5de0ff..9ebe115 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java
@@ -24,7 +24,10 @@ import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.util.EnumSet;
+import java.util.List;
+import org.apache.hadoop.fs.CreateFlag;
import org.mockito.invocation.InvocationOnMock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -36,8 +39,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.DFSClientAdapter;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -52,6 +53,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.log4j.Level;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -121,6 +123,32 @@ public class TestFileAppend3 {
AppendTestUtil.check(fs, p, len1 + len2);
}
+ @Test
+ public void testTC1ForAppend2() throws Exception {
+ final Path p = new Path("/TC1/foo2");
+
+ //a. Create file and write one block of data. Close file.
+ final int len1 = (int) BLOCK_SIZE;
+ {
+ FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION,
+ BLOCK_SIZE);
+ AppendTestUtil.write(out, 0, len1);
+ out.close();
+ }
+
+ // Reopen file to append. Append half block of data. Close file.
+ final int len2 = (int) BLOCK_SIZE / 2;
+ {
+ FSDataOutputStream out = fs.append(p,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+ AppendTestUtil.write(out, len1, len2);
+ out.close();
+ }
+
+ // b. Reopen file and read 1.5 blocks worth of data. Close file.
+ AppendTestUtil.check(fs, p, len1 + len2);
+ }
+
/**
* TC2: Append on non-block boundary.
* @throws IOException an exception might be thrown
@@ -152,6 +180,40 @@ public class TestFileAppend3 {
AppendTestUtil.check(fs, p, len1 + len2);
}
+ @Test
+ public void testTC2ForAppend2() throws Exception {
+ final Path p = new Path("/TC2/foo2");
+
+ //a. Create file with one and a half block of data. Close file.
+ final int len1 = (int) (BLOCK_SIZE + BLOCK_SIZE / 2);
+ {
+ FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION,
+ BLOCK_SIZE);
+ AppendTestUtil.write(out, 0, len1);
+ out.close();
+ }
+
+ AppendTestUtil.check(fs, p, len1);
+
+ // Reopen file to append quarter block of data. Close file.
+ final int len2 = (int) BLOCK_SIZE / 4;
+ {
+ FSDataOutputStream out = fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK),
+ 4096, null);
+ AppendTestUtil.write(out, len1, len2);
+ out.close();
+ }
+
+ // b. Reopen file and read 1.75 blocks of data. Close file.
+ AppendTestUtil.check(fs, p, len1 + len2);
+ List<LocatedBlock> blocks = fs.getClient().getLocatedBlocks(
+ p.toString(), 0L).getLocatedBlocks();
+ Assert.assertEquals(3, blocks.size());
+ Assert.assertEquals(BLOCK_SIZE, blocks.get(0).getBlockSize());
+ Assert.assertEquals(BLOCK_SIZE / 2, blocks.get(1).getBlockSize());
+ Assert.assertEquals(BLOCK_SIZE / 4, blocks.get(2).getBlockSize());
+ }
+
/**
* TC5: Only one simultaneous append.
* @throws IOException an exception might be thrown
@@ -179,18 +241,63 @@ public class TestFileAppend3 {
AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
}
+ try {
+ ((DistributedFileSystem) AppendTestUtil
+ .createHdfsWithDifferentUsername(conf)).append(p,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+ fail("This should fail.");
+ } catch(IOException ioe) {
+ AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
+ }
+
//d. On Machine M1, close file.
out.close();
}
+ @Test
+ public void testTC5ForAppend2() throws Exception {
+ final Path p = new Path("/TC5/foo2");
+
+ // a. Create file on Machine M1. Write half block to it. Close file.
+ {
+ FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION,
+ BLOCK_SIZE);
+ AppendTestUtil.write(out, 0, (int)(BLOCK_SIZE/2));
+ out.close();
+ }
+
+ // b. Reopen file in "append" mode on Machine M1.
+ FSDataOutputStream out = fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK),
+ 4096, null);
+
+ // c. On Machine M2, reopen file in "append" mode. This should fail.
+ try {
+ ((DistributedFileSystem) AppendTestUtil
+ .createHdfsWithDifferentUsername(conf)).append(p,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+ fail("This should fail.");
+ } catch(IOException ioe) {
+ AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
+ }
+
+ try {
+ AppendTestUtil.createHdfsWithDifferentUsername(conf).append(p);
+ fail("This should fail.");
+ } catch(IOException ioe) {
+ AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
+ }
+
+ // d. On Machine M1, close file.
+ out.close();
+ }
+
/**
* TC7: Corrupted replicas are present.
* @throws IOException an exception might be thrown
*/
- @Test
- public void testTC7() throws Exception {
+ private void testTC7(boolean appendToNewBlock) throws Exception {
final short repl = 2;
- final Path p = new Path("/TC7/foo");
+ final Path p = new Path("/TC7/foo" + (appendToNewBlock ? "0" : "1"));
System.out.println("p=" + p);
//a. Create file with replication factor of 2. Write half block of data. Close file.
@@ -224,7 +331,8 @@ public class TestFileAppend3 {
//c. Open file in "append mode". Append a new block worth of data. Close file.
final int len2 = (int)BLOCK_SIZE;
{
- FSDataOutputStream out = fs.append(p);
+ FSDataOutputStream out = appendToNewBlock ?
+ fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) : fs.append(p);
AppendTestUtil.write(out, len1, len2);
out.close();
}
@@ -233,13 +341,21 @@ public class TestFileAppend3 {
AppendTestUtil.check(fs, p, len1 + len2);
}
+ @Test
+ public void testTC7() throws Exception {
+ testTC7(false);
+ }
+
+ @Test
+ public void testTC7ForAppend2() throws Exception {
+ testTC7(true);
+ }
+
/**
* TC11: Racing rename
- * @throws IOException an exception might be thrown
*/
- @Test
- public void testTC11() throws Exception {
- final Path p = new Path("/TC11/foo");
+ private void testTC11(boolean appendToNewBlock) throws Exception {
+ final Path p = new Path("/TC11/foo" + (appendToNewBlock ? "0" : "1"));
System.out.println("p=" + p);
//a. Create file and write one block of data. Close file.
@@ -251,7 +367,9 @@ public class TestFileAppend3 {
}
//b. Reopen file in "append" mode. Append half block of data.
- FSDataOutputStream out = fs.append(p);
+ FSDataOutputStream out = appendToNewBlock ?
+ fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
+ fs.append(p);
final int len2 = (int)BLOCK_SIZE/2;
AppendTestUtil.write(out, len1, len2);
out.hflush();
@@ -283,13 +401,21 @@ public class TestFileAppend3 {
}
}
+ @Test
+ public void testTC11() throws Exception {
+ testTC11(false);
+ }
+
+ @Test
+ public void testTC11ForAppend2() throws Exception {
+ testTC11(true);
+ }
+
/**
* TC12: Append to partial CRC chunk
- * @throws IOException an exception might be thrown
*/
- @Test
- public void testTC12() throws Exception {
- final Path p = new Path("/TC12/foo");
+ private void testTC12(boolean appendToNewBlock) throws Exception {
+ final Path p = new Path("/TC12/foo" + (appendToNewBlock ? "0" : "1"));
System.out.println("p=" + p);
//a. Create file with a block size of 64KB
@@ -305,23 +431,43 @@ public class TestFileAppend3 {
//b. Reopen file in "append" mode. Append another 5877 bytes of data. Close file.
final int len2 = 5877;
{
- FSDataOutputStream out = fs.append(p);
+ FSDataOutputStream out = appendToNewBlock ?
+ fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
+ fs.append(p);
AppendTestUtil.write(out, len1, len2);
out.close();
}
//c. Reopen file and read 25687+5877 bytes of data from file. Close file.
AppendTestUtil.check(fs, p, len1 + len2);
+ if (appendToNewBlock) {
+ LocatedBlocks blks = fs.dfs.getLocatedBlocks(p.toString(), 0);
+ Assert.assertEquals(2, blks.getLocatedBlocks().size());
+ Assert.assertEquals(len1, blks.getLocatedBlocks().get(0).getBlockSize());
+ Assert.assertEquals(len2, blks.getLocatedBlocks().get(1).getBlockSize());
+ AppendTestUtil.check(fs, p, 0, len1);
+ AppendTestUtil.check(fs, p, len1, len2);
+ }
}
-
- /** Append to a partial CRC chunk and
- * the first write does not fill up the partial CRC trunk
- * *
- * @throws IOException
- */
+
@Test
- public void testAppendToPartialChunk() throws IOException {
- final Path p = new Path("/partialChunk/foo");
+ public void testTC12() throws Exception {
+ testTC12(false);
+ }
+
+ @Test
+ public void testTC12ForAppend2() throws Exception {
+ testTC12(true);
+ }
+
+ /**
+ * Append to a partial CRC chunk and the first write does not fill up the
+ * partial CRC trunk
+ */
+ private void testAppendToPartialChunk(boolean appendToNewBlock)
+ throws IOException {
+ final Path p = new Path("/partialChunk/foo"
+ + (appendToNewBlock ? "0" : "1"));
final int fileLen = 513;
System.out.println("p=" + p);
@@ -336,7 +482,9 @@ public class TestFileAppend3 {
System.out.println("Wrote 1 byte and closed the file " + p);
// append to file
- stm = fs.append(p);
+ stm = appendToNewBlock ?
+ fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
+ fs.append(p);
// Append to a partial CRC trunk
stm.write(fileContents, 1, 1);
stm.hflush();
@@ -345,7 +493,9 @@ public class TestFileAppend3 {
System.out.println("Append 1 byte and closed the file " + p);
// write the remainder of the file
- stm = fs.append(p);
+ stm = appendToNewBlock ?
+ fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
+ fs.append(p);
// ensure getPos is set to reflect existing size of the file
assertEquals(2, stm.getPos());
@@ -444,4 +594,14 @@ public class TestFileAppend3 {
// if append was called with a stale file stat.
doSmallAppends(file, fs, 20);
}
+
+ @Test
+ public void testAppendToPartialChunk() throws IOException {
+ testAppendToPartialChunk(false);
+ }
+
+ @Test
+ public void testAppendToPartialChunkforAppend2() throws IOException {
+ testAppendToPartialChunk(true);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppendRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppendRestart.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppendRestart.java
index 0bca23d..a2b344c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppendRestart.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppendRestart.java
@@ -99,10 +99,11 @@ public class TestFileAppendRestart {
// OP_ADD to create file
// OP_ADD_BLOCK for first block
// OP_CLOSE to close file
- // OP_ADD to reopen file
+ // OP_APPEND to reopen file
// OP_ADD_BLOCK for second block
// OP_CLOSE to close file
- assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_ADD).held);
+ assertEquals(1, (int)counts.get(FSEditLogOpCodes.OP_ADD).held);
+ assertEquals(1, (int)counts.get(FSEditLogOpCodes.OP_APPEND).held);
assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_ADD_BLOCK).held);
assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_CLOSE).held);
@@ -112,13 +113,14 @@ public class TestFileAppendRestart {
// OP_ADD to create file
// OP_ADD_BLOCK for first block
// OP_CLOSE to close file
- // OP_ADD to re-establish the lease
+ // OP_APPEND to re-establish the lease
// OP_UPDATE_BLOCKS from the updatePipeline call (increments genstamp of last block)
// OP_ADD_BLOCK at the start of the second block
// OP_CLOSE to close file
// Total: 2 OP_ADDs, 1 OP_UPDATE_BLOCKS, 2 OP_ADD_BLOCKs, and 2 OP_CLOSEs
// in addition to the ones above
- assertEquals(2+2, (int)counts.get(FSEditLogOpCodes.OP_ADD).held);
+ assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_ADD).held);
+ assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_APPEND).held);
assertEquals(1, (int)counts.get(FSEditLogOpCodes.OP_UPDATE_BLOCKS).held);
assertEquals(2+2, (int)counts.get(FSEditLogOpCodes.OP_ADD_BLOCK).held);
assertEquals(2+2, (int)counts.get(FSEditLogOpCodes.OP_CLOSE).held);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
index 9ada95f..6bcfa71 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
@@ -31,7 +31,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level;
import org.junit.Test;
@@ -121,7 +123,66 @@ public class TestHFlush {
cluster.shutdown();
}
}
-
+
+ /**
+ * Test hsync with END_BLOCK flag.
+ */
+ @Test
+ public void hSyncEndBlock_00() throws IOException {
+ final int preferredBlockSize = 1024;
+ Configuration conf = new HdfsConfiguration();
+ conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, preferredBlockSize);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+ .build();
+ DistributedFileSystem fileSystem = cluster.getFileSystem();
+ FSDataOutputStream stm = null;
+ try {
+ Path path = new Path("/" + fName);
+ stm = fileSystem.create(path, true, 4096, (short) 2,
+ AppendTestUtil.BLOCK_SIZE);
+ System.out.println("Created file " + path.toString());
+ ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
+ .of(SyncFlag.END_BLOCK));
+ long currentFileLength = fileSystem.getFileStatus(path).getLen();
+ assertEquals(0L, currentFileLength);
+ LocatedBlocks blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
+ assertEquals(0, blocks.getLocatedBlocks().size());
+
+ // write a block and call hsync(end_block) at the block boundary
+ stm.write(new byte[preferredBlockSize]);
+ ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
+ .of(SyncFlag.END_BLOCK));
+ currentFileLength = fileSystem.getFileStatus(path).getLen();
+ assertEquals(preferredBlockSize, currentFileLength);
+ blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
+ assertEquals(1, blocks.getLocatedBlocks().size());
+
+ // call hsync then call hsync(end_block) immediately
+ stm.write(new byte[preferredBlockSize / 2]);
+ stm.hsync();
+ ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
+ .of(SyncFlag.END_BLOCK));
+ currentFileLength = fileSystem.getFileStatus(path).getLen();
+ assertEquals(preferredBlockSize + preferredBlockSize / 2,
+ currentFileLength);
+ blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
+ assertEquals(2, blocks.getLocatedBlocks().size());
+
+ stm.write(new byte[preferredBlockSize / 4]);
+ stm.hsync();
+ currentFileLength = fileSystem.getFileStatus(path).getLen();
+ assertEquals(preferredBlockSize + preferredBlockSize / 2
+ + preferredBlockSize / 4, currentFileLength);
+ blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
+ assertEquals(3, blocks.getLocatedBlocks().size());
+ } finally {
+ IOUtils.cleanup(null, stm, fileSystem);
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
/**
* The test calls
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
@@ -136,6 +197,29 @@ public class TestHFlush {
/**
* The test calls
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
+ * while requiring the semantic of {@link SyncFlag#END_BLOCK}.
+ */
+ @Test
+ public void hSyncEndBlock_01() throws IOException {
+ doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
+ (short) 2, true, EnumSet.of(SyncFlag.END_BLOCK));
+ }
+
+ /**
+ * The test calls
+ * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
+ * while requiring the semantic of {@link SyncFlag#END_BLOCK} and
+ * {@link SyncFlag#UPDATE_LENGTH}.
+ */
+ @Test
+ public void hSyncEndBlockAndUpdateLength() throws IOException {
+ doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
+ (short) 2, true, EnumSet.of(SyncFlag.END_BLOCK, SyncFlag.UPDATE_LENGTH));
+ }
+
+ /**
+ * The test calls
+ * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
* while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}.
* Similar with {@link #hFlush_02()} , it writes a file with a custom block
* size so the writes will be happening across block' boundaries
@@ -152,7 +236,20 @@ public class TestHFlush {
doTheJob(conf, fName, customBlockSize, (short) 2, true,
EnumSet.of(SyncFlag.UPDATE_LENGTH));
}
-
+
+ @Test
+ public void hSyncEndBlock_02() throws IOException {
+ Configuration conf = new HdfsConfiguration();
+ int customPerChecksumSize = 512;
+ int customBlockSize = customPerChecksumSize * 3;
+ // Modify defaul filesystem settings
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
+
+ doTheJob(conf, fName, customBlockSize, (short) 2, true,
+ EnumSet.of(SyncFlag.END_BLOCK));
+ }
+
/**
* The test calls
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
@@ -173,7 +270,20 @@ public class TestHFlush {
doTheJob(conf, fName, customBlockSize, (short) 2, true,
EnumSet.of(SyncFlag.UPDATE_LENGTH));
}
-
+
+ @Test
+ public void hSyncEndBlock_03() throws IOException {
+ Configuration conf = new HdfsConfiguration();
+ int customPerChecksumSize = 400;
+ int customBlockSize = customPerChecksumSize * 3;
+ // Modify defaul filesystem settings
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
+
+ doTheJob(conf, fName, customBlockSize, (short) 2, true,
+ EnumSet.of(SyncFlag.END_BLOCK));
+ }
+
/**
* The method starts new cluster with defined Configuration; creates a file
* with specified block_size and writes 10 equal sections in it; it also calls
@@ -197,12 +307,13 @@ public class TestHFlush {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(replicas).build();
// Make sure we work with DFS in order to utilize all its functionality
- DistributedFileSystem fileSystem =
- cluster.getFileSystem();
+ DistributedFileSystem fileSystem = cluster.getFileSystem();
FSDataInputStream is;
try {
Path path = new Path(fileName);
+ final String pathName = new Path(fileSystem.getWorkingDirectory(), path)
+ .toUri().getPath();
FSDataOutputStream stm = fileSystem.create(path, false, 4096, replicas,
block_size);
System.out.println("Created file " + fileName);
@@ -210,7 +321,8 @@ public class TestHFlush {
int tenth = AppendTestUtil.FILE_SIZE/SECTIONS;
int rounding = AppendTestUtil.FILE_SIZE - tenth * SECTIONS;
for (int i=0; i<SECTIONS; i++) {
- System.out.println("Writing " + (tenth * i) + " to " + (tenth * (i+1)) + " section to file " + fileName);
+ System.out.println("Writing " + (tenth * i) + " to "
+ + (tenth * (i + 1)) + " section to file " + fileName);
// write to the file
stm.write(fileContent, tenth * i, tenth);
@@ -227,7 +339,11 @@ public class TestHFlush {
assertEquals(
"File size doesn't match for hsync/hflush with updating the length",
tenth * (i + 1), currentFileLength);
+ } else if (isSync && syncFlags.contains(SyncFlag.END_BLOCK)) {
+ LocatedBlocks blocks = fileSystem.dfs.getLocatedBlocks(pathName, 0);
+ assertEquals(i + 1, blocks.getLocatedBlocks().size());
}
+
byte [] toRead = new byte[tenth];
byte [] expected = new byte[tenth];
System.arraycopy(fileContent, tenth * i, expected, 0, tenth);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
index b84989f..15580a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
@@ -22,8 +22,10 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Test;
@@ -124,7 +127,8 @@ public class TestLeaseRecovery {
}
DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName);
- cluster.getNameNodeRpc().append(filestr, dfs.dfs.clientName);
+ cluster.getNameNodeRpc().append(filestr, dfs.dfs.clientName,
+ new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
// expire lease to trigger block recovery.
waitLeaseRecovery(cluster);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 84ac2a5..a4df4ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -28,6 +29,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
+import java.util.EnumSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -234,7 +236,8 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
makeTestFile(path, BLOCK_SIZE, true);
try {
- client.append(path.toString(), BUFFER_LENGTH, null, null).close();
+ client.append(path.toString(), BUFFER_LENGTH,
+ EnumSet.of(CreateFlag.APPEND), null, null).close();
fail("Append to LazyPersist file did not fail as expected");
} catch (Throwable t) {
LOG.info("Got expected exception ", t);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
index 6d1f452..ddf5a3e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
@@ -40,9 +40,12 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -99,7 +102,7 @@ public class TestHDFSConcat {
HdfsFileStatus fStatus;
FSDataInputStream stm;
- String trg = new String("/trg");
+ String trg = "/trg";
Path trgPath = new Path(trg);
DFSTestUtil.createFile(dfs, trgPath, fileLen, REPL_FACTOR, 1);
fStatus = nn.getFileInfo(trg);
@@ -112,7 +115,7 @@ public class TestHDFSConcat {
long [] lens = new long [numFiles];
- int i = 0;
+ int i;
for(i=0; i<files.length; i++) {
files[i] = new Path("/file"+i);
Path path = files[i];
@@ -385,6 +388,75 @@ public class TestHDFSConcat {
} catch (Exception e) {
// exspected
}
-
+ }
+
+ /**
+ * make sure we update the quota correctly after concat
+ */
+ @Test
+ public void testConcatWithQuotaDecrease() throws IOException {
+ final short srcRepl = 3; // note this is different with REPL_FACTOR
+ final int srcNum = 10;
+ final Path foo = new Path("/foo");
+ final Path[] srcs = new Path[srcNum];
+ final Path target = new Path(foo, "target");
+ DFSTestUtil.createFile(dfs, target, blockSize, REPL_FACTOR, 0L);
+
+ dfs.setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
+
+ for (int i = 0; i < srcNum; i++) {
+ srcs[i] = new Path(foo, "src" + i);
+ DFSTestUtil.createFile(dfs, srcs[i], blockSize * 2, srcRepl, 0L);
+ }
+
+ ContentSummary summary = dfs.getContentSummary(foo);
+ Assert.assertEquals(11, summary.getFileCount());
+ Assert.assertEquals(blockSize * REPL_FACTOR +
+ blockSize * 2 * srcRepl * srcNum, summary.getSpaceConsumed());
+
+ dfs.concat(target, srcs);
+ summary = dfs.getContentSummary(foo);
+ Assert.assertEquals(1, summary.getFileCount());
+ Assert.assertEquals(
+ blockSize * REPL_FACTOR + blockSize * 2 * REPL_FACTOR * srcNum,
+ summary.getSpaceConsumed());
+ }
+
+ @Test
+ public void testConcatWithQuotaIncrease() throws IOException {
+ final short repl = 3;
+ final int srcNum = 10;
+ final Path foo = new Path("/foo");
+ final Path bar = new Path(foo, "bar");
+ final Path[] srcs = new Path[srcNum];
+ final Path target = new Path(bar, "target");
+ DFSTestUtil.createFile(dfs, target, blockSize, repl, 0L);
+
+ final long dsQuota = blockSize * repl + blockSize * srcNum * REPL_FACTOR;
+ dfs.setQuota(foo, Long.MAX_VALUE - 1, dsQuota);
+
+ for (int i = 0; i < srcNum; i++) {
+ srcs[i] = new Path(bar, "src" + i);
+ DFSTestUtil.createFile(dfs, srcs[i], blockSize, REPL_FACTOR, 0L);
+ }
+
+ ContentSummary summary = dfs.getContentSummary(bar);
+ Assert.assertEquals(11, summary.getFileCount());
+ Assert.assertEquals(dsQuota, summary.getSpaceConsumed());
+
+ try {
+ dfs.concat(target, srcs);
+ fail("QuotaExceededException expected");
+ } catch (RemoteException e) {
+ Assert.assertTrue(
+ e.unwrapRemoteException() instanceof QuotaExceededException);
+ }
+
+ dfs.setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
+ dfs.concat(target, srcs);
+ summary = dfs.getContentSummary(bar);
+ Assert.assertEquals(1, summary.getFileCount());
+ Assert.assertEquals(blockSize * repl * (srcNum + 1),
+ summary.getSpaceConsumed());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
index 3084f26..2e6b4a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
@@ -232,14 +232,18 @@ public class TestNamenodeRetryCache {
// Retried append requests succeed
newCall();
- LastBlockWithStatus b = nnRpc.append(src, "holder");
- Assert.assertEquals(b, nnRpc.append(src, "holder"));
- Assert.assertEquals(b, nnRpc.append(src, "holder"));
+ LastBlockWithStatus b = nnRpc.append(src, "holder",
+ new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
+ Assert.assertEquals(b, nnRpc.append(src, "holder",
+ new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND))));
+ Assert.assertEquals(b, nnRpc.append(src, "holder",
+ new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND))));
// non-retried call fails
newCall();
try {
- nnRpc.append(src, "holder");
+ nnRpc.append(src, "holder",
+ new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
Assert.fail("testAppend - expected exception is not thrown");
} catch (Exception e) {
// Expected
@@ -409,7 +413,7 @@ public class TestNamenodeRetryCache {
LightWeightCache<CacheEntry, CacheEntry> cacheSet =
(LightWeightCache<CacheEntry, CacheEntry>) namesystem.getRetryCache().getCacheSet();
- assertEquals(24, cacheSet.size());
+ assertEquals(25, cacheSet.size());
Map<CacheEntry, CacheEntry> oldEntries =
new HashMap<CacheEntry, CacheEntry>();
@@ -428,7 +432,7 @@ public class TestNamenodeRetryCache {
assertTrue(namesystem.hasRetryCache());
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) namesystem
.getRetryCache().getCacheSet();
- assertEquals(24, cacheSet.size());
+ assertEquals(25, cacheSet.size());
iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
index 066fd66..916893c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
@@ -163,7 +163,7 @@ public class TestRetryCacheWithHA {
FSNamesystem fsn0 = cluster.getNamesystem(0);
LightWeightCache<CacheEntry, CacheEntry> cacheSet =
(LightWeightCache<CacheEntry, CacheEntry>) fsn0.getRetryCache().getCacheSet();
- assertEquals(24, cacheSet.size());
+ assertEquals(25, cacheSet.size());
Map<CacheEntry, CacheEntry> oldEntries =
new HashMap<CacheEntry, CacheEntry>();
@@ -184,7 +184,7 @@ public class TestRetryCacheWithHA {
FSNamesystem fsn1 = cluster.getNamesystem(1);
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) fsn1
.getRetryCache().getCacheSet();
- assertEquals(24, cacheSet.size());
+ assertEquals(25, cacheSet.size());
iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();
@@ -438,7 +438,8 @@ public class TestRetryCacheWithHA {
@Override
void invoke() throws Exception {
- lbk = client.getNamenode().append(fileName, client.getClientName());
+ lbk = client.getNamenode().append(fileName, client.getClientName(),
+ new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
}
// check if the inode of the file is under construction
@@ -701,7 +702,8 @@ public class TestRetryCacheWithHA {
final Path filePath = new Path(file);
DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0);
// append to the file and leave the last block under construction
- out = this.client.append(file, BlockSize, null, null);
+ out = this.client.append(file, BlockSize, EnumSet.of(CreateFlag.APPEND),
+ null, null);
byte[] appendContent = new byte[100];
new Random().nextBytes(appendContent);
out.write(appendContent);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32548f4b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
index dce3f47..da8c190 100644
Binary files a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored and b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored differ