You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/03/06 00:04:25 UTC
[18/27] hadoop git commit: HDFS-7746. Add a test randomly mixing
append, truncate and snapshot operations.
HDFS-7746. Add a test randomly mixing append, truncate and snapshot operations.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ded0200e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ded0200e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ded0200e
Branch: refs/heads/YARN-2928
Commit: ded0200e9c98dea960db756bb208ff475d710e28
Parents: 22426a1
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Thu Mar 5 10:21:29 2015 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Thu Mar 5 10:21:29 2015 +0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hadoop/hdfs/TestAppendSnapshotTruncate.java | 478 +++++++++++++++++++
2 files changed, 481 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ded0200e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d9008d9..f9541e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -709,6 +709,9 @@ Release 2.7.0 - UNRELEASED
HDFS-1522. Combine two BLOCK_FILE_PREFIX constants into one.
(Dongming Liang via shv)
+ HDFS-7746. Add a test randomly mixing append, truncate and snapshot
+ operations. (szetszwo)
+
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ded0200e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendSnapshotTruncate.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendSnapshotTruncate.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendSnapshotTruncate.java
new file mode 100644
index 0000000..5c4c7b4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendSnapshotTruncate.java
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Test randomly mixing append, snapshot and truncate operations.
+ * Uses the local file system to simulate each operation and verifies
+ * the HDFS results against it.
+ */
+public class TestAppendSnapshotTruncate {
+ static { // log every NameNode state change while this test class runs
+ GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.ALL);
+ }
+ private static final Log LOG = LogFactory.getLog(TestAppendSnapshotTruncate.class);
+ private static final int BLOCK_SIZE = 1024; // used as min block size, bytes-per-checksum and the block size of created files
+ private static final int DATANODE_NUM = 3; // number of datanodes requested from the mini cluster
+ private static final short REPLICATION = 3; // replication passed to dfs.create() by FileWorker
+
+ static final int SHORT_HEARTBEAT = 1; // value set for DFS_HEARTBEAT_INTERVAL_KEY (presumably seconds -- TODO confirm)
+ static final String[] EMPTY_STRINGS = {}; // array-type argument for keySet().toArray(...)
+
+ static Configuration conf; // assigned in startUp()
+ static MiniDFSCluster cluster; // built in startUp(), shut down in tearDown()
+ static DistributedFileSystem dfs; // obtained from the cluster in startUp(), closed in tearDown()
+
+ /**
+  * Brings up the mini cluster shared by the tests in this class.
+  *
+  * The minimum block size and bytes-per-checksum are both set to
+  * {@link #BLOCK_SIZE}, the heartbeat interval to {@link #SHORT_HEARTBEAT}
+  * and the pending-replication timeout to 1.
+  *
+  * The NameNode is intentionally not pinned to
+  * {@code NameNode.DEFAULT_PORT}: nothing in this test depends on the port
+  * number, and requesting a fixed port makes the whole class fail whenever
+  * that port is already in use on the host.
+  *
+  * @throws IOException if the cluster cannot be started
+  */
+ @BeforeClass
+ public static void startUp() throws IOException {
+   conf = new HdfsConfiguration();
+   conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
+   conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, BLOCK_SIZE);
+   conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, SHORT_HEARTBEAT);
+   conf.setLong(
+       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
+   cluster = new MiniDFSCluster.Builder(conf)
+       .format(true)
+       .numDataNodes(DATANODE_NUM)
+       .waitSafeMode(true)
+       .build();
+   dfs = cluster.getFileSystem();
+ }
+
+ /**
+  * Closes the file system client and shuts the mini cluster down.
+  * The cluster is shut down even when closing the client fails, so a
+  * failing close cannot leak the cluster into later tests.
+  *
+  * @throws IOException if closing the file system fails
+  */
+ @AfterClass
+ public static void tearDown() throws IOException {
+   try {
+     if (dfs != null) {
+       dfs.close();
+     }
+   } finally {
+     if (cluster != null) {
+       cluster.shutdown();
+     }
+   }
+ }
+
+
+ /**
+  * Test randomly mixing append, snapshot and truncate operations.
+  *
+  * Creates a snapshottable HDFS directory and a fresh local mirror
+  * directory, lets one {@link DirWorker} and its file workers run for ten
+  * seconds, stops them and finally compares every file and every remaining
+  * snapshot against the local copies.
+  */
+ @Test
+ public void testAST() throws Exception {
+   final String dirPathString = "/dir";
+   final Path dir = new Path(dirPathString);
+   dfs.mkdirs(dir);
+   dfs.allowSnapshot(dir);
+
+   final File localDir = new File(
+       System.getProperty("test.build.data", "target/test/data")
+       + dirPathString);
+   // Stale local files would make the local mirror disagree with HDFS from
+   // the start, so insist on a clean, newly created directory.
+   if (localDir.exists()) {
+     Assert.assertTrue("Failed to delete " + localDir,
+         FileUtil.fullyDelete(localDir));
+   }
+   Assert.assertTrue("Failed to create " + localDir, localDir.mkdirs());
+
+   final DirWorker w = new DirWorker(dir, localDir, 3);
+   w.startAllFiles();
+   w.start();
+   try {
+     Worker.sleep(10L*1000);
+   } finally {
+     // Always stop the worker threads, directory worker first, so that a
+     // failure here does not leave them running against the cluster.
+     try {
+       w.stop();
+     } finally {
+       w.stoptAllFiles();
+     }
+   }
+   w.checkEverything();
+ }
+
+ /**
+  * Filter accepting regular files only; directories (such as the local
+  * sub-directories that simulate snapshots) are rejected.
+  */
+ static final FileFilter FILE_ONLY = new FileFilter() {
+   @Override
+   public boolean accept(File candidate) {
+     final boolean regularFile = candidate.isFile();
+     return regularFile;
+   }
+ };
+
+ static class DirWorker extends Worker {
+ final Path dir;
+ final File localDir;
+
+ final FileWorker[] files;
+
+ private Map<String, Path> snapshotPaths = new HashMap<String, Path>();
+ private AtomicInteger snapshotCount = new AtomicInteger();
+
+ DirWorker(Path dir, File localDir, int nFiles) throws IOException {
+ super(dir.getName());
+ this.dir = dir;
+ this.localDir = localDir;
+
+ this.files = new FileWorker[nFiles];
+ for(int i = 0; i < files.length; i++) {
+ files[i] = new FileWorker(dir, localDir, String.format("file%02d", i));
+ }
+ }
+
+ static String getSnapshotName(int n) {
+ return String.format("s%02d", n);
+ }
+
+ String createSnapshot(String snapshot) throws IOException {
+ final StringBuilder b = new StringBuilder("createSnapshot: ")
+ .append(snapshot).append(" for ").append(dir);
+
+ {
+ //copy all local files to a sub dir to simulate snapshot.
+ final File subDir = new File(localDir, snapshot);
+ Assert.assertFalse(subDir.exists());
+ subDir.mkdir();
+
+ for(File f : localDir.listFiles(FILE_ONLY)) {
+ FileUtils.copyFile(f, new File(subDir, f.getName()));
+ }
+ }
+
+ final Path p = dfs.createSnapshot(dir, snapshot);
+ snapshotPaths.put(snapshot, p);
+ return b.toString();
+ }
+
+ String checkSnapshot(String snapshot) throws IOException {
+ final StringBuilder b = new StringBuilder("checkSnapshot: ")
+ .append(snapshot);
+
+ final File subDir = new File(localDir, snapshot);
+ Assert.assertTrue(subDir.exists());
+
+ final File[] localFiles = subDir.listFiles(FILE_ONLY);
+ final Path p = snapshotPaths.get(snapshot);
+ final FileStatus[] statuses = dfs.listStatus(p);
+ Assert.assertEquals(localFiles.length, statuses.length);
+ b.append(p).append(" vs ").append(subDir).append(", ")
+ .append(statuses.length).append(" entries");
+
+ Arrays.sort(localFiles);
+ Arrays.sort(statuses);
+ for(int i = 0; i < statuses.length; i++) {
+ FileWorker.checkFullFile(statuses[i].getPath(), localFiles[i]);
+ }
+ return b.toString();
+ }
+
+ String deleteSnapshot(String snapshot) throws IOException {
+ final StringBuilder b = new StringBuilder("deleteSnapshot: ")
+ .append(snapshot).append(" from ").append(dir);
+ FileUtil.fullyDelete(new File(localDir, snapshot));
+ dfs.deleteSnapshot(dir, snapshot);
+ snapshotPaths.remove(snapshot);
+ return b.toString();
+ }
+
+
+ @Override
+ public String call() throws Exception {
+ final Random r = DFSUtil.getRandom();
+ final int op = r.nextInt(6);
+ if (op <= 1) {
+ pauseAllFiles();
+ try {
+ final String snapshot = getSnapshotName(snapshotCount.getAndIncrement());
+ return createSnapshot(snapshot);
+ } finally {
+ startAllFiles();
+ }
+ } else if (op <= 3) {
+ final String[] keys = snapshotPaths.keySet().toArray(EMPTY_STRINGS);
+ if (keys.length == 0) {
+ return "NO-OP";
+ }
+ final String snapshot = keys[r.nextInt(keys.length)];
+ final String s = checkSnapshot(snapshot);
+
+ if (op == 2) {
+ return deleteSnapshot(snapshot);
+ }
+ return s;
+ } else {
+ return "NO-OP";
+ }
+ }
+
+ void pauseAllFiles() {
+ for(FileWorker f : files) {
+ f.pause();
+ }
+
+ for(int i = 0; i < files.length; ) {
+ sleep(100);
+ for(; i < files.length && files[i].isPaused(); i++);
+ }
+ }
+
+ void startAllFiles() {
+ for(FileWorker f : files) {
+ f.start();
+ }
+ }
+
+ void stoptAllFiles() throws InterruptedException {
+ for(FileWorker f : files) {
+ f.stop();
+ }
+ }
+
+ void checkEverything() throws IOException {
+ LOG.info("checkEverything");
+ for(FileWorker f : files) {
+ f.checkFullFile();
+ Preconditions.checkState(f.state.get() != State.ERROR);
+ }
+ for(String snapshot : snapshotPaths.keySet()) {
+ checkSnapshot(snapshot);
+ }
+ Preconditions.checkState(state.get() != State.ERROR);
+ }
+ }
+
+ /**
+  * Worker that randomly appends to, truncates and verifies one HDFS file,
+  * applying every modification to a local file as well so that the local
+  * file always holds the expected content.
+  */
+ static class FileWorker extends Worker {
+   final Path file;
+   final File localFile;
+
+   /**
+    * Creates the empty file both locally and in HDFS.
+    *
+    * @param dir the HDFS parent directory
+    * @param localDir the local parent directory
+    * @param filename the file name, also used as the worker name
+    * @throws IOException if either file already exists or cannot be created
+    */
+   FileWorker(Path dir, File localDir, String filename) throws IOException {
+     super(filename);
+     this.file = new Path(dir, filename);
+     this.localFile = new File(localDir, filename);
+
+     // The HDFS file is created with overwrite=false; be equally strict
+     // locally so both sides start out empty.
+     if (!localFile.createNewFile()) {
+       throw new IOException("Local file already exists: " + localFile);
+     }
+     dfs.create(file, false, 4096, REPLICATION, BLOCK_SIZE).close();
+   }
+
+   /**
+    * Runs one random step: full check (1 in 9), append (4 in 9), truncate
+    * by an arbitrary length (2 in 9) or truncate to a block boundary
+    * (2 in 9).
+    */
+   @Override
+   public String call() throws IOException {
+     final Random r = DFSUtil.getRandom();
+     final int op = r.nextInt(9);
+     if (op == 0) {
+       return checkFullFile();
+     } else {
+       final int nBlocks = r.nextInt(4) + 1;
+       final int lastBlockSize = r.nextInt(BLOCK_SIZE) + 1;
+       final int nBytes = nBlocks*BLOCK_SIZE + lastBlockSize;
+
+       if (op <= 4) {
+         return append(nBytes);
+       } else if (op <= 6) {
+         return truncateArbitrarily(nBytes);
+       } else {
+         return truncateToBlockBoundary(nBlocks);
+       }
+     }
+   }
+
+   /**
+    * Appends the same n random bytes to the local file and the HDFS file.
+    *
+    * @return a description of the operation for logging
+    */
+   String append(int n) throws IOException {
+     final StringBuilder b = new StringBuilder("append ")
+         .append(n).append(" bytes to ").append(file.getName());
+
+     final byte[] bytes = new byte[n];
+     DFSUtil.getRandom().nextBytes(bytes);
+
+     { // write to local file
+       final FileOutputStream out = new FileOutputStream(localFile, true);
+       try {
+         out.write(bytes, 0, bytes.length);
+       } finally {
+         out.close();
+       }
+     }
+
+     { // write to HDFS file
+       final FSDataOutputStream out = dfs.append(file);
+       try {
+         out.write(bytes, 0, bytes.length);
+       } finally {
+         out.close();
+       }
+     }
+     return b.toString();
+   }
+
+   /**
+    * Truncates nBytes from the end of the file, or to length 0 if the file
+    * is not longer than nBytes.
+    *
+    * @return a description of the operation for logging
+    */
+   String truncateArbitrarily(int nBytes) throws IOException {
+     Preconditions.checkArgument(nBytes > 0);
+     final int length = checkLength();
+     final StringBuilder b = new StringBuilder("truncateArbitrarily: ")
+         .append(nBytes).append(" bytes from ").append(file.getName())
+         .append(", length=" + length);
+
+     truncate(length > nBytes? length - nBytes: 0, b);
+     return b.toString();
+   }
+
+   /**
+    * Truncates the partial last block plus (nBlocks - 1) further blocks, so
+    * that the new length is a multiple of the block size (or 0).  Such a
+    * truncate is expected to complete immediately.
+    *
+    * @return a description of the operation for logging
+    */
+   String truncateToBlockBoundary(int nBlocks) throws IOException {
+     Preconditions.checkArgument(nBlocks > 0);
+     final int length = checkLength();
+     final StringBuilder b = new StringBuilder("truncateToBlockBoundary: ")
+         .append(nBlocks).append(" blocks from ").append(file.getName())
+         .append(", length=" + length);
+     final int n = (nBlocks - 1)*BLOCK_SIZE + (length%BLOCK_SIZE);
+     Preconditions.checkState(truncate(length > n? length - n: 0, b), b);
+     return b.toString();
+   }
+
+   /**
+    * Truncates the local file and the HDFS file to newLength; when HDFS
+    * reports the truncate as not yet complete, waits for block recovery.
+    *
+    * @param b log message builder to which the outcome is appended
+    * @return the value returned by the HDFS truncate call
+    */
+   private boolean truncate(long newLength, StringBuilder b) throws IOException {
+     final RandomAccessFile raf = new RandomAccessFile(localFile, "rw");
+     try {
+       raf.setLength(newLength);
+     } finally {
+       raf.close();
+     }
+
+     final boolean isReady = dfs.truncate(file, newLength);
+     b.append(", newLength=").append(newLength)
+         .append(", isReady=").append(isReady);
+     if (!isReady) {
+       TestFileTruncate.checkBlockRecovery(file, dfs);
+     }
+     return isReady;
+   }
+
+   /** @return the length of this worker's file, after checking it. */
+   int checkLength() throws IOException {
+     return checkLength(file, localFile);
+   }
+
+   /**
+    * Asserts that the HDFS file and the local file have the same length.
+    *
+    * @return the common length
+    */
+   static int checkLength(Path file, File localFile) throws IOException {
+     final long length = dfs.getFileStatus(file).getLen();
+     Assert.assertEquals("Length mismatch: " + file + " vs " + localFile,
+         localFile.length(), length);
+     Assert.assertTrue(length <= Integer.MAX_VALUE);
+     return (int)length;
+   }
+
+   /** Verifies the content of this worker's file. */
+   String checkFullFile() throws IOException {
+     return checkFullFile(file, localFile);
+   }
+
+   /**
+    * Asserts that the HDFS file has exactly the content of the local file.
+    *
+    * @return a description of the operation for logging
+    * @throws IOException if the local file ends before the expected length
+    */
+   static String checkFullFile(Path file, File localFile) throws IOException {
+     final StringBuilder b = new StringBuilder("checkFullFile: ")
+         .append(file.getName()).append(" vs ").append(localFile);
+     final byte[] bytes = new byte[checkLength(file, localFile)];
+     b.append(", length=").append(bytes.length);
+
+     final FileInputStream in = new FileInputStream(localFile);
+     try {
+       for(int n = 0; n < bytes.length; ) {
+         final int nRead = in.read(bytes, n, bytes.length - n);
+         if (nRead < 0) {
+           // read() returns -1 at end of file; adding it to n would move
+           // the offset backwards instead of terminating the loop.
+           throw new IOException("Unexpected end of " + localFile
+               + " after " + n + " of " + bytes.length + " bytes");
+         }
+         n += nRead;
+       }
+     } finally {
+       in.close();
+     }
+
+     AppendTestUtil.checkFullFile(dfs, file, bytes.length, bytes,
+         "File content mismatch: " + b, false);
+     return b.toString();
+   }
+ }
+
+ static abstract class Worker implements Callable<String> {
+ enum State {
+ IDLE(false), RUNNING(false), STOPPED(true), ERROR(true);
+
+ final boolean isTerminated;
+ State(boolean isTerminated) {
+ this.isTerminated = isTerminated;
+ }
+ };
+
+ final String name;
+ final AtomicReference<State> state = new AtomicReference<State>(State.IDLE);
+ final AtomicBoolean isCalling = new AtomicBoolean();
+ final AtomicReference<Thread> thread = new AtomicReference<Thread>();
+
+ Worker(String name) {
+ this.name = name;
+ }
+
+ void start() {
+ Preconditions.checkState(state.compareAndSet(State.IDLE, State.RUNNING));
+
+ if (thread.get() == null) {
+ final Thread t = new Thread(null, new Runnable() {
+ @Override
+ public void run() {
+ final Random r = DFSUtil.getRandom();
+ for(State s; (s = state.get()) == State.RUNNING || s == State.IDLE;) {
+ if (s == State.RUNNING) {
+ isCalling.set(true);
+ try {
+ LOG.info(call());
+ } catch (Exception e) {
+ LOG.error("Worker " + name + " failed.", e);
+ state.set(State.ERROR);
+ return;
+ }
+ isCalling.set(false);
+ }
+ sleep(r.nextInt(100) + 50);
+ }
+ }
+ }, name);
+ Preconditions.checkState(thread.compareAndSet(null, t));
+ t.start();
+ }
+ }
+
+ boolean isPaused() {
+ return state.get() == State.IDLE && !isCalling.get();
+ }
+
+ void pause() {
+ Preconditions.checkState(state.compareAndSet(State.RUNNING, State.IDLE));
+ }
+
+ void stop() throws InterruptedException {
+ if (state.get() == State.ERROR) {
+ return;
+ }
+
+ state.set(State.STOPPED);
+ thread.get().join();
+ }
+
+ static void sleep(final long sleepTimeMs) {
+ try {
+ Thread.sleep(sleepTimeMs);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}