You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/01/27 21:58:24 UTC
[3/3] hadoop git commit: HDFS-3689. Add support for variable length block. Contributed by Jing Zhao.
HDFS-3689. Add support for variable length block. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2848db81
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2848db81
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2848db81
Branch: refs/heads/trunk
Commit: 2848db814a98b83e7546f65a2751e56fb5b2dbe0
Parents: 1e2d98a
Author: Jing Zhao <ji...@apache.org>
Authored: Tue Jan 27 12:58:10 2015 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Tue Jan 27 12:58:10 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/hadoop/fs/CreateFlag.java | 24 +-
.../org/apache/hadoop/fs/FSOutputSummer.java | 2 +-
.../hadoop/hdfs/nfs/nfs3/WriteManager.java | 5 +-
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../java/org/apache/hadoop/hdfs/DFSClient.java | 48 +-
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 69 +--
.../hadoop/hdfs/DistributedFileSystem.java | 10 +-
.../hdfs/client/HdfsDataOutputStream.java | 8 +-
.../org/apache/hadoop/hdfs/inotify/Event.java | 12 +
.../hadoop/hdfs/protocol/ClientProtocol.java | 9 +-
...tNamenodeProtocolServerSideTranslatorPB.java | 14 +-
.../ClientNamenodeProtocolTranslatorPB.java | 17 +-
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 25 +-
.../datanode/web/webhdfs/WebHdfsHandler.java | 3 +-
.../hdfs/server/namenode/FSDirConcatOp.java | 259 +++++------
.../hdfs/server/namenode/FSDirectory.java | 4 +-
.../hadoop/hdfs/server/namenode/FSEditLog.java | 20 +-
.../hdfs/server/namenode/FSEditLogLoader.java | 64 ++-
.../hdfs/server/namenode/FSEditLogOp.java | 101 ++++-
.../hdfs/server/namenode/FSEditLogOpCodes.java | 1 +
.../hdfs/server/namenode/FSNamesystem.java | 56 ++-
.../hadoop/hdfs/server/namenode/INodeFile.java | 2 +-
.../namenode/InotifyFSEditLogOpTranslator.java | 4 +
.../server/namenode/NameNodeLayoutVersion.java | 3 +-
.../hdfs/server/namenode/NameNodeRpcServer.java | 9 +-
.../src/main/proto/ClientNamenodeProtocol.proto | 2 +
.../hadoop-hdfs/src/main/proto/inotify.proto | 1 +
.../org/apache/hadoop/hdfs/AppendTestUtil.java | 16 +
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 3 +
.../hdfs/TestDFSInotifyEventInputStream.java | 9 +-
.../org/apache/hadoop/hdfs/TestFileAppend.java | 162 +++++++
.../org/apache/hadoop/hdfs/TestFileAppend2.java | 193 +++++++-
.../org/apache/hadoop/hdfs/TestFileAppend3.java | 212 +++++++--
.../hadoop/hdfs/TestFileAppendRestart.java | 10 +-
.../java/org/apache/hadoop/hdfs/TestHFlush.java | 128 +++++-
.../apache/hadoop/hdfs/TestLeaseRecovery.java | 6 +-
.../fsdataset/impl/TestLazyPersistFiles.java | 5 +-
.../hdfs/server/namenode/TestHDFSConcat.java | 78 +++-
.../server/namenode/TestNamenodeRetryCache.java | 16 +-
.../namenode/ha/TestRetryCacheWithHA.java | 10 +-
.../hadoop-hdfs/src/test/resources/editsStored | Bin 5586 -> 5803 bytes
.../src/test/resources/editsStored.xml | 437 ++++++++++---------
42 files changed, 1509 insertions(+), 550 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
index c5d23b4..e008ecc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
@@ -47,6 +47,10 @@ import org.apache.hadoop.classification.InterfaceStability;
* <li> SYNC_BLOCK - to force closed blocks to the disk device.
* In addition {@link Syncable#hsync()} should be called after each write,
* if true synchronous behavior is required.</li>
+ * <li> LAZY_PERSIST - Create the block on transient storage (RAM) if
+ * available.</li>
+ * <li> NEW_BLOCK - Append data to a new block instead of the end of the last
+ * partial block.</li>
* </ol>
*
* Following combination is not valid and will result in
@@ -93,7 +97,13 @@ public enum CreateFlag {
* This flag must only be used for intermediate data whose loss can be
* tolerated by the application.
*/
- LAZY_PERSIST((short) 0x10);
+ LAZY_PERSIST((short) 0x10),
+
+ /**
+ * Append data to a new block instead of the end of the last partial block.
+ * This is only useful for APPEND.
+ */
+ NEW_BLOCK((short) 0x20);
private final short mode;
@@ -149,4 +159,16 @@ public enum CreateFlag {
+ ". Create option is not specified in " + flag);
}
}
+
+ /**
+ * Validate the CreateFlag for the append operation. The flag must contain
+ * APPEND, and cannot contain OVERWRITE.
+ */
+ public static void validateForAppend(EnumSet<CreateFlag> flag) {
+ validate(flag);
+ if (!flag.contains(APPEND)) {
+ throw new HadoopIllegalArgumentException(flag
+ + " does not contain APPEND");
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
index 934421a..13a5e26 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
@@ -165,7 +165,7 @@ abstract public class FSOutputSummer extends OutputStream {
count = partialLen;
System.arraycopy(buf, bufLen - count, buf, 0, count);
} else {
- count = 0;
+ count = 0;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
index df02e04..52c75ed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
@@ -18,10 +18,12 @@
package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.IOException;
+import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
@@ -147,7 +149,8 @@ public class WriteManager {
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
- fos = dfsClient.append(fileIdPath, bufferSize, null, null);
+ fos = dfsClient.append(fileIdPath, bufferSize,
+ EnumSet.of(CreateFlag.APPEND), null, null);
latestAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
} catch (RemoteException e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1e1af97..b867a70 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -18,6 +18,8 @@ Trunk (Unreleased)
HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)
+ HDFS-3689. Add support for variable length block. (jing9)
+
IMPROVEMENTS
HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 21f75a5..8512156 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -1656,9 +1656,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* @param checksumOpt checksum options
*
* @return output stream
- *
- * @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable,
- * boolean, short, long) for detailed description of exceptions thrown
+ *
+ * @see ClientProtocol#create for detailed description of exceptions thrown
*/
public DFSOutputStream create(String src,
FsPermission permission,
@@ -1732,7 +1731,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
return null;
}
- return callAppend(src, buffersize, progress);
+ return callAppend(src, buffersize, flag, progress);
}
return null;
}
@@ -1810,11 +1809,16 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
/** Method to get stream returned by append call */
- private DFSOutputStream callAppend(String src,
- int buffersize, Progressable progress) throws IOException {
- LastBlockWithStatus lastBlockWithStatus = null;
- try {
- lastBlockWithStatus = namenode.append(src, clientName);
+ private DFSOutputStream callAppend(String src, int buffersize,
+ EnumSet<CreateFlag> flag, Progressable progress) throws IOException {
+ CreateFlag.validateForAppend(flag);
+ try {
+ LastBlockWithStatus blkWithStatus = namenode.append(src, clientName,
+ new EnumSetWritable<>(flag, CreateFlag.class));
+ return DFSOutputStream.newStreamForAppend(this, src,
+ flag.contains(CreateFlag.NEW_BLOCK),
+ buffersize, progress, blkWithStatus.getLastBlock(),
+ blkWithStatus.getFileStatus(), dfsClientConf.createChecksum());
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class,
@@ -1824,10 +1828,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
UnresolvedPathException.class,
SnapshotAccessControlException.class);
}
- HdfsFileStatus newStat = lastBlockWithStatus.getFileStatus();
- return DFSOutputStream.newStreamForAppend(this, src, buffersize, progress,
- lastBlockWithStatus.getLastBlock(), newStat,
- dfsClientConf.createChecksum());
}
/**
@@ -1835,23 +1835,25 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*
* @param src file name
* @param buffersize buffer size
+ * @param flag indicates whether to append data to a new block instead of
+ * the last block
* @param progress for reporting write-progress; null is acceptable.
* @param statistics file system statistics; null is acceptable.
* @return an output stream for writing into the file
*
- * @see ClientProtocol#append(String, String)
+ * @see ClientProtocol#append(String, String, EnumSetWritable)
*/
public HdfsDataOutputStream append(final String src, final int buffersize,
- final Progressable progress, final FileSystem.Statistics statistics
- ) throws IOException {
- final DFSOutputStream out = append(src, buffersize, progress);
+ EnumSet<CreateFlag> flag, final Progressable progress,
+ final FileSystem.Statistics statistics) throws IOException {
+ final DFSOutputStream out = append(src, buffersize, flag, progress);
return createWrappedOutputStream(out, statistics, out.getInitialLen());
}
- private DFSOutputStream append(String src, int buffersize, Progressable progress)
- throws IOException {
+ private DFSOutputStream append(String src, int buffersize,
+ EnumSet<CreateFlag> flag, Progressable progress) throws IOException {
checkOpen();
- final DFSOutputStream result = callAppend(src, buffersize, progress);
+ final DFSOutputStream result = callAppend(src, buffersize, flag, progress);
beginFileLease(result.getFileId(), result);
return result;
}
@@ -1938,7 +1940,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
/**
* Move blocks from src to trg and delete src
- * See {@link ClientProtocol#concat(String, String [])}.
+ * See {@link ClientProtocol#concat}.
*/
public void concat(String trg, String [] srcs) throws IOException {
checkOpen();
@@ -1980,7 +1982,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
/**
* Truncate a file to an indicated size
- * See {@link ClientProtocol#truncate(String, long)}.
+ * See {@link ClientProtocol#truncate}.
*/
public boolean truncate(String src, long newLength) throws IOException {
checkOpen();
@@ -3005,7 +3007,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
/**
* Get {@link ContentSummary} rooted at the specified directory.
- * @param path The string representation of the path
+ * @param src The string representation of the path
*
* @see ClientProtocol#getContentSummary(String)
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 67d3143..8cebda1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -426,15 +426,16 @@ public class DFSOutputStream extends FSOutputSummer
/**
* construction with tracing info
*/
- private DataStreamer(HdfsFileStatus stat, Span span) {
+ private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, Span span) {
isAppend = false;
isLazyPersistFile = isLazyPersist(stat);
+ this.block = block;
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
traceSpan = span;
}
/**
- * Construct a data streamer for append
+ * Construct a data streamer for appending to the last partial block
* @param lastBlock last block of the file to be appended
* @param stat status of the file to be appended
* @param bytesPerChecksum number of bytes per checksum
@@ -1716,7 +1717,7 @@ public class DFSOutputStream extends FSOutputSummer
if (Trace.isTracing()) {
traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
}
- streamer = new DataStreamer(stat, traceSpan);
+ streamer = new DataStreamer(stat, null, traceSpan);
if (favoredNodes != null && favoredNodes.length != 0) {
streamer.setFavoredNodes(favoredNodes);
}
@@ -1773,7 +1774,7 @@ public class DFSOutputStream extends FSOutputSummer
}
/** Construct a new output stream for append. */
- private DFSOutputStream(DFSClient dfsClient, String src,
+ private DFSOutputStream(DFSClient dfsClient, String src, boolean toNewBlock,
Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat,
DataChecksum checksum) throws IOException {
this(dfsClient, src, progress, stat, checksum);
@@ -1785,21 +1786,24 @@ public class DFSOutputStream extends FSOutputSummer
}
// The last partial block of the file has to be filled.
- if (lastBlock != null) {
+ if (!toNewBlock && lastBlock != null) {
// indicate that we are appending to an existing block
bytesCurBlock = lastBlock.getBlockSize();
streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum, traceSpan);
} else {
- computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
- streamer = new DataStreamer(stat, traceSpan);
+ computePacketChunkSize(dfsClient.getConf().writePacketSize,
+ bytesPerChecksum);
+ streamer = new DataStreamer(stat,
+ lastBlock != null ? lastBlock.getBlock() : null, traceSpan);
}
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
}
static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
- int buffersize, Progressable progress, LocatedBlock lastBlock,
- HdfsFileStatus stat, DataChecksum checksum) throws IOException {
- final DFSOutputStream out = new DFSOutputStream(dfsClient, src,
+ boolean toNewBlock, int bufferSize, Progressable progress,
+ LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum)
+ throws IOException {
+ final DFSOutputStream out = new DFSOutputStream(dfsClient, src, toNewBlock,
progress, lastBlock, stat, checksum);
out.start();
return out;
@@ -1995,35 +1999,37 @@ public class DFSOutputStream extends FSOutputSummer
long toWaitFor;
long lastBlockLength = -1L;
boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH);
+ boolean endBlock = syncFlags.contains(SyncFlag.END_BLOCK);
synchronized (this) {
- // flush checksum buffer, but keep checksum buffer intact
- int numKept = flushBuffer(true, true);
+ // flush checksum buffer, but keep checksum buffer intact if we do not
+ // need to end the current block
+ int numKept = flushBuffer(!endBlock, true);
// bytesCurBlock potentially incremented if there was buffered data
if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug(
- "DFSClient flush() :" +
- " bytesCurBlock " + bytesCurBlock +
- " lastFlushOffset " + lastFlushOffset);
+ DFSClient.LOG.debug("DFSClient flush():"
+ + " bytesCurBlock=" + bytesCurBlock
+ + " lastFlushOffset=" + lastFlushOffset
+ + " createNewBlock=" + endBlock);
}
// Flush only if we haven't already flushed till this offset.
if (lastFlushOffset != bytesCurBlock) {
assert bytesCurBlock > lastFlushOffset;
// record the valid offset of this flush
lastFlushOffset = bytesCurBlock;
- if (isSync && currentPacket == null) {
+ if (isSync && currentPacket == null && !endBlock) {
// Nothing to send right now,
// but sync was requested.
- // Send an empty packet
+ // Send an empty packet if we do not end the block right now
currentPacket = createPacket(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++);
}
} else {
- if (isSync && bytesCurBlock > 0) {
+ if (isSync && bytesCurBlock > 0 && !endBlock) {
// Nothing to send right now,
// and the block was partially written,
// and sync was requested.
- // So send an empty sync packet.
+ // So send an empty sync packet if we do not end the block right now
currentPacket = createPacket(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++);
} else if (currentPacket != null) {
@@ -2036,10 +2042,21 @@ public class DFSOutputStream extends FSOutputSummer
currentPacket.syncBlock = isSync;
waitAndQueueCurrentPacket();
}
- // Restore state of stream. Record the last flush offset
- // of the last full chunk that was flushed.
- //
- bytesCurBlock -= numKept;
+ if (endBlock && bytesCurBlock > 0) {
+ // Need to end the current block, thus send an empty packet to
+ // indicate this is the end of the block and reset bytesCurBlock
+ currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
+ currentPacket.lastPacketInBlock = true;
+ currentPacket.syncBlock = shouldSyncBlock || isSync;
+ waitAndQueueCurrentPacket();
+ bytesCurBlock = 0;
+ lastFlushOffset = 0;
+ } else {
+ // Restore state of stream. Record the last flush offset
+ // of the last full chunk that was flushed.
+ bytesCurBlock -= numKept;
+ }
+
toWaitFor = lastQueuedSeqno;
} // end synchronized
@@ -2058,8 +2075,8 @@ public class DFSOutputStream extends FSOutputSummer
// namenode.
if (persistBlocks.getAndSet(false) || updateLength) {
try {
- dfsClient.namenode.fsync(src, fileId,
- dfsClient.clientName, lastBlockLength);
+ dfsClient.namenode.fsync(src, fileId, dfsClient.clientName,
+ lastBlockLength);
} catch (IOException ioe) {
DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
// If we got an error here, it might be because some other thread called
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 654e2f9..710ab18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -314,13 +314,19 @@ public class DistributedFileSystem extends FileSystem {
@Override
public FSDataOutputStream append(Path f, final int bufferSize,
final Progressable progress) throws IOException {
+ return append(f, EnumSet.of(CreateFlag.APPEND), bufferSize, progress);
+ }
+
+ public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
+ final int bufferSize, final Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FSDataOutputStream>() {
@Override
public FSDataOutputStream doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- return dfs.append(getPathName(p), bufferSize, progress, statistics);
+ throws IOException {
+ return dfs.append(getPathName(p), bufferSize, flag, progress,
+ statistics);
}
@Override
public FSDataOutputStream next(final FileSystem fs, final Path p)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java
index 2149678..745ca7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java
@@ -101,6 +101,12 @@ public class HdfsDataOutputStream extends FSDataOutputStream {
* When doing sync to DataNodes, also update the metadata (block length) in
* the NameNode.
*/
- UPDATE_LENGTH;
+ UPDATE_LENGTH,
+
+ /**
+ * Sync the data to DataNode, close the current block, and allocate a new
+ * block
+ */
+ END_BLOCK;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java
index 5ceff1b..a6de289 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java
@@ -463,15 +463,22 @@ public abstract class Event {
*/
public static class AppendEvent extends Event {
private String path;
+ private boolean newBlock;
public static class Builder {
private String path;
+ private boolean newBlock;
public Builder path(String path) {
this.path = path;
return this;
}
+ public Builder newBlock(boolean newBlock) {
+ this.newBlock = newBlock;
+ return this;
+ }
+
public AppendEvent build() {
return new AppendEvent(this);
}
@@ -480,11 +487,16 @@ public abstract class Event {
private AppendEvent(Builder b) {
super(EventType.APPEND);
this.path = b.path;
+ this.newBlock = b.newBlock;
}
public String getPath() {
return path;
}
+
+ public boolean toNewBlock() {
+ return newBlock;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index cfd1c67..cba1982 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -203,6 +203,7 @@ public interface ClientProtocol {
* Append to the end of the file.
* @param src path of the file being created.
* @param clientName name of the current client.
+ * @param flag indicates whether the data is appended to a new block.
* @return wrapper with information about the last partial block and file
* status if any
* @throws AccessControlException if permission to append file is
@@ -225,10 +226,10 @@ public interface ClientProtocol {
* @throws UnsupportedOperationException if append is not supported
*/
@AtMostOnce
- public LastBlockWithStatus append(String src, String clientName)
- throws AccessControlException, DSQuotaExceededException,
- FileNotFoundException, SafeModeException, UnresolvedLinkException,
- SnapshotAccessControlException, IOException;
+ public LastBlockWithStatus append(String src, String clientName,
+ EnumSetWritable<CreateFlag> flag) throws AccessControlException,
+ DSQuotaExceededException, FileNotFoundException, SafeModeException,
+ UnresolvedLinkException, SnapshotAccessControlException, IOException;
/**
* Set replication for an existing file.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index 8bcc1eb..dbb8b85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -18,12 +18,14 @@
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
+import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
@@ -65,6 +67,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowS
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
@@ -187,8 +191,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Update
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathResponseProto;
@@ -209,6 +211,7 @@ import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.SetXAttrResponseProto;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
+import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
@@ -412,8 +415,11 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
public AppendResponseProto append(RpcController controller,
AppendRequestProto req) throws ServiceException {
try {
+ EnumSetWritable<CreateFlag> flags = req.hasFlag() ?
+ PBHelper.convertCreateFlag(req.getFlag()) :
+ new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND));
LastBlockWithStatus result = server.append(req.getSrc(),
- req.getClientName());
+ req.getClientName(), flags);
AppendResponseProto.Builder builder = AppendResponseProto.newBuilder();
if (result.getLastBlock() != null) {
builder.setBlock(PBHelper.convert(result.getLastBlock()));
@@ -522,7 +528,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
throw new ServiceException(e);
}
}
-
+
@Override
public CompleteResponseProto complete(RpcController controller,
CompleteRequestProto req) throws ServiceException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index f3826af..1d6c0ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -28,7 +28,6 @@ import com.google.common.collect.Lists;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.CacheFlag;
@@ -85,6 +84,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowS
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto;
@@ -158,13 +158,11 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTim
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto;
@@ -318,13 +316,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
- public LastBlockWithStatus append(String src, String clientName)
- throws AccessControlException, DSQuotaExceededException,
- FileNotFoundException, SafeModeException, UnresolvedLinkException,
- IOException {
- AppendRequestProto req = AppendRequestProto.newBuilder()
- .setSrc(src)
- .setClientName(clientName)
+ public LastBlockWithStatus append(String src, String clientName,
+ EnumSetWritable<CreateFlag> flag) throws AccessControlException,
+ DSQuotaExceededException, FileNotFoundException, SafeModeException,
+ UnresolvedLinkException, IOException {
+ AppendRequestProto req = AppendRequestProto.newBuilder().setSrc(src)
+ .setClientName(clientName).setFlag(PBHelper.convertCreateFlag(flag))
.build();
try {
AppendResponseProto res = rpcProxy.append(null, req);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 7187838..e4746cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -1373,6 +1373,9 @@ public class PBHelper {
if (flag.contains(CreateFlag.LAZY_PERSIST)) {
value |= CreateFlagProto.LAZY_PERSIST.getNumber();
}
+ if (flag.contains(CreateFlag.NEW_BLOCK)) {
+ value |= CreateFlagProto.NEW_BLOCK.getNumber();
+ }
return value;
}
@@ -1393,7 +1396,11 @@ public class PBHelper {
== CreateFlagProto.LAZY_PERSIST_VALUE) {
result.add(CreateFlag.LAZY_PERSIST);
}
- return new EnumSetWritable<CreateFlag>(result);
+ if ((flag & CreateFlagProto.NEW_BLOCK_VALUE)
+ == CreateFlagProto.NEW_BLOCK_VALUE) {
+ result.add(CreateFlag.NEW_BLOCK);
+ }
+ return new EnumSetWritable<CreateFlag>(result, CreateFlag.class);
}
public static int convertCacheFlags(EnumSet<CacheFlag> flags) {
@@ -2605,11 +2612,11 @@ public class PBHelper {
.build());
break;
case EVENT_APPEND:
- InotifyProtos.AppendEventProto reopen =
+ InotifyProtos.AppendEventProto append =
InotifyProtos.AppendEventProto.parseFrom(p.getContents());
- events.add(new Event.AppendEvent.Builder()
- .path(reopen.getPath())
- .build());
+ events.add(new Event.AppendEvent.Builder().path(append.getPath())
+ .newBlock(append.hasNewBlock() && append.getNewBlock())
+ .build());
break;
case EVENT_UNLINK:
InotifyProtos.UnlinkEventProto unlink =
@@ -2710,10 +2717,10 @@ public class PBHelper {
Event.AppendEvent re2 = (Event.AppendEvent) e;
events.add(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_APPEND)
- .setContents(
- InotifyProtos.AppendEventProto.newBuilder()
- .setPath(re2.getPath()).build().toByteString()
- ).build());
+ .setContents(InotifyProtos.AppendEventProto.newBuilder()
+ .setPath(re2.getPath())
+ .setNewBlock(re2.toNewBlock()).build().toByteString())
+ .build());
break;
case UNLINK:
Event.UnlinkEvent ue = (Event.UnlinkEvent) e;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java
index f02780a..be1faec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java
@@ -176,7 +176,8 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
final int bufferSize = params.bufferSize();
DFSClient dfsClient = newDfsClient(nnId, conf);
- OutputStream out = dfsClient.append(path, bufferSize, null, null);
+ OutputStream out = dfsClient.append(path, bufferSize,
+ EnumSet.of(CreateFlag.APPEND), null, null);
DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1, OK);
resp.headers().set(CONTENT_LENGTH, 0);
ctx.pipeline().replace(this, HdfsWriter.class.getSimpleName(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
index 43d3b20..ecfd2e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
@@ -19,11 +19,10 @@ package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.base.Preconditions;
import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import java.io.IOException;
import java.util.Arrays;
@@ -33,201 +32,171 @@ import java.util.Set;
import static org.apache.hadoop.util.Time.now;
class FSDirConcatOp {
- static HdfsFileStatus concat(
- FSDirectory fsd, String target, String[] srcs,
+
+ static HdfsFileStatus concat(FSDirectory fsd, String target, String[] srcs,
boolean logRetryCache) throws IOException { // validates target and sources, moves the source blocks into target, logs the op, returns the target's audit file info
Preconditions.checkArgument(!target.isEmpty(), "Target file name is empty");
Preconditions.checkArgument(srcs != null && srcs.length > 0,
"No sources given");
assert srcs != null;
-
- FSDirectory.LOG.debug("concat {} to {}", Arrays.toString(srcs), target);
- // We require all files be in the same directory
- String trgParent =
- target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
- for (String s : srcs) {
- String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
- if (!srcParent.equals(trgParent)) {
- throw new IllegalArgumentException(
- "Sources and target are not in the same directory");
- }
+ if (FSDirectory.LOG.isDebugEnabled()) {
+ FSDirectory.LOG.debug("concat {} to {}", Arrays.toString(srcs), target);
}
- final INodesInPath trgIip = fsd.getINodesInPath4Write(target);
+ final INodesInPath targetIIP = fsd.getINodesInPath4Write(target);
// write permission for the target
+ FSPermissionChecker pc = null; // stays null when permission checking is disabled; verifySrcFiles then skips its per-source access checks
if (fsd.isPermissionEnabled()) {
- FSPermissionChecker pc = fsd.getPermissionChecker();
- fsd.checkPathAccess(pc, trgIip, FsAction.WRITE);
-
- // and srcs
- for(String aSrc: srcs) {
- final INodesInPath srcIip = fsd.getINodesInPath4Write(aSrc);
- fsd.checkPathAccess(pc, srcIip, FsAction.READ); // read the file
- fsd.checkParentAccess(pc, srcIip, FsAction.WRITE); // for delete
- }
+ pc = fsd.getPermissionChecker();
+ fsd.checkPathAccess(pc, targetIIP, FsAction.WRITE);
}
- // to make sure no two files are the same
- Set<INode> si = new HashSet<INode>();
+ // check the target
+ verifyTargetFile(fsd, target, targetIIP);
+ // check the srcs
+ INodeFile[] srcFiles = verifySrcFiles(fsd, srcs, targetIIP, pc);
- // we put the following prerequisite for the operation
- // replication and blocks sizes should be the same for ALL the blocks
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " +
+ Arrays.toString(srcs) + " to " + target);
+ }
+
+ long timestamp = now(); // one timestamp shared by the in-memory update and the edit-log record
+ fsd.writeLock();
+ try {
+ unprotectedConcat(fsd, targetIIP, srcFiles, timestamp);
+ } finally {
+ fsd.writeUnlock();
+ }
+ fsd.getEditLog().logConcat(target, srcs, timestamp, logRetryCache); // logged after the directory write lock has been released
+ return fsd.getAuditFileInfo(targetIIP);
+ }
+ private static void verifyTargetFile(FSDirectory fsd, final String target,
+ final INodesInPath targetIIP) throws IOException { // throws if the target cannot take part in a concat; returns nothing on success
// check the target: reject one inside an encryption zone or one that is still under construction
- if (fsd.getEZForPath(trgIip) != null) {
+ if (fsd.getEZForPath(targetIIP) != null) {
throw new HadoopIllegalArgumentException(
"concat can not be called for files in an encryption zone.");
}
- final INodeFile trgInode = INodeFile.valueOf(trgIip.getLastINode(), target);
- if(trgInode.isUnderConstruction()) {
+ final INodeFile targetINode = INodeFile.valueOf(targetIIP.getLastINode(), // presumably throws when the last inode is missing or not a file - confirm against INodeFile.valueOf
+ target);
+ if(targetINode.isUnderConstruction()) {
throw new HadoopIllegalArgumentException("concat: target file "
+ target + " is under construction");
}
- // per design target shouldn't be empty and all the blocks same size
- if(trgInode.numBlocks() == 0) {
- throw new HadoopIllegalArgumentException("concat: target file "
- + target + " is empty");
- }
- if (trgInode.isWithSnapshot()) {
- throw new HadoopIllegalArgumentException("concat: target file "
- + target + " is in a snapshot");
- }
-
- long blockSize = trgInode.getPreferredBlockSize();
-
- // check the end block to be full
- final BlockInfo last = trgInode.getLastBlock();
- if(blockSize != last.getNumBytes()) {
- throw new HadoopIllegalArgumentException("The last block in " + target
- + " is not full; last block size = " + last.getNumBytes()
- + " but file block size = " + blockSize);
- }
-
- si.add(trgInode);
- final short repl = trgInode.getFileReplication();
+ }
+ private static INodeFile[] verifySrcFiles(FSDirectory fsd, String[] srcs,
+ INodesInPath targetIIP, FSPermissionChecker pc) throws IOException { // validates every source and returns the source files in the order given by srcs; a null pc skips the permission checks
+ // to make sure no two files are the same; insertion-ordered so the returned array (and thus the concatenated blocks) follows the order of srcs
+ Set<INodeFile> si = new java.util.LinkedHashSet<>();
+ final INodeFile targetINode = targetIIP.getLastINode().asFile();
+ final INodeDirectory targetParent = targetINode.getParent();
// now check the srcs
- boolean endSrc = false; // final src file doesn't have to have full end block
- for(int i=0; i< srcs.length; i++) {
- String src = srcs[i];
- if(i== srcs.length-1)
- endSrc=true;
-
- final INodeFile srcInode = INodeFile.valueOf(fsd.getINode4Write(src), src);
- if(src.isEmpty()
- || srcInode.isUnderConstruction()
- || srcInode.numBlocks() == 0) {
- throw new HadoopIllegalArgumentException("concat: source file " + src
- + " is invalid or empty or underConstruction");
+ for(String src : srcs) {
+ final INodesInPath iip = fsd.getINodesInPath4Write(src);
+ // permission check for srcs
+ if (pc != null) {
+ fsd.checkPathAccess(pc, iip, FsAction.READ); // read the file
+ fsd.checkParentAccess(pc, iip, FsAction.WRITE); // for delete
}
-
- // check replication and blocks size
- if(repl != srcInode.getBlockReplication()) {
- throw new HadoopIllegalArgumentException("concat: the source file "
- + src + " and the target file " + target
- + " should have the same replication: source replication is "
- + srcInode.getBlockReplication()
- + " but target replication is " + repl);
+ final INode srcINode = iip.getLastINode();
+ final INodeFile srcINodeFile = INodeFile.valueOf(srcINode, src);
+ // make sure the src file and the target file are in the same dir
+ if (srcINodeFile.getParent() != targetParent) {
+ throw new HadoopIllegalArgumentException("Source file " + src
+ + " is not in the same directory with the target "
+ + targetIIP.getPath());
}
-
- //boolean endBlock=false;
- // verify that all the blocks are of the same length as target
- // should be enough to check the end blocks
- final BlockInfo[] srcBlocks = srcInode.getBlocks();
- int idx = srcBlocks.length-1;
- if(endSrc)
- idx = srcBlocks.length-2; // end block of endSrc is OK not to be full
- if(idx >= 0 && srcBlocks[idx].getNumBytes() != blockSize) {
- throw new HadoopIllegalArgumentException("concat: the source file "
- + src + " and the target file " + target
- + " should have the same blocks sizes: target block size is "
- + blockSize + " but the size of source block " + idx + " is "
- + srcBlocks[idx].getNumBytes());
+ // make sure all the source files are not in snapshot
+ if (srcINode.isInLatestSnapshot(iip.getLatestSnapshotId())) {
+ throw new SnapshotException("Concat: the source file " + src
+ + " is in snapshot");
}
-
- si.add(srcInode);
+ // check if the file has other references.
+ if (srcINode.isReference() && ((INodeReference.WithCount)
+ srcINode.asReference().getReferredINode()).getReferenceCount() > 1) {
+ throw new SnapshotException("Concat: the source file " + src
+ + " is referred by some other reference in some snapshot.");
+ }
+ if (srcINode == targetINode) {
+ throw new HadoopIllegalArgumentException("concat: the src file " + src
+ + " is the same with the target file " + targetIIP.getPath());
+ }
+ if(srcINodeFile.isUnderConstruction() || srcINodeFile.numBlocks() == 0) {
+ throw new HadoopIllegalArgumentException("concat: source file " + src
+ + " is invalid or empty or underConstruction");
+ }
+ si.add(srcINodeFile);
}
// make sure no two files are the same
- if(si.size() < srcs.length+1) { // trg + srcs
+ if(si.size() < srcs.length) {
// it means at least two files are the same
throw new HadoopIllegalArgumentException(
"concat: at least two of the source files are the same");
}
+ return si.toArray(new INodeFile[si.size()]); // set iteration order equals the order of srcs, matching the order used when the edit log is replayed
+ }
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " +
- Arrays.toString(srcs) + " to " + target);
+ private static long computeQuotaDelta(INodeFile target, INodeFile[] srcList) { // disk-space change caused by sources whose replication differs from the target's
+ long delta = 0;
+ short targetRepl = target.getBlockReplication();
+ for (INodeFile src : srcList) {
+ if (targetRepl != src.getBlockReplication()) {
+ delta += src.computeFileSize() * // file size times the replication difference; negative when the source has the higher replication
+ (targetRepl - src.getBlockReplication());
+ }
}
+ return delta; // 0 when every source already has the target's replication
+ }
- long timestamp = now();
- fsd.writeLock();
- try {
- unprotectedConcat(fsd, target, srcs, timestamp);
- } finally {
- fsd.writeUnlock();
+ private static void verifyQuota(FSDirectory fsd, INodesInPath targetIIP,
+ long delta) throws QuotaExceededException { // checks delta against the quotas along targetIIP unless quota checks are currently skipped
+ if (!fsd.getFSNamesystem().isImageLoaded() || fsd.shouldSkipQuotaChecks()) {
+ // Do not check quota if editlog is still being processed (image not loaded yet) or quota checks are to be skipped
+ return;
}
- fsd.getEditLog().logConcat(target, srcs, timestamp, logRetryCache);
- return fsd.getAuditFileInfo(trgIip);
+ FSDirectory.verifyQuota(targetIIP, targetIIP.length() - 1, 0, delta, null); // presumably 0 is the namespace delta and delta the disk-space delta - confirm against FSDirectory.verifyQuota
}
/**
* Concat all the blocks from srcs to trg and delete the srcs files
* @param fsd FSDirectory
- * @param target target file to move the blocks to
- * @param srcs list of file to move the blocks from
+ * @param targetIIP resolved path whose last inode is the target file
+ * @param srcList source files whose blocks are moved to the target
+ * @param timestamp modification time set on the target and its parent directory
+ * @throws IOException for example when the quota check rejects the space delta
*/
- static void unprotectedConcat(
- FSDirectory fsd, String target, String[] srcs, long timestamp)
- throws IOException {
+ static void unprotectedConcat(FSDirectory fsd, INodesInPath targetIIP,
+ INodeFile[] srcList, long timestamp) throws IOException {
assert fsd.hasWriteLock();
if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
+ NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "
+ + targetIIP.getPath());
}
- // do the move
-
- final INodesInPath trgIIP = fsd.getINodesInPath4Write(target, true);
- final INodeFile trgInode = trgIIP.getLastINode().asFile();
- INodeDirectory trgParent = trgIIP.getINode(-2).asDirectory();
- final int trgLatestSnapshot = trgIIP.getLatestSnapshotId();
-
- final INodeFile [] allSrcInodes = new INodeFile[srcs.length];
- for(int i = 0; i < srcs.length; i++) {
- final INodesInPath iip = fsd.getINodesInPath4Write(srcs[i]);
- final int latest = iip.getLatestSnapshotId();
- final INode inode = iip.getLastINode();
-
- // check if the file in the latest snapshot
- if (inode.isInLatestSnapshot(latest)) {
- throw new SnapshotException("Concat: the source file " + srcs[i]
- + " is in snapshot " + latest);
- }
- // check if the file has other references.
- if (inode.isReference() && ((INodeReference.WithCount)
- inode.asReference().getReferredINode()).getReferenceCount() > 1) {
- throw new SnapshotException("Concat: the source file " + srcs[i]
- + " is referred by some other reference in some snapshot.");
- }
+ final INodeFile trgInode = targetIIP.getLastINode().asFile();
+ long delta = computeQuotaDelta(trgInode, srcList); // disk-space change from differing replication factors
+ verifyQuota(fsd, targetIIP, delta); // runs before any inode is modified
- allSrcInodes[i] = inode.asFile();
- }
- trgInode.concatBlocks(allSrcInodes);
+ // the target file can be included in a snapshot
+ trgInode.recordModification(targetIIP.getLatestSnapshotId());
+ INodeDirectory trgParent = targetIIP.getINode(-2).asDirectory();
+ trgInode.concatBlocks(srcList);
// since we are in the same dir - we can use same parent to remove files
int count = 0;
- for(INodeFile nodeToRemove: allSrcInodes) {
- if(nodeToRemove == null) continue;
-
- nodeToRemove.setBlocks(null);
- trgParent.removeChild(nodeToRemove, trgLatestSnapshot);
- fsd.getINodeMap().remove(nodeToRemove);
- count++;
+ for (INodeFile nodeToRemove : srcList) {
+ if(nodeToRemove != null) {
+ nodeToRemove.setBlocks(null); // detach the blocks from the source before it is removed
+ nodeToRemove.getParent().removeChild(nodeToRemove);
+ fsd.getINodeMap().remove(nodeToRemove);
+ count++;
+ }
}
- trgInode.setModificationTime(timestamp, trgLatestSnapshot);
- trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
+ trgInode.setModificationTime(timestamp, targetIIP.getLatestSnapshotId());
+ trgParent.updateModificationTime(timestamp, targetIIP.getLatestSnapshotId());
// update quota on the parent directory ('count' files removed, 'delta' disk space from replication differences)
- FSDirectory.unprotectedUpdateCount(trgIIP, trgIIP.length() - 1, -count, 0);
+ FSDirectory.unprotectedUpdateCount(targetIIP, targetIIP.length() - 1,
+ -count, delta);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index c171448..c012847 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -452,7 +452,7 @@ public class FSDirectory implements Closeable {
Preconditions.checkState(fileINode.isUnderConstruction());
// check quota limits and updated space consumed
- updateCount(inodesInPath, 0, fileINode.getBlockDiskspace(), true);
+ updateCount(inodesInPath, 0, fileINode.getPreferredBlockDiskspace(), true);
// associate new last block for the file
BlockInfoUnderConstruction blockInfo =
@@ -508,7 +508,7 @@ public class FSDirectory implements Closeable {
}
// update space consumed
- updateCount(iip, 0, -fileNode.getBlockDiskspace(), true);
+ updateCount(iip, 0, -fileNode.getPreferredBlockDiskspace(), true);
return true;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 144be37..3c7eae4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -34,10 +34,10 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -52,9 +52,11 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddBlockOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AppendOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CloseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ConcatDeleteOp;
@@ -76,6 +78,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetAclOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
@@ -90,7 +93,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -702,7 +704,19 @@ public class FSEditLog implements LogsPurgeable {
op.setRpcCallId(Server.getCallId());
}
}
-
+
+ public void logAppendFile(String path, INodeFile file, boolean newBlock,
+ boolean toLogRpcIds) { // writes an AppendOp for the file at path to the edit log
+ FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
+ assert uc != null; // the client name and machine below are read from the under-construction feature
+ AppendOp op = AppendOp.getInstance(cache.get()).setPath(path)
+ .setClientName(uc.getClientName())
+ .setClientMachine(uc.getClientMachine())
+ .setNewBlock(newBlock); // newBlock is stored in the op; the edit-log loader passes it on to prepareFileForAppend
+ logRpcIds(op, toLogRpcIds);
+ logEdit(op);
+ }
+
/**
* Add open lease record to edit log.
* Records the block locations of the last block.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 322e18c..7cb6486 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -41,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AppendOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.BlockListUpdatingOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ClearNSQuotaOp;
@@ -68,6 +69,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
@@ -83,7 +85,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
@@ -325,22 +326,22 @@ public class FSEditLogLoader {
LOG.trace("replaying edit log: " + op);
}
final boolean toAddRetryCache = fsNamesys.hasRetryCache() && op.hasRpcIds();
-
+
switch (op.opCode) {
case OP_ADD: {
AddCloseOp addCloseOp = (AddCloseOp)op;
final String path =
renameReservedPathsOnUpgrade(addCloseOp.path, logVersion);
- if (LOG.isDebugEnabled()) {
- LOG.debug(op.opCode + ": " + path +
+ if (FSNamesystem.LOG.isDebugEnabled()) {
+ FSNamesystem.LOG.debug(op.opCode + ": " + path +
" numblocks : " + addCloseOp.blocks.length +
" clientHolder " + addCloseOp.clientName +
" clientMachine " + addCloseOp.clientMachine);
}
- // There three cases here:
+ // There are 3 cases here:
// 1. OP_ADD to create a new file
// 2. OP_ADD to update file blocks
- // 3. OP_ADD to open file for append
+ // 3. OP_ADD to open file for append (old append)
// See if the file already exists (persistBlocks call)
INodesInPath iip = fsDir.getINodesInPath(path, true);
@@ -383,19 +384,17 @@ public class FSEditLogLoader {
fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
addCloseOp.rpcCallId, stat);
}
- } else { // This is OP_ADD on an existing file
+ } else { // This is OP_ADD on an existing file (old append)
if (!oldFile.isUnderConstruction()) {
// This is case 3: a call to append() on an already-closed file.
if (FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug("Reopening an already-closed file " +
"for append");
}
- // Note we do not replace the INodeFile when converting it to
- // under-construction
- LocatedBlock lb = fsNamesys.prepareFileForWrite(path, iip,
- addCloseOp.clientName, addCloseOp.clientMachine, false, false);
-
- // add the op into retry cache is necessary
+ LocatedBlock lb = fsNamesys.prepareFileForAppend(path, iip,
+ addCloseOp.clientName, addCloseOp.clientMachine, false, false,
+ false);
+ // add the op into retry cache if necessary
if (toAddRetryCache) {
HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus(
fsNamesys.dir,
@@ -453,6 +452,34 @@ public class FSEditLogLoader {
}
break;
}
+ case OP_APPEND: {
+ AppendOp appendOp = (AppendOp) op;
+ final String path = renameReservedPathsOnUpgrade(appendOp.path,
+ logVersion);
+ if (FSNamesystem.LOG.isDebugEnabled()) {
+ FSNamesystem.LOG.debug(op.opCode + ": " + path +
+ " clientName " + appendOp.clientName +
+ " clientMachine " + appendOp.clientMachine +
+ " newBlock " + appendOp.newBlock);
+ }
+ INodesInPath iip = fsDir.getINodesInPath4Write(path);
+ INodeFile file = INodeFile.valueOf(iip.getLastINode(), path);
+ if (!file.isUnderConstruction()) {
+ LocatedBlock lb = fsNamesys.prepareFileForAppend(path, iip,
+ appendOp.clientName, appendOp.clientMachine, appendOp.newBlock,
+ false, false);
+ // add the op into retry cache if necessary
+ if (toAddRetryCache) {
+ HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus(
+ fsNamesys.dir, HdfsFileStatus.EMPTY_NAME, file,
+ BlockStoragePolicySuite.ID_UNSPECIFIED,
+ Snapshot.CURRENT_STATE_ID, false, iip);
+ fsNamesys.addCacheEntryWithPayload(appendOp.rpcClientId,
+ appendOp.rpcCallId, new LastBlockWithStatus(lb, stat));
+ }
+ }
+ break;
+ }
case OP_UPDATE_BLOCKS: {
UpdateBlocksOp updateOp = (UpdateBlocksOp)op;
final String path =
@@ -499,7 +526,14 @@ public class FSEditLogLoader {
srcs[i] =
renameReservedPathsOnUpgrade(concatDeleteOp.srcs[i], logVersion);
}
- FSDirConcatOp.unprotectedConcat(fsDir, trg, srcs, concatDeleteOp.timestamp);
+ INodesInPath targetIIP = fsDir.getINodesInPath4Write(trg);
+ INodeFile[] srcFiles = new INodeFile[srcs.length];
+ for (int i = 0; i < srcs.length; i++) {
+ INodesInPath srcIIP = fsDir.getINodesInPath4Write(srcs[i]);
+ srcFiles[i] = srcIIP.getLastINode().asFile();
+ }
+ FSDirConcatOp.unprotectedConcat(fsDir, targetIIP, srcFiles,
+ concatDeleteOp.timestamp);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(concatDeleteOp.rpcClientId,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index 9424156..1629d80 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_APPEND;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_BLOCK;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_POOL;
@@ -207,6 +208,7 @@ public abstract class FSEditLogOp {
inst.put(OP_SET_XATTR, new SetXAttrOp());
inst.put(OP_REMOVE_XATTR, new RemoveXAttrOp());
inst.put(OP_SET_STORAGE_POLICY, new SetStoragePolicyOp());
+ inst.put(OP_APPEND, new AppendOp());
}
public FSEditLogOp get(FSEditLogOpCodes opcode) {
@@ -428,7 +430,7 @@ public abstract class FSEditLogOp {
private AddCloseOp(FSEditLogOpCodes opCode) {
super(opCode);
storagePolicyId = BlockStoragePolicySuite.ID_UNSPECIFIED;
- assert(opCode == OP_ADD || opCode == OP_CLOSE);
+ assert(opCode == OP_ADD || opCode == OP_CLOSE || opCode == OP_APPEND);
}
@Override
@@ -770,7 +772,7 @@ public abstract class FSEditLogOp {
}
static AddOp getInstance(OpInstanceCache cache) {
- return (AddOp)cache.get(OP_ADD);
+ return (AddOp) cache.get(OP_ADD);
}
@Override
@@ -788,7 +790,7 @@ public abstract class FSEditLogOp {
}
/**
- * Although {@link ClientProtocol#appendFile} may also log a close op, we do
+ * Although {@link ClientProtocol#append} may also log a close op, we do
* not need to record the rpc ids here since a successful appendFile op will
* finally log an AddOp.
*/
@@ -814,6 +816,97 @@ public abstract class FSEditLogOp {
return builder.toString();
}
}
+
+ static class AppendOp extends FSEditLogOp {
+ String path;
+ String clientName;
+ String clientMachine;
+ boolean newBlock;
+
+ private AppendOp() {
+ super(OP_APPEND);
+ }
+
+ static AppendOp getInstance(OpInstanceCache cache) {
+ return (AppendOp) cache.get(OP_APPEND);
+ }
+
+ AppendOp setPath(String path) {
+ this.path = path;
+ return this;
+ }
+
+ AppendOp setClientName(String clientName) {
+ this.clientName = clientName;
+ return this;
+ }
+
+ AppendOp setClientMachine(String clientMachine) {
+ this.clientMachine = clientMachine;
+ return this;
+ }
+
+ AppendOp setNewBlock(boolean newBlock) {
+ this.newBlock = newBlock;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("AppendOp ");
+ builder.append("[path=").append(path);
+ builder.append(", clientName=").append(clientName);
+ builder.append(", clientMachine=").append(clientMachine);
+ builder.append(", newBlock=").append(newBlock).append("]");
+ return builder.toString();
+ }
+
+ @Override
+ void resetSubFields() {
+ this.path = null;
+ this.clientName = null;
+ this.clientMachine = null;
+ this.newBlock = false;
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion) throws IOException {
+ this.path = FSImageSerialization.readString(in);
+ this.clientName = FSImageSerialization.readString(in);
+ this.clientMachine = FSImageSerialization.readString(in);
+ this.newBlock = FSImageSerialization.readBoolean(in);
+ readRpcIds(in, logVersion);
+ }
+
+ @Override
+ public void writeFields(DataOutputStream out) throws IOException {
+ FSImageSerialization.writeString(path, out);
+ FSImageSerialization.writeString(clientName, out);
+ FSImageSerialization.writeString(clientMachine, out);
+ FSImageSerialization.writeBoolean(newBlock, out);
+ writeRpcIds(rpcClientId, rpcCallId, out);
+ }
+
+ @Override
+ protected void toXml(ContentHandler contentHandler) throws SAXException {
+ XMLUtils.addSaxString(contentHandler, "PATH", path);
+ XMLUtils.addSaxString(contentHandler, "CLIENT_NAME", clientName);
+ XMLUtils.addSaxString(contentHandler, "CLIENT_MACHINE", clientMachine);
+ XMLUtils.addSaxString(contentHandler, "NEWBLOCK",
+ Boolean.toString(newBlock));
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
+ }
+
+ @Override
+ void fromXml(Stanza st) throws InvalidXmlException {
+ this.path = st.getValue("PATH");
+ this.clientName = st.getValue("CLIENT_NAME");
+ this.clientMachine = st.getValue("CLIENT_MACHINE");
+ this.newBlock = Boolean.parseBoolean(st.getValue("NEWBLOCK"));
+ readRpcIdsFromXml(st);
+ }
+ }
static class AddBlockOp extends FSEditLogOp {
private String path;
@@ -1643,7 +1736,7 @@ public abstract class FSEditLogOp {
* {@link ClientProtocol#updateBlockForPipeline},
* {@link ClientProtocol#recoverLease}, {@link ClientProtocol#addBlock}) or
* already bound with other editlog op which records rpc ids (
- * {@link ClientProtocol#startFile}). Thus no need to record rpc ids here.
+ * {@link ClientProtocol#create}). Thus no need to record rpc ids here.
*/
static class SetGenstampV1Op extends FSEditLogOp {
long genStampV1;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
index 468e048..6cd1617 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
@@ -74,6 +74,7 @@ public enum FSEditLogOpCodes {
OP_REMOVE_XATTR ((byte) 44),
OP_SET_STORAGE_POLICY ((byte) 45),
OP_TRUNCATE ((byte) 46),
+ OP_APPEND ((byte) 47),
// Note that the current range of the valid OP code is 0~127
OP_INVALID ((byte) -1);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index fae1641..ebdec1b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -250,6 +250,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RetriableException;
@@ -2586,12 +2587,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* <p>
*
* For description of parameters and exceptions thrown see
- * {@link ClientProtocol#append(String, String)}
- *
+ * {@link ClientProtocol#append(String, String, EnumSetWritable)}
+ *
* @return the last block locations if the block is partial or null otherwise
*/
private LocatedBlock appendFileInternal(FSPermissionChecker pc,
- INodesInPath iip, String holder, String clientMachine,
+ INodesInPath iip, String holder, String clientMachine, boolean newBlock,
boolean logRetryCache) throws IOException {
assert hasWriteLock();
// Verify that the destination does not exist as a directory already.
@@ -2613,7 +2614,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
INodeFile myFile = INodeFile.valueOf(inode, src, true);
final BlockStoragePolicy lpPolicy =
blockManager.getStoragePolicy("LAZY_PERSIST");
-
if (lpPolicy != null &&
lpPolicy.getId() == myFile.getStoragePolicyID()) {
throw new UnsupportedOperationException(
@@ -2629,8 +2629,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throw new IOException("append: lastBlock=" + lastBlock +
" of src=" + src + " is not sufficiently replicated yet.");
}
- return prepareFileForWrite(src, iip, holder, clientMachine, true,
- logRetryCache);
+ return prepareFileForAppend(src, iip, holder, clientMachine, newBlock,
+ true, logRetryCache);
} catch (IOException ie) {
NameNode.stateChangeLog.warn("DIR* NameSystem.append: " +ie.getMessage());
throw ie;
@@ -2644,6 +2644,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @param src path to the file
* @param leaseHolder identifier of the lease holder on this file
* @param clientMachine identifier of the client machine
+ * @param newBlock true to append to a new block instead of the last block
* @param writeToEditLog whether to persist this change to the edit log
* @param logRetryCache whether to record RPC ids in editlog for retry cache
* rebuilding
@@ -2651,26 +2652,34 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @throws UnresolvedLinkException
* @throws IOException
*/
- LocatedBlock prepareFileForWrite(String src, INodesInPath iip,
- String leaseHolder, String clientMachine, boolean writeToEditLog,
- boolean logRetryCache) throws IOException {
+ LocatedBlock prepareFileForAppend(String src, INodesInPath iip,
+ String leaseHolder, String clientMachine, boolean newBlock,
+ boolean writeToEditLog, boolean logRetryCache) throws IOException {
final INodeFile file = iip.getLastINode().asFile();
file.recordModification(iip.getLatestSnapshotId());
file.toUnderConstruction(leaseHolder, clientMachine);
leaseManager.addLease(
file.getFileUnderConstructionFeature().getClientName(), src);
-
- LocatedBlock ret =
- blockManager.convertLastBlockToUnderConstruction(file, 0);
- if (ret != null) {
- // update the quota: use the preferred block size for UC block
- final long diff = file.getPreferredBlockSize() - ret.getBlockSize();
- dir.updateSpaceConsumed(iip, 0, diff * file.getBlockReplication());
+
+ LocatedBlock ret = null;
+ if (!newBlock) {
+ ret = blockManager.convertLastBlockToUnderConstruction(file, 0);
+ if (ret != null) {
+ // update the quota: use the preferred block size for UC block
+ final long diff = file.getPreferredBlockSize() - ret.getBlockSize();
+ dir.updateSpaceConsumed(iip, 0, diff * file.getBlockReplication());
+ }
+ } else {
+ BlockInfo lastBlock = file.getLastBlock();
+ if (lastBlock != null) {
+ ExtendedBlock blk = new ExtendedBlock(this.getBlockPoolId(), lastBlock);
+ ret = new LocatedBlock(blk, new DatanodeInfo[0]);
+ }
}
if (writeToEditLog) {
- getEditLog().logOpenFile(src, file, false, logRetryCache);
+ getEditLog().logAppendFile(src, file, newBlock, logRetryCache);
}
return ret;
}
@@ -2805,11 +2814,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/**
* Append to an existing file in the namespace.
*/
- LastBlockWithStatus appendFile(
- String src, String holder, String clientMachine, boolean logRetryCache)
+ LastBlockWithStatus appendFile(String src, String holder,
+ String clientMachine, EnumSet<CreateFlag> flag, boolean logRetryCache)
throws IOException {
try {
- return appendFileInt(src, holder, clientMachine, logRetryCache);
+ return appendFileInt(src, holder, clientMachine,
+ flag.contains(CreateFlag.NEW_BLOCK), logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "append", src);
throw e;
@@ -2817,7 +2827,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
private LastBlockWithStatus appendFileInt(final String srcArg, String holder,
- String clientMachine, boolean logRetryCache) throws IOException {
+ String clientMachine, boolean newBlock, boolean logRetryCache)
+ throws IOException {
String src = srcArg;
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: src=" + src
@@ -2836,7 +2847,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
checkNameNodeSafeMode("Cannot append to file" + src);
src = dir.resolvePath(pc, src, pathComponents);
final INodesInPath iip = dir.getINodesInPath4Write(src);
- lb = appendFileInternal(pc, iip, holder, clientMachine, logRetryCache);
+ lb = appendFileInternal(pc, iip, holder, clientMachine, newBlock,
+ logRetryCache);
stat = FSDirStatAndListingOp.getFileInfo(dir, src, false,
FSDirectory.isReservedRawName(srcArg), true);
} catch (StandbyException se) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index e871bdc..cbcdac9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -412,7 +412,7 @@ public class INodeFile extends INodeWithAdditionalFields
}
/** @return the diskspace required for a full block. */
- final long getBlockDiskspace() {
+ final long getPreferredBlockDiskspace() {
return getPreferredBlockSize() * getBlockReplication();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
index f265340..5345b46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
@@ -65,6 +65,10 @@ public class InotifyFSEditLogOpTranslator {
FSEditLogOp.CloseOp cOp = (FSEditLogOp.CloseOp) op;
return new EventBatch(op.txid, new Event[] {
new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) });
+ case OP_APPEND:
+ FSEditLogOp.AppendOp appendOp = (FSEditLogOp.AppendOp) op;
+ return new EventBatch(op.txid, new Event[] {new Event.AppendEvent
+ .Builder().path(appendOp.path).newBlock(appendOp.newBlock).build()});
case OP_SET_REPLICATION:
FSEditLogOp.SetReplicationOp setRepOp = (FSEditLogOp.SetReplicationOp) op;
return new EventBatch(op.txid,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
index d742c6d..848fa33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
@@ -70,7 +70,8 @@ public class NameNodeLayoutVersion {
"creating file with overwrite"),
XATTRS_NAMESPACE_EXT(-59, "Increase number of xattr namespaces"),
BLOCK_STORAGE_POLICY(-60, "Block Storage policy"),
- TRUNCATE(-61, "Truncate");
+ TRUNCATE(-61, "Truncate"),
+ APPEND_NEW_BLOCK(-62, "Support appending to new block");
private final FeatureInfo info;