You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/07/26 22:46:59 UTC
svn commit: r1151238 - in /hadoop/common/trunk/hdfs: ./
src/java/org/apache/hadoop/hdfs/server/namenode/
Author: todd
Date: Tue Jul 26 20:46:58 2011
New Revision: 1151238
URL: http://svn.apache.org/viewvc?rev=1151238&view=rev
Log:
HDFS-2149. Move EditLogOp serialization formats into FsEditLogOp implementations. Contributed by Ivan Kelly.
Modified:
hadoop/common/trunk/hdfs/CHANGES.txt
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1151238&r1=1151237&r2=1151238&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Tue Jul 26 20:46:58 2011
@@ -601,6 +601,9 @@ Trunk (unreleased changes)
HDFS-2180. Refactor NameNode HTTP server into new class. (todd)
+ HDFS-2149. Move EditLogOp serialization formats into FsEditLogOp
+ implementations. (Ivan Kelly via todd)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1151238&r1=1151237&r2=1151238&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Tue Jul 26 20:46:58 2011
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.DataOutputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -45,25 +46,18 @@ class EditLogBackupOutputStream extends
private NamenodeProtocol backupNode; // RPC proxy to backup node
private NamenodeRegistration bnRegistration; // backup node registration
private NamenodeRegistration nnRegistration; // active node registration
- private ArrayList<JournalRecord> bufCurrent; // current buffer for writing
- private ArrayList<JournalRecord> bufReady; // buffer ready for flushing
+ private ArrayList<BufferedOp> bufCurrent; // current buffer for writing
+ private ArrayList<BufferedOp> bufReady; // buffer ready for flushing
private DataOutputBuffer out; // serialized output sent to backup node
- static class JournalRecord {
- byte op;
- Writable[] args;
-
- JournalRecord(byte op, Writable ... writables) {
- this.op = op;
- this.args = writables;
- }
-
- void write(DataOutputStream out) throws IOException {
- out.write(op);
- if(args == null)
- return;
- for(Writable w : args)
- w.write(out);
+
+ private static class BufferedOp {
+ public final FSEditLogOpCodes opCode;
+ public final byte[] bytes;
+
+ public BufferedOp(FSEditLogOpCodes opCode, byte[] bytes) {
+ this.opCode = opCode;
+ this.bytes = bytes;
}
}
@@ -84,8 +78,8 @@ class EditLogBackupOutputStream extends
Storage.LOG.error("Error connecting to: " + bnAddress, e);
throw e;
}
- this.bufCurrent = new ArrayList<JournalRecord>();
- this.bufReady = new ArrayList<JournalRecord>();
+ this.bufCurrent = new ArrayList<BufferedOp>();
+ this.bufReady = new ArrayList<BufferedOp>();
this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
}
@@ -100,13 +94,18 @@ class EditLogBackupOutputStream extends
}
@Override // EditLogOutputStream
- public void write(int b) throws IOException {
- throw new IOException("Not implemented");
+ void write(FSEditLogOp op) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream s = new DataOutputStream(baos);
+ FSEditLogOp.Writer w = new FSEditLogOp.Writer(s);
+ w.writeOp(op);
+
+ bufCurrent.add(new BufferedOp(op.opCode, baos.toByteArray()));
}
- @Override // EditLogOutputStream
- void write(byte op, Writable ... writables) throws IOException {
- bufCurrent.add(new JournalRecord(op, writables));
+ @Override
+ void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+ throw new IOException("Not supported");
}
/**
@@ -134,7 +133,7 @@ class EditLogBackupOutputStream extends
@Override // EditLogOutputStream
void setReadyToFlush() throws IOException {
assert bufReady.size() == 0 : "previous data is not flushed yet";
- ArrayList<JournalRecord> tmp = bufReady;
+ ArrayList<BufferedOp> tmp = bufReady;
bufReady = bufCurrent;
bufCurrent = tmp;
}
@@ -144,12 +143,13 @@ class EditLogBackupOutputStream extends
assert out.size() == 0 : "Output buffer is not empty";
int bufReadySize = bufReady.size();
for(int idx = 0; idx < bufReadySize; idx++) {
- JournalRecord jRec = null;
+ BufferedOp jRec = null;
for(; idx < bufReadySize; idx++) {
jRec = bufReady.get(idx);
- if(jRec.op >= FSEditLogOpCodes.OP_JSPOOL_START.getOpCode())
+ if(jRec.opCode.getOpCode()
+ >= FSEditLogOpCodes.OP_JSPOOL_START.getOpCode())
break; // special operation should be sent in a separate call to BN
- jRec.write(out);
+ out.write(jRec.bytes, 0, jRec.bytes.length);
}
if(out.size() > 0)
send(NamenodeProtocol.JA_JOURNAL);
@@ -157,8 +157,8 @@ class EditLogBackupOutputStream extends
break;
// operation like start journal spool or increment checkpoint time
// is a separate call to BN
- jRec.write(out);
- send(jRec.op);
+ out.write(jRec.bytes, 0, jRec.bytes.length);
+ send(jRec.opCode.getOpCode());
}
bufReady.clear(); // erase all data in the buffer
out.reset(); // reset buffer to the start position
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1151238&r1=1151237&r2=1151238&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Tue Jul 26 20:46:58 2011
@@ -45,6 +45,7 @@ class EditLogFileOutputStream extends Ed
private FileChannel fc; // channel of the file stream for sync
private DataOutputBuffer bufCurrent; // current buffer for writing
private DataOutputBuffer bufReady; // buffer ready for flushing
+ private FSEditLogOp.Writer writer;
final private int initBufferSize; // inital buffer size
static ByteBuffer fill = ByteBuffer.allocateDirect(1024 * 1024); // preallocation, 1MB
@@ -70,6 +71,7 @@ class EditLogFileOutputStream extends Ed
initBufferSize = size;
bufCurrent = new DataOutputBuffer(size);
bufReady = new DataOutputBuffer(size);
+ writer = new FSEditLogOp.Writer(bufCurrent);
RandomAccessFile rp = new RandomAccessFile(name, "rw");
fp = new FileOutputStream(rp.getFD()); // open for append
fc = rp.getChannel();
@@ -88,18 +90,11 @@ class EditLogFileOutputStream extends Ed
/** {@inheritDoc} */
@Override
- public void write(int b) throws IOException {
- bufCurrent.write(b);
- }
-
- /** {@inheritDoc} */
- @Override
- void write(byte op, Writable... writables) throws IOException {
+ void write(FSEditLogOp op) throws IOException {
int start = bufCurrent.getLength();
- write(op);
- for (Writable w : writables) {
- w.write(bufCurrent);
- }
+
+ writer.writeOp(op);
+
// write transaction checksum
int end = bufCurrent.getLength();
Checksum checksum = FSEditLog.getChecksum();
@@ -109,6 +104,12 @@ class EditLogFileOutputStream extends Ed
bufCurrent.writeInt(sum);
}
+ /** {@inheritDoc} */
+ @Override
+ void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+ bufCurrent.write(bytes, offset, length);
+ }
+
/**
* Create empty edits logs file.
*/
@@ -136,6 +137,7 @@ class EditLogFileOutputStream extends Ed
}
bufCurrent.close();
bufCurrent = null;
+ writer = null;
}
if(bufReady != null) {
@@ -156,6 +158,7 @@ class EditLogFileOutputStream extends Ed
} finally {
IOUtils.cleanup(FSNamesystem.LOG, bufCurrent, bufReady, fc, fp);
bufCurrent = bufReady = null;
+ writer = null;
fc = null;
fp = null;
}
@@ -168,10 +171,11 @@ class EditLogFileOutputStream extends Ed
@Override
void setReadyToFlush() throws IOException {
assert bufReady.size() == 0 : "previous data is not flushed yet";
- write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
+ bufCurrent.write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
DataOutputBuffer tmp = bufReady;
bufReady = bufCurrent;
bufCurrent = tmp;
+ writer = new FSEditLogOp.Writer(bufCurrent);
}
/**
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1151238&r1=1151237&r2=1151238&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Tue Jul 26 20:46:58 2011
@@ -18,17 +18,14 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
-import java.io.OutputStream;
import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.io.Writable;
/**
* A generic abstract class to support journaling of edits logs into
* a persistent storage.
*/
-abstract class EditLogOutputStream extends OutputStream
-implements JournalStream {
+abstract class EditLogOutputStream implements JournalStream {
// these are statistics counters
private long numSync; // number of sync(s) to disk
private long totalTimeSync; // total time to sync
@@ -37,19 +34,27 @@ implements JournalStream {
numSync = totalTimeSync = 0;
}
- /** {@inheritDoc} */
- abstract public void write(int b) throws IOException;
-
/**
- * Write edits log record into the stream.
- * The record is represented by operation name and
- * an array of Writable arguments.
+ * Write edits log operation to the stream.
*
* @param op operation
- * @param writables array of Writable arguments
* @throws IOException
*/
- abstract void write(byte op, Writable ... writables) throws IOException;
+ abstract void write(FSEditLogOp op) throws IOException;
+
+ /**
+ * Write raw data to an edit log. This data should already have
+ * the transaction ID, checksum, etc included. It is for use
+ * within the BackupNode when replicating edits from the
+ * NameNode.
+ *
+ * @param bytes the bytes to write.
+ * @param offset offset in the bytes to write from
+ * @param length number of bytes to write
+ * @throws IOException
+ */
+ abstract void writeRaw(byte[] bytes, int offset, int length)
+ throws IOException;
/**
* Create and initialize underlying persistent edits log storage.
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1151238&r1=1151237&r2=1151238&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Tue Jul 26 20:46:58 2011
@@ -19,10 +19,12 @@ package org.apache.hadoop.hdfs.server.na
import java.io.File;
import java.io.IOException;
+import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.zip.Checksum;
+import java.util.zip.CheckedOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,7 +32,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.Storage;
@@ -43,14 +44,13 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.PureJavaCrc32;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
/**
* FSEditLog maintains a log of the namespace modifications.
@@ -319,7 +319,7 @@ public class FSEditLog implements NNStor
* Write an operation to the edit log. Do not sync to persistent
* store yet.
*/
- void logEdit(FSEditLogOpCodes opCode, Writable ... writables) {
+ void logEdit(FSEditLogOp op) {
synchronized (this) {
// wait if an automatic sync is scheduled
waitIfAutoSyncScheduled();
@@ -329,10 +329,10 @@ public class FSEditLog implements NNStor
ArrayList<EditLogOutputStream> errorStreams = null;
long start = now();
for(EditLogOutputStream eStream : editStreams) {
- if(!eStream.isOperationSupported(opCode.getOpCode()))
+ if(!eStream.isOperationSupported(op.opCode.getOpCode()))
continue;
try {
- eStream.write(opCode.getOpCode(), writables);
+ eStream.write(op);
} catch (IOException ie) {
LOG.error("logEdit: removing "+ eStream.getName(), ie);
if(errorStreams == null)
@@ -585,49 +585,45 @@ public class FSEditLog implements NNStor
* Records the block locations of the last block.
*/
public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
-
- DeprecatedUTF8 nameReplicationPair[] = new DeprecatedUTF8[] {
- new DeprecatedUTF8(path),
- FSEditLog.toLogReplication(newNode.getReplication()),
- FSEditLog.toLogLong(newNode.getModificationTime()),
- FSEditLog.toLogLong(newNode.getAccessTime()),
- FSEditLog.toLogLong(newNode.getPreferredBlockSize())};
- logEdit(OP_ADD,
- new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair),
- new ArrayWritable(Block.class, newNode.getBlocks()),
- newNode.getPermissionStatus(),
- new DeprecatedUTF8(newNode.getClientName()),
- new DeprecatedUTF8(newNode.getClientMachine()));
+ AddOp op = AddOp.getInstance()
+ .setPath(path)
+ .setReplication(newNode.getReplication())
+ .setModificationTime(newNode.getModificationTime())
+ .setAccessTime(newNode.getAccessTime())
+ .setBlockSize(newNode.getPreferredBlockSize())
+ .setBlocks(newNode.getBlocks())
+ .setPermissionStatus(newNode.getPermissionStatus())
+ .setClientName(newNode.getClientName())
+ .setClientMachine(newNode.getClientMachine());
+
+ logEdit(op);
}
/**
* Add close lease record to edit log.
*/
public void logCloseFile(String path, INodeFile newNode) {
- DeprecatedUTF8 nameReplicationPair[] = new DeprecatedUTF8[] {
- new DeprecatedUTF8(path),
- FSEditLog.toLogReplication(newNode.getReplication()),
- FSEditLog.toLogLong(newNode.getModificationTime()),
- FSEditLog.toLogLong(newNode.getAccessTime()),
- FSEditLog.toLogLong(newNode.getPreferredBlockSize())};
- logEdit(OP_CLOSE,
- new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair),
- new ArrayWritable(Block.class, newNode.getBlocks()),
- newNode.getPermissionStatus());
+ CloseOp op = CloseOp.getInstance()
+ .setPath(path)
+ .setReplication(newNode.getReplication())
+ .setModificationTime(newNode.getModificationTime())
+ .setAccessTime(newNode.getAccessTime())
+ .setBlockSize(newNode.getPreferredBlockSize())
+ .setBlocks(newNode.getBlocks())
+ .setPermissionStatus(newNode.getPermissionStatus());
+
+ logEdit(op);
}
/**
* Add create directory record to edit log
*/
public void logMkDir(String path, INode newNode) {
- DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
- new DeprecatedUTF8(path),
- FSEditLog.toLogLong(newNode.getModificationTime()),
- FSEditLog.toLogLong(newNode.getAccessTime())
- };
- logEdit(OP_MKDIR,
- new ArrayWritable(DeprecatedUTF8.class, info),
- newNode.getPermissionStatus());
+ MkdirOp op = MkdirOp.getInstance()
+ .setPath(path)
+ .setTimestamp(newNode.getModificationTime())
+ .setPermissionStatus(newNode.getPermissionStatus());
+ logEdit(op);
}
/**
@@ -635,33 +631,33 @@ public class FSEditLog implements NNStor
* TODO: use String parameters until just before writing to disk
*/
void logRename(String src, String dst, long timestamp) {
- DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
- new DeprecatedUTF8(src),
- new DeprecatedUTF8(dst),
- FSEditLog.toLogLong(timestamp)};
- logEdit(OP_RENAME_OLD, new ArrayWritable(DeprecatedUTF8.class, info));
+ RenameOldOp op = RenameOldOp.getInstance()
+ .setSource(src)
+ .setDestination(dst)
+ .setTimestamp(timestamp);
+ logEdit(op);
}
/**
* Add rename record to edit log
*/
void logRename(String src, String dst, long timestamp, Options.Rename... options) {
- DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
- new DeprecatedUTF8(src),
- new DeprecatedUTF8(dst),
- FSEditLog.toLogLong(timestamp)};
- logEdit(OP_RENAME,
- new ArrayWritable(DeprecatedUTF8.class, info),
- toBytesWritable(options));
+ RenameOp op = RenameOp.getInstance()
+ .setSource(src)
+ .setDestination(dst)
+ .setTimestamp(timestamp)
+ .setOptions(options);
+ logEdit(op);
}
/**
* Add set replication record to edit log
*/
void logSetReplication(String src, short replication) {
- logEdit(OP_SET_REPLICATION,
- new DeprecatedUTF8(src),
- FSEditLog.toLogReplication(replication));
+ SetReplicationOp op = SetReplicationOp.getInstance()
+ .setPath(src)
+ .setReplication(replication);
+ logEdit(op);
}
/** Add set namespace quota record to edit log
@@ -670,64 +666,69 @@ public class FSEditLog implements NNStor
* @param quota the directory size limit
*/
void logSetQuota(String src, long nsQuota, long dsQuota) {
- logEdit(OP_SET_QUOTA,
- new DeprecatedUTF8(src),
- new LongWritable(nsQuota), new LongWritable(dsQuota));
+ SetQuotaOp op = SetQuotaOp.getInstance()
+ .setSource(src)
+ .setNSQuota(nsQuota)
+ .setDSQuota(dsQuota);
+ logEdit(op);
}
/** Add set permissions record to edit log */
void logSetPermissions(String src, FsPermission permissions) {
- logEdit(OP_SET_PERMISSIONS, new DeprecatedUTF8(src), permissions);
+ SetPermissionsOp op = SetPermissionsOp.getInstance()
+ .setSource(src)
+ .setPermissions(permissions);
+ logEdit(op);
}
/** Add set owner record to edit log */
void logSetOwner(String src, String username, String groupname) {
- DeprecatedUTF8 u = new DeprecatedUTF8(username == null? "": username);
- DeprecatedUTF8 g = new DeprecatedUTF8(groupname == null? "": groupname);
- logEdit(OP_SET_OWNER, new DeprecatedUTF8(src), u, g);
+ SetOwnerOp op = SetOwnerOp.getInstance()
+ .setSource(src)
+ .setUser(username)
+ .setGroup(groupname);
+ logEdit(op);
}
/**
* concat(trg,src..) log
*/
void logConcat(String trg, String [] srcs, long timestamp) {
- int size = 1 + srcs.length + 1; // trg, srcs, timestamp
- DeprecatedUTF8 info[] = new DeprecatedUTF8[size];
- int idx = 0;
- info[idx++] = new DeprecatedUTF8(trg);
- for(int i=0; i<srcs.length; i++) {
- info[idx++] = new DeprecatedUTF8(srcs[i]);
- }
- info[idx] = FSEditLog.toLogLong(timestamp);
- logEdit(OP_CONCAT_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
+ ConcatDeleteOp op = ConcatDeleteOp.getInstance()
+ .setTarget(trg)
+ .setSources(srcs)
+ .setTimestamp(timestamp);
+ logEdit(op);
}
/**
* Add delete file record to edit log
*/
void logDelete(String src, long timestamp) {
- DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
- new DeprecatedUTF8(src),
- FSEditLog.toLogLong(timestamp)};
- logEdit(OP_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
+ DeleteOp op = DeleteOp.getInstance()
+ .setPath(src)
+ .setTimestamp(timestamp);
+ logEdit(op);
}
/**
* Add generation stamp record to edit log
*/
void logGenerationStamp(long genstamp) {
- logEdit(OP_SET_GENSTAMP, new LongWritable(genstamp));
+ SetGenstampOp op = SetGenstampOp.getInstance()
+ .setGenerationStamp(genstamp);
+ logEdit(op);
}
/**
* Add access time record to edit log
*/
void logTimes(String src, long mtime, long atime) {
- DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
- new DeprecatedUTF8(src),
- FSEditLog.toLogLong(mtime),
- FSEditLog.toLogLong(atime)};
- logEdit(OP_TIMES, new ArrayWritable(DeprecatedUTF8.class, info));
+ TimesOp op = TimesOp.getInstance()
+ .setPath(src)
+ .setModificationTime(mtime)
+ .setAccessTime(atime);
+ logEdit(op);
}
/**
@@ -735,14 +736,13 @@ public class FSEditLog implements NNStor
*/
void logSymlink(String path, String value, long mtime,
long atime, INodeSymlink node) {
- DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
- new DeprecatedUTF8(path),
- new DeprecatedUTF8(value),
- FSEditLog.toLogLong(mtime),
- FSEditLog.toLogLong(atime)};
- logEdit(OP_SYMLINK,
- new ArrayWritable(DeprecatedUTF8.class, info),
- node.getPermissionStatus());
+ SymlinkOp op = SymlinkOp.getInstance()
+ .setPath(path)
+ .setValue(value)
+ .setModificationTime(mtime)
+ .setAccessTime(atime)
+ .setPermissionStatus(node.getPermissionStatus());
+ logEdit(op);
}
/**
@@ -753,36 +753,40 @@ public class FSEditLog implements NNStor
*/
void logGetDelegationToken(DelegationTokenIdentifier id,
long expiryTime) {
- logEdit(OP_GET_DELEGATION_TOKEN, id, FSEditLog.toLogLong(expiryTime));
+ GetDelegationTokenOp op = GetDelegationTokenOp.getInstance()
+ .setDelegationTokenIdentifier(id)
+ .setExpiryTime(expiryTime);
+ logEdit(op);
}
void logRenewDelegationToken(DelegationTokenIdentifier id,
long expiryTime) {
- logEdit(OP_RENEW_DELEGATION_TOKEN, id, FSEditLog.toLogLong(expiryTime));
+ RenewDelegationTokenOp op = RenewDelegationTokenOp.getInstance()
+ .setDelegationTokenIdentifier(id)
+ .setExpiryTime(expiryTime);
+ logEdit(op);
}
void logCancelDelegationToken(DelegationTokenIdentifier id) {
- logEdit(OP_CANCEL_DELEGATION_TOKEN, id);
+ CancelDelegationTokenOp op = CancelDelegationTokenOp.getInstance()
+ .setDelegationTokenIdentifier(id);
+ logEdit(op);
}
void logUpdateMasterKey(DelegationKey key) {
- logEdit(OP_UPDATE_MASTER_KEY, key);
+ UpdateMasterKeyOp op = UpdateMasterKeyOp.getInstance()
+ .setDelegationKey(key);
+ logEdit(op);
}
void logReassignLease(String leaseHolder, String src, String newHolder) {
- logEdit(OP_REASSIGN_LEASE, new DeprecatedUTF8(leaseHolder),
- new DeprecatedUTF8(src),
- new DeprecatedUTF8(newHolder));
- }
-
- static private DeprecatedUTF8 toLogReplication(short replication) {
- return new DeprecatedUTF8(Short.toString(replication));
+ ReassignLeaseOp op = ReassignLeaseOp.getInstance()
+ .setLeaseHolder(leaseHolder)
+ .setPath(src)
+ .setNewHolder(newHolder);
+ logEdit(op);
}
- static private DeprecatedUTF8 toLogLong(long timestamp) {
- return new DeprecatedUTF8(Long.toString(timestamp));
- }
-
/**
* Return the size of the current EditLog
*/
@@ -1030,7 +1034,7 @@ public class FSEditLog implements NNStor
boStream = new EditLogBackupOutputStream(bnReg, nnReg);
editStreams.add(boStream);
}
- logEdit(OP_JSPOOL_START, (Writable[])null);
+ logEdit(JSpoolStartOp.getInstance());
}
/**
@@ -1044,7 +1048,7 @@ public class FSEditLog implements NNStor
long start = now();
for(EditLogOutputStream eStream : editStreams) {
try {
- eStream.write(data, 0, length);
+ eStream.writeRaw(data, 0, length);
} catch (IOException ie) {
LOG.warn("Error in editStream " + eStream.getName(), ie);
if(errorStreams == null)
@@ -1127,8 +1131,9 @@ public class FSEditLog implements NNStor
void incrementCheckpointTime() {
storage.incrementCheckpointTime();
- Writable[] args = {new LongWritable(storage.getCheckpointTime())};
- logEdit(OP_CHECKPOINT_TIME, args);
+ CheckpointTimeOp op = CheckpointTimeOp.getInstance()
+ .setCheckpointTime(storage.getCheckpointTime());
+ logEdit(op);
}
synchronized void releaseBackupStream(NamenodeRegistration registration) {
@@ -1179,13 +1184,6 @@ public class FSEditLog implements NNStor
return regAllowed;
}
- static BytesWritable toBytesWritable(Options.Rename... options) {
- byte[] bytes = new byte[options.length];
- for (int i = 0; i < options.length; i++) {
- bytes[i] = options[i].value();
- }
- return new BytesWritable(bytes);
- }
/**
* Get the StorageDirectory for a stream
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1151238&r1=1151237&r2=1151238&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Tue Jul 26 20:46:58 2011
@@ -38,13 +38,16 @@ import static org.apache.hadoop.hdfs.ser
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.hdfs.DeprecatedUTF8;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.EOFException;
@@ -58,6 +61,43 @@ import java.io.EOFException;
public abstract class FSEditLogOp {
final FSEditLogOpCodes opCode;
+ @SuppressWarnings("deprecation")
+ private static ThreadLocal<EnumMap<FSEditLogOpCodes, FSEditLogOp>> opInstances =
+ new ThreadLocal<EnumMap<FSEditLogOpCodes, FSEditLogOp>>() {
+ @Override
+ protected EnumMap<FSEditLogOpCodes, FSEditLogOp> initialValue() {
+ EnumMap<FSEditLogOpCodes, FSEditLogOp> instances
+ = new EnumMap<FSEditLogOpCodes, FSEditLogOp>(FSEditLogOpCodes.class);
+ instances.put(OP_ADD, new AddOp());
+ instances.put(OP_CLOSE, new CloseOp());
+ instances.put(OP_SET_REPLICATION, new SetReplicationOp());
+ instances.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
+ instances.put(OP_RENAME_OLD, new RenameOldOp());
+ instances.put(OP_DELETE, new DeleteOp());
+ instances.put(OP_MKDIR, new MkdirOp());
+ instances.put(OP_SET_GENSTAMP, new SetGenstampOp());
+ instances.put(OP_DATANODE_ADD, new DatanodeAddOp());
+ instances.put(OP_DATANODE_REMOVE, new DatanodeRemoveOp());
+ instances.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
+ instances.put(OP_SET_OWNER, new SetOwnerOp());
+ instances.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
+ instances.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
+ instances.put(OP_SET_QUOTA, new SetQuotaOp());
+ instances.put(OP_TIMES, new TimesOp());
+ instances.put(OP_SYMLINK, new SymlinkOp());
+ instances.put(OP_RENAME, new RenameOp());
+ instances.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
+ instances.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
+ instances.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
+ instances.put(OP_CANCEL_DELEGATION_TOKEN,
+ new CancelDelegationTokenOp());
+ instances.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
+ instances.put(OP_CHECKPOINT_TIME, new CheckpointTimeOp());
+ instances.put(OP_JSPOOL_START, new JSpoolStartOp());
+ return instances;
+ }
+ };
+
/**
* Constructor for an EditLog Op. EditLog ops cannot be constructed
* directly, but only through Reader#readOp.
@@ -66,10 +106,14 @@ public abstract class FSEditLogOp {
this.opCode = opCode;
}
- public abstract void readFields(DataInputStream in, int logVersion)
+ abstract void readFields(DataInputStream in, int logVersion)
+ throws IOException;
+
+ abstract void writeFields(DataOutputStream out)
throws IOException;
- static class AddCloseOp extends FSEditLogOp {
+ @SuppressWarnings("unchecked")
+ static abstract class AddCloseOp extends FSEditLogOp {
int length;
String path;
short replication;
@@ -87,7 +131,71 @@ public abstract class FSEditLogOp {
assert(opCode == OP_ADD || opCode == OP_CLOSE);
}
- public void readFields(DataInputStream in, int logVersion)
+ <T extends AddCloseOp> T setPath(String path) {
+ this.path = path;
+ return (T)this;
+ }
+
+ <T extends AddCloseOp> T setReplication(short replication) {
+ this.replication = replication;
+ return (T)this;
+ }
+
+ <T extends AddCloseOp> T setModificationTime(long mtime) {
+ this.mtime = mtime;
+ return (T)this;
+ }
+
+ <T extends AddCloseOp> T setAccessTime(long atime) {
+ this.atime = atime;
+ return (T)this;
+ }
+
+ <T extends AddCloseOp> T setBlockSize(long blockSize) {
+ this.blockSize = blockSize;
+ return (T)this;
+ }
+
+ <T extends AddCloseOp> T setBlocks(Block[] blocks) {
+ this.blocks = blocks;
+ return (T)this;
+ }
+
+ <T extends AddCloseOp> T setPermissionStatus(PermissionStatus permissions) {
+ this.permissions = permissions;
+ return (T)this;
+ }
+
+ <T extends AddCloseOp> T setClientName(String clientName) {
+ this.clientName = clientName;
+ return (T)this;
+ }
+
+ <T extends AddCloseOp> T setClientMachine(String clientMachine) {
+ this.clientMachine = clientMachine;
+ return (T)this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ DeprecatedUTF8 nameReplicationPair[] = new DeprecatedUTF8[] {
+ new DeprecatedUTF8(path),
+ toLogReplication(replication),
+ toLogLong(mtime),
+ toLogLong(atime),
+ toLogLong(blockSize)};
+ new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair).write(out);
+ new ArrayWritable(Block.class, blocks).write(out);
+ permissions.write(out);
+
+ if (this.opCode == OP_ADD) {
+ new DeprecatedUTF8(clientName).write(out);
+ new DeprecatedUTF8(clientMachine).write(out);
+ }
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
// versions > 0 support per file replication
// get name and replication
@@ -168,6 +276,26 @@ public abstract class FSEditLogOp {
}
}
+ static class AddOp extends AddCloseOp {
+ private AddOp() {
+ super(OP_ADD);
+ }
+
+ static AddOp getInstance() {
+ return (AddOp)opInstances.get().get(OP_ADD);
+ }
+ }
+
+ static class CloseOp extends AddCloseOp {
+ private CloseOp() {
+ super(OP_CLOSE);
+ }
+
+ static CloseOp getInstance() {
+ return (CloseOp)opInstances.get().get(OP_CLOSE);
+ }
+ }
+
static class SetReplicationOp extends FSEditLogOp {
String path;
short replication;
@@ -176,7 +304,29 @@ public abstract class FSEditLogOp {
super(OP_SET_REPLICATION);
}
- public void readFields(DataInputStream in, int logVersion)
+ static SetReplicationOp getInstance() {
+ return (SetReplicationOp)opInstances.get()
+ .get(OP_SET_REPLICATION);
+ }
+
+ SetReplicationOp setPath(String path) {
+ this.path = path;
+ return this;
+ }
+
+ SetReplicationOp setReplication(short replication) {
+ this.replication = replication;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ new DeprecatedUTF8(path).write(out);
+ new DeprecatedUTF8(Short.toString(replication)).write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.path = FSImageSerialization.readString(in);
this.replication = readShort(in);
@@ -193,7 +343,41 @@ public abstract class FSEditLogOp {
super(OP_CONCAT_DELETE);
}
- public void readFields(DataInputStream in, int logVersion)
+ static ConcatDeleteOp getInstance() {
+ return (ConcatDeleteOp)opInstances.get()
+ .get(OP_CONCAT_DELETE);
+ }
+
+ ConcatDeleteOp setTarget(String trg) {
+ this.trg = trg;
+ return this;
+ }
+
+ ConcatDeleteOp setSources(String[] srcs) {
+ this.srcs = srcs;
+ return this;
+ }
+
+ ConcatDeleteOp setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ int size = 1 + srcs.length + 1; // trg, srcs, timestamp
+ DeprecatedUTF8 info[] = new DeprecatedUTF8[size];
+ int idx = 0;
+ info[idx++] = new DeprecatedUTF8(trg);
+ for(int i=0; i<srcs.length; i++) {
+ info[idx++] = new DeprecatedUTF8(srcs[i]);
+ }
+ info[idx] = toLogLong(timestamp);
+ new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.length = in.readInt();
if (length < 3) { // trg, srcs.., timestam
@@ -220,7 +404,37 @@ public abstract class FSEditLogOp {
super(OP_RENAME_OLD);
}
- public void readFields(DataInputStream in, int logVersion)
+ static RenameOldOp getInstance() {
+ return (RenameOldOp)opInstances.get()
+ .get(OP_RENAME_OLD);
+ }
+
+ RenameOldOp setSource(String src) {
+ this.src = src;
+ return this;
+ }
+
+ RenameOldOp setDestination(String dst) {
+ this.dst = dst;
+ return this;
+ }
+
+ RenameOldOp setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
+ new DeprecatedUTF8(src),
+ new DeprecatedUTF8(dst),
+ toLogLong(timestamp)};
+ new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.length = in.readInt();
if (this.length != 3) {
@@ -242,9 +456,32 @@ public abstract class FSEditLogOp {
super(OP_DELETE);
}
- public void readFields(DataInputStream in, int logVersion)
- throws IOException {
+ static DeleteOp getInstance() {
+ return (DeleteOp)opInstances.get()
+ .get(OP_DELETE);
+ }
+
+ DeleteOp setPath(String path) {
+ this.path = path;
+ return this;
+ }
+
+ DeleteOp setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
+ new DeprecatedUTF8(path),
+ toLogLong(timestamp)};
+ new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
+ throws IOException {
this.length = in.readInt();
if (this.length != 2) {
throw new IOException("Incorrect data format. "
@@ -264,8 +501,40 @@ public abstract class FSEditLogOp {
private MkdirOp() {
super(OP_MKDIR);
}
+
+ static MkdirOp getInstance() {
+ return (MkdirOp)opInstances.get()
+ .get(OP_MKDIR);
+ }
+
+ MkdirOp setPath(String path) {
+ this.path = path;
+ return this;
+ }
+
+ MkdirOp setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ MkdirOp setPermissionStatus(PermissionStatus permissions) {
+ this.permissions = permissions;
+ return this;
+ }
- public void readFields(DataInputStream in, int logVersion)
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
+ new DeprecatedUTF8(path),
+ toLogLong(timestamp), // mtime
+ toLogLong(timestamp) // atime, unused at this time
+ };
+ new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+ permissions.write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.length = in.readInt();
@@ -299,32 +568,70 @@ public abstract class FSEditLogOp {
super(OP_SET_GENSTAMP);
}
- public void readFields(DataInputStream in, int logVersion)
+ static SetGenstampOp getInstance() {
+ return (SetGenstampOp)opInstances.get()
+ .get(OP_SET_GENSTAMP);
+ }
+
+ SetGenstampOp setGenerationStamp(long genStamp) {
+ this.genStamp = genStamp;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ new LongWritable(genStamp).write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.genStamp = in.readLong();
}
}
+ @SuppressWarnings("deprecation")
static class DatanodeAddOp extends FSEditLogOp {
- @SuppressWarnings("deprecation")
private DatanodeAddOp() {
super(OP_DATANODE_ADD);
}
- public void readFields(DataInputStream in, int logVersion)
+ static DatanodeAddOp getInstance() {
+ return (DatanodeAddOp)opInstances.get()
+ .get(OP_DATANODE_ADD);
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ throw new IOException("Deprecated, should not write");
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
//Datanodes are not persistent any more.
FSImageSerialization.DatanodeImage.skipOne(in);
}
}
+ @SuppressWarnings("deprecation")
static class DatanodeRemoveOp extends FSEditLogOp {
- @SuppressWarnings("deprecation")
private DatanodeRemoveOp() {
super(OP_DATANODE_REMOVE);
}
- public void readFields(DataInputStream in, int logVersion)
+ static DatanodeRemoveOp getInstance() {
+ return (DatanodeRemoveOp)opInstances.get()
+ .get(OP_DATANODE_REMOVE);
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ throw new IOException("Deprecated, should not write");
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
DatanodeID nodeID = new DatanodeID();
nodeID.readFields(in);
@@ -340,7 +647,29 @@ public abstract class FSEditLogOp {
super(OP_SET_PERMISSIONS);
}
- public void readFields(DataInputStream in, int logVersion)
+ static SetPermissionsOp getInstance() {
+ return (SetPermissionsOp)opInstances.get()
+ .get(OP_SET_PERMISSIONS);
+ }
+
+ SetPermissionsOp setSource(String src) {
+ this.src = src;
+ return this;
+ }
+
+ SetPermissionsOp setPermissions(FsPermission permissions) {
+ this.permissions = permissions;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ new DeprecatedUTF8(src).write(out);
+ permissions.write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.src = FSImageSerialization.readString(in);
this.permissions = FsPermission.read(in);
@@ -356,13 +685,42 @@ public abstract class FSEditLogOp {
super(OP_SET_OWNER);
}
- public void readFields(DataInputStream in, int logVersion)
+ static SetOwnerOp getInstance() {
+ return (SetOwnerOp)opInstances.get()
+ .get(OP_SET_OWNER);
+ }
+
+ SetOwnerOp setSource(String src) {
+ this.src = src;
+ return this;
+ }
+
+ SetOwnerOp setUser(String username) {
+ this.username = username;
+ return this;
+ }
+
+ SetOwnerOp setGroup(String groupname) {
+ this.groupname = groupname;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ DeprecatedUTF8 u = new DeprecatedUTF8(username == null? "": username);
+ DeprecatedUTF8 g = new DeprecatedUTF8(groupname == null? "": groupname);
+ new DeprecatedUTF8(src).write(out);
+ u.write(out);
+ g.write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.src = FSImageSerialization.readString(in);
this.username = FSImageSerialization.readString_EmptyAsNull(in);
this.groupname = FSImageSerialization.readString_EmptyAsNull(in);
}
-
}
static class SetNSQuotaOp extends FSEditLogOp {
@@ -373,7 +731,18 @@ public abstract class FSEditLogOp {
super(OP_SET_NS_QUOTA);
}
- public void readFields(DataInputStream in, int logVersion)
+ static SetNSQuotaOp getInstance() {
+ return (SetNSQuotaOp)opInstances.get()
+ .get(OP_SET_NS_QUOTA);
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ throw new IOException("Deprecated");
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.src = FSImageSerialization.readString(in);
this.nsQuota = readLongWritable(in);
@@ -387,7 +756,18 @@ public abstract class FSEditLogOp {
super(OP_CLEAR_NS_QUOTA);
}
- public void readFields(DataInputStream in, int logVersion)
+ static ClearNSQuotaOp getInstance() {
+ return (ClearNSQuotaOp)opInstances.get()
+ .get(OP_CLEAR_NS_QUOTA);
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ throw new IOException("Deprecated");
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.src = FSImageSerialization.readString(in);
}
@@ -402,7 +782,35 @@ public abstract class FSEditLogOp {
super(OP_SET_QUOTA);
}
- public void readFields(DataInputStream in, int logVersion)
+ static SetQuotaOp getInstance() {
+ return (SetQuotaOp)opInstances.get()
+ .get(OP_SET_QUOTA);
+ }
+
+ SetQuotaOp setSource(String src) {
+ this.src = src;
+ return this;
+ }
+
+ SetQuotaOp setNSQuota(long nsQuota) {
+ this.nsQuota = nsQuota;
+ return this;
+ }
+
+ SetQuotaOp setDSQuota(long dsQuota) {
+ this.dsQuota = dsQuota;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ new DeprecatedUTF8(src).write(out);
+ new LongWritable(nsQuota).write(out);
+ new LongWritable(dsQuota).write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.src = FSImageSerialization.readString(in);
this.nsQuota = readLongWritable(in);
@@ -420,7 +828,37 @@ public abstract class FSEditLogOp {
super(OP_TIMES);
}
- public void readFields(DataInputStream in, int logVersion)
+ static TimesOp getInstance() {
+ return (TimesOp)opInstances.get()
+ .get(OP_TIMES);
+ }
+
+ TimesOp setPath(String path) {
+ this.path = path;
+ return this;
+ }
+
+ TimesOp setModificationTime(long mtime) {
+ this.mtime = mtime;
+ return this;
+ }
+
+ TimesOp setAccessTime(long atime) {
+ this.atime = atime;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
+ new DeprecatedUTF8(path),
+ toLogLong(mtime),
+ toLogLong(atime)};
+ new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.length = in.readInt();
if (length != 3) {
@@ -445,7 +883,49 @@ public abstract class FSEditLogOp {
super(OP_SYMLINK);
}
- public void readFields(DataInputStream in, int logVersion)
+ static SymlinkOp getInstance() {
+ return (SymlinkOp)opInstances.get()
+ .get(OP_SYMLINK);
+ }
+
+ SymlinkOp setPath(String path) {
+ this.path = path;
+ return this;
+ }
+
+ SymlinkOp setValue(String value) {
+ this.value = value;
+ return this;
+ }
+
+ SymlinkOp setModificationTime(long mtime) {
+ this.mtime = mtime;
+ return this;
+ }
+
+ SymlinkOp setAccessTime(long atime) {
+ this.atime = atime;
+ return this;
+ }
+
+ SymlinkOp setPermissionStatus(PermissionStatus permissionStatus) {
+ this.permissionStatus = permissionStatus;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
+ new DeprecatedUTF8(path),
+ new DeprecatedUTF8(value),
+ toLogLong(mtime),
+ toLogLong(atime)};
+ new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+ permissionStatus.write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.length = in.readInt();
@@ -472,7 +952,43 @@ public abstract class FSEditLogOp {
super(OP_RENAME);
}
- public void readFields(DataInputStream in, int logVersion)
+ static RenameOp getInstance() {
+ return (RenameOp)opInstances.get()
+ .get(OP_RENAME);
+ }
+
+ RenameOp setSource(String src) {
+ this.src = src;
+ return this;
+ }
+
+ RenameOp setDestination(String dst) {
+ this.dst = dst;
+ return this;
+ }
+
+ RenameOp setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ RenameOp setOptions(Rename[] options) {
+ this.options = options;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
+ new DeprecatedUTF8(src),
+ new DeprecatedUTF8(dst),
+ toLogLong(timestamp)};
+ new ArrayWritable(DeprecatedUTF8.class, info).write(out);
+ toBytesWritable(options).write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.length = in.readInt();
if (this.length != 3) {
@@ -497,6 +1013,14 @@ public abstract class FSEditLogOp {
}
return options;
}
+
+ static BytesWritable toBytesWritable(Rename... options) {
+ byte[] bytes = new byte[options.length];
+ for (int i = 0; i < options.length; i++) {
+ bytes[i] = options[i].value();
+ }
+ return new BytesWritable(bytes);
+ }
}
static class ReassignLeaseOp extends FSEditLogOp {
@@ -507,8 +1031,36 @@ public abstract class FSEditLogOp {
private ReassignLeaseOp() {
super(OP_REASSIGN_LEASE);
}
-
- public void readFields(DataInputStream in, int logVersion)
+
+ static ReassignLeaseOp getInstance() {
+ return (ReassignLeaseOp)opInstances.get()
+ .get(OP_REASSIGN_LEASE);
+ }
+
+ ReassignLeaseOp setLeaseHolder(String leaseHolder) {
+ this.leaseHolder = leaseHolder;
+ return this;
+ }
+
+ ReassignLeaseOp setPath(String path) {
+ this.path = path;
+ return this;
+ }
+
+ ReassignLeaseOp setNewHolder(String newHolder) {
+ this.newHolder = newHolder;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ new DeprecatedUTF8(leaseHolder).write(out);
+ new DeprecatedUTF8(path).write(out);
+ new DeprecatedUTF8(newHolder).write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.leaseHolder = FSImageSerialization.readString(in);
this.path = FSImageSerialization.readString(in);
@@ -524,7 +1076,30 @@ public abstract class FSEditLogOp {
super(OP_GET_DELEGATION_TOKEN);
}
- public void readFields(DataInputStream in, int logVersion)
+ static GetDelegationTokenOp getInstance() {
+ return (GetDelegationTokenOp)opInstances.get()
+ .get(OP_GET_DELEGATION_TOKEN);
+ }
+
+ GetDelegationTokenOp setDelegationTokenIdentifier(
+ DelegationTokenIdentifier token) {
+ this.token = token;
+ return this;
+ }
+
+ GetDelegationTokenOp setExpiryTime(long expiryTime) {
+ this.expiryTime = expiryTime;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ token.write(out);
+ toLogLong(expiryTime).write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.token = new DelegationTokenIdentifier();
this.token.readFields(in);
@@ -540,7 +1115,30 @@ public abstract class FSEditLogOp {
super(OP_RENEW_DELEGATION_TOKEN);
}
- public void readFields(DataInputStream in, int logVersion)
+ static RenewDelegationTokenOp getInstance() {
+ return (RenewDelegationTokenOp)opInstances.get()
+ .get(OP_RENEW_DELEGATION_TOKEN);
+ }
+
+ RenewDelegationTokenOp setDelegationTokenIdentifier(
+ DelegationTokenIdentifier token) {
+ this.token = token;
+ return this;
+ }
+
+ RenewDelegationTokenOp setExpiryTime(long expiryTime) {
+ this.expiryTime = expiryTime;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ token.write(out);
+ toLogLong(expiryTime).write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.token = new DelegationTokenIdentifier();
this.token.readFields(in);
@@ -555,7 +1153,24 @@ public abstract class FSEditLogOp {
super(OP_CANCEL_DELEGATION_TOKEN);
}
- public void readFields(DataInputStream in, int logVersion)
+ static CancelDelegationTokenOp getInstance() {
+ return (CancelDelegationTokenOp)opInstances.get()
+ .get(OP_CANCEL_DELEGATION_TOKEN);
+ }
+
+ CancelDelegationTokenOp setDelegationTokenIdentifier(
+ DelegationTokenIdentifier token) {
+ this.token = token;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ token.write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.token = new DelegationTokenIdentifier();
this.token.readFields(in);
@@ -569,13 +1184,97 @@ public abstract class FSEditLogOp {
super(OP_UPDATE_MASTER_KEY);
}
- public void readFields(DataInputStream in, int logVersion)
+ static UpdateMasterKeyOp getInstance() {
+ return (UpdateMasterKeyOp)opInstances.get()
+ .get(OP_UPDATE_MASTER_KEY);
+ }
+
+ UpdateMasterKeyOp setDelegationKey(DelegationKey key) {
+ this.key = key;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ key.write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.key = new DelegationKey();
this.key.readFields(in);
}
}
-
+
+ static class InvalidOp extends FSEditLogOp {
+ private InvalidOp() {
+ super(OP_INVALID);
+ }
+
+ static InvalidOp getInstance() {
+ return (InvalidOp)opInstances.get().get(OP_INVALID);
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ // nothing to read
+ }
+ }
+
+ static class JSpoolStartOp extends FSEditLogOp {
+ private JSpoolStartOp() {
+ super(OP_JSPOOL_START);
+ }
+
+ static JSpoolStartOp getInstance() {
+ return (JSpoolStartOp)opInstances.get().get(OP_JSPOOL_START);
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ }
+ }
+
+ static class CheckpointTimeOp extends FSEditLogOp {
+ long checkpointTime;
+
+ private CheckpointTimeOp() {
+ super(OP_CHECKPOINT_TIME);
+ }
+
+ CheckpointTimeOp setCheckpointTime(long time) {
+ this.checkpointTime = time;
+ return this;
+ }
+
+ static CheckpointTimeOp getInstance() {
+ return (CheckpointTimeOp)opInstances.get()
+ .get(OP_CHECKPOINT_TIME);
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ new LongWritable(checkpointTime).write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
+ throws IOException {
+ this.checkpointTime = readLong(in);
+ }
+ }
+
static private short readShort(DataInputStream in) throws IOException {
return Short.parseShort(FSImageSerialization.readString(in));
}
@@ -584,6 +1283,14 @@ public abstract class FSEditLogOp {
return Long.parseLong(FSImageSerialization.readString(in));
}
+ static private DeprecatedUTF8 toLogReplication(short replication) {
+ return new DeprecatedUTF8(Short.toString(replication));
+ }
+
+ static private DeprecatedUTF8 toLogLong(long timestamp) {
+ return new DeprecatedUTF8(Long.toString(timestamp));
+ }
+
/**
* A class to read in blocks stored in the old format. The only two
* fields in the block were blockid and length.
@@ -631,13 +1338,36 @@ public abstract class FSEditLogOp {
}
/**
+ * Class for writing editlog ops
+ */
+ public static class Writer {
+ private final DataOutputStream out;
+
+ public Writer(DataOutputStream out) {
+ this.out = out;
+ }
+
+ /**
+ * Write an operation to the output stream
+ *
+ * @param op The operation to write
+ * @throws IOException if an error occurs during writing.
+ */
+ public void writeOp(FSEditLogOp op) throws IOException {
+ out.writeByte(op.opCode.getOpCode());
+
+ op.writeFields(out);
+ }
+ }
+
+ /**
* Class for reading editlog ops from a stream
*/
public static class Reader {
private final DataInputStream in;
private final int logVersion;
private final Checksum checksum;
- private EnumMap<FSEditLogOpCodes, FSEditLogOp> opInstances;
+
/**
* Construct the reader
* @param in The stream to read from.
@@ -650,32 +1380,6 @@ public abstract class FSEditLogOp {
this.in = in;
this.logVersion = logVersion;
this.checksum = checksum;
- opInstances = new EnumMap<FSEditLogOpCodes, FSEditLogOp>(
- FSEditLogOpCodes.class);
- opInstances.put(OP_ADD, new AddCloseOp(OP_ADD));
- opInstances.put(OP_CLOSE, new AddCloseOp(OP_CLOSE));
- opInstances.put(OP_SET_REPLICATION, new SetReplicationOp());
- opInstances.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
- opInstances.put(OP_RENAME_OLD, new RenameOldOp());
- opInstances.put(OP_DELETE, new DeleteOp());
- opInstances.put(OP_MKDIR, new MkdirOp());
- opInstances.put(OP_SET_GENSTAMP, new SetGenstampOp());
- opInstances.put(OP_DATANODE_ADD, new DatanodeAddOp());
- opInstances.put(OP_DATANODE_REMOVE, new DatanodeRemoveOp());
- opInstances.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
- opInstances.put(OP_SET_OWNER, new SetOwnerOp());
- opInstances.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
- opInstances.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
- opInstances.put(OP_SET_QUOTA, new SetQuotaOp());
- opInstances.put(OP_TIMES, new TimesOp());
- opInstances.put(OP_SYMLINK, new SymlinkOp());
- opInstances.put(OP_RENAME, new RenameOp());
- opInstances.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
- opInstances.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
- opInstances.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
- opInstances.put(OP_CANCEL_DELEGATION_TOKEN,
- new CancelDelegationTokenOp());
- opInstances.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
}
/**
@@ -708,7 +1412,7 @@ public abstract class FSEditLogOp {
return null;
}
- FSEditLogOp op = opInstances.get(opCode);
+ FSEditLogOp op = opInstances.get().get(opCode);
if (op == null) {
throw new IOException("Read invalid opcode " + opCode);
}