You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/07/29 09:10:53 UTC
svn commit: r1152128 [2/3] - in /hadoop/common/branches/HDFS-1073/hdfs: ./
src/c++/libhdfs/ src/contrib/ src/contrib/hdfsproxy/
src/docs/src/documentation/content/xdocs/ src/java/
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/...
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Fri Jul 29 07:10:48 2011
@@ -107,7 +107,7 @@ public class BackupImage extends FSImage
StorageDirectory sd = it.next();
StorageState curState;
try {
- curState = sd.analyzeStorage(HdfsConstants.StartupOption.REGULAR);
+ curState = sd.analyzeStorage(HdfsConstants.StartupOption.REGULAR, storage);
// sd is locked but not opened
switch(curState) {
case NON_EXISTENT:
@@ -126,7 +126,8 @@ public class BackupImage extends FSImage
sd.doRecover(curState);
}
if(curState != StorageState.NOT_FORMATTED) {
- sd.read(); // read and verify consistency with other directories
+ // read and verify consistency with other directories
+ storage.readProperties(sd);
}
} catch(IOException ioe) {
sd.unlock();
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Fri Jul 29 07:10:48 2011
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.na
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
import java.util.Arrays;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -27,7 +26,6 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
@@ -45,26 +43,9 @@ class EditLogBackupOutputStream extends
private JournalProtocol backupNode; // RPC proxy to backup node
private NamenodeRegistration bnRegistration; // backup node registration
private NamenodeRegistration nnRegistration; // active node registration
- private ArrayList<JournalRecord> bufCurrent; // current buffer for writing
- private ArrayList<JournalRecord> bufReady; // buffer ready for flushing
+ private EditsDoubleBuffer doubleBuf;
private DataOutputBuffer out; // serialized output sent to backup node
- static class JournalRecord {
- byte op;
- long txid;
- Writable[] args;
-
- JournalRecord(byte op, long txid, Writable ... writables) {
- this.op = op;
- this.txid = txid;
- this.args = writables;
- }
-
- void write(DataOutputBuffer out) throws IOException {
- writeChecksummedOp(out, op, txid, args);
- }
- }
-
EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node
NamenodeRegistration nnReg) // active name-node
throws IOException {
@@ -82,8 +63,7 @@ class EditLogBackupOutputStream extends
Storage.LOG.error("Error connecting to: " + bnAddress, e);
throw e;
}
- this.bufCurrent = new ArrayList<JournalRecord>();
- this.bufReady = new ArrayList<JournalRecord>();
+ this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE);
this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
}
@@ -97,14 +77,14 @@ class EditLogBackupOutputStream extends
return JournalType.BACKUP;
}
- @Override
- void write(byte[] data, int i, int length) throws IOException {
- throw new IOException("Not implemented");
- }
-
@Override // EditLogOutputStream
- void write(byte op, long txid, Writable ... writables) throws IOException {
- bufCurrent.add(new JournalRecord(op, txid, writables));
+ void write(FSEditLogOp op) throws IOException {
+ doubleBuf.writeOp(op);
+ }
+
+ @Override
+ void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+ throw new IOException("Not supported");
}
/**
@@ -112,51 +92,53 @@ class EditLogBackupOutputStream extends
*/
@Override // EditLogOutputStream
void create() throws IOException {
- bufCurrent.clear();
- assert bufReady.size() == 0 : "previous data is not flushed yet";
+ assert doubleBuf.isFlushed() : "previous data is not flushed yet";
+ this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE);
}
@Override // EditLogOutputStream
public void close() throws IOException {
// close should have been called after all pending transactions
// have been flushed & synced.
- int size = bufCurrent.size();
+ int size = doubleBuf.countBufferedBytes();
if (size != 0) {
throw new IOException("BackupEditStream has " + size +
" records still to be flushed and cannot be closed.");
}
RPC.stopProxy(backupNode); // stop the RPC threads
- bufCurrent = bufReady = null;
+ doubleBuf.close();
+ doubleBuf = null;
}
@Override
public void abort() throws IOException {
RPC.stopProxy(backupNode);
- bufCurrent = bufReady = null;
+ doubleBuf = null;
}
@Override // EditLogOutputStream
void setReadyToFlush() throws IOException {
- assert bufReady.size() == 0 : "previous data is not flushed yet";
- ArrayList<JournalRecord> tmp = bufReady;
- bufReady = bufCurrent;
- bufCurrent = tmp;
+ doubleBuf.setReadyToFlush();
}
@Override // EditLogOutputStream
protected void flushAndSync() throws IOException {
- assert out.size() == 0 : "Output buffer is not empty";
- for (JournalRecord jRec : bufReady) {
- jRec.write(out);
- }
- if (out.size() > 0) {
+ assert out.getLength() == 0 : "Output buffer is not empty";
+
+ int numReadyTxns = doubleBuf.countReadyTxns();
+ long firstTxToFlush = doubleBuf.getFirstReadyTxId();
+
+ doubleBuf.flushTo(out);
+ if (out.getLength() > 0) {
+ assert numReadyTxns > 0;
+
byte[] data = Arrays.copyOf(out.getData(), out.getLength());
+ out.reset();
+ assert out.getLength() == 0 : "Output buffer is not empty";
+
backupNode.journal(nnRegistration,
- bufReady.get(0).txid, bufReady.size(),
- data);
+ firstTxToFlush, numReadyTxns, data);
}
- bufReady.clear(); // erase all data in the buffer
- out.reset(); // reset buffer to the start position
}
/**
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Fri Jul 29 07:10:48 2011
@@ -28,9 +28,7 @@ import java.nio.channels.FileChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Writable;
import com.google.common.annotations.VisibleForTesting;
@@ -46,10 +44,7 @@ class EditLogFileOutputStream extends Ed
private File file;
private FileOutputStream fp; // file stream for storing edit logs
private FileChannel fc; // channel of the file stream for sync
- private DataOutputBuffer bufCurrent; // current buffer for writing
- private DataOutputBuffer bufReady; // buffer ready for flushing
- final private int initBufferSize; // inital buffer size
-
+ private EditsDoubleBuffer doubleBuf;
static ByteBuffer fill = ByteBuffer.allocateDirect(1024 * 1024); // preallocation, 1MB
static {
@@ -71,9 +66,7 @@ class EditLogFileOutputStream extends Ed
EditLogFileOutputStream(File name, int size) throws IOException {
super();
file = name;
- initBufferSize = size;
- bufCurrent = new DataOutputBuffer(size);
- bufReady = new DataOutputBuffer(size);
+ doubleBuf = new EditsDoubleBuffer(size);
RandomAccessFile rp = new RandomAccessFile(name, "rw");
fp = new FileOutputStream(rp.getFD()); // open for append
fc = rp.getChannel();
@@ -90,15 +83,10 @@ class EditLogFileOutputStream extends Ed
return JournalType.FILE;
}
- /**
- * Write a single byte to the output stream.
- * @param b the byte to write
- */
- private void write(int b) throws IOException {
- if (fp == null) {
- throw new IOException("Trying to use aborted output stream");
- }
- bufCurrent.write(b);
+ /** {@inheritDoc} */
+ @Override
+ void write(FSEditLogOp op) throws IOException {
+ doubleBuf.writeOp(op);
}
/**
@@ -110,16 +98,8 @@ class EditLogFileOutputStream extends Ed
* </ul>
* */
@Override
- void write(byte op, long txid, Writable... writables) throws IOException {
- if (fp == null) {
- throw new IOException("Trying to use aborted output stream");
- }
- writeChecksummedOp(bufCurrent, op, txid, writables);
- }
-
- @Override
- void write(final byte[] data, int off, int len) throws IOException {
- bufCurrent.write(data, off, len);
+ void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+ doubleBuf.writeRaw(bytes, offset, length);
}
/**
@@ -129,7 +109,7 @@ class EditLogFileOutputStream extends Ed
void create() throws IOException {
fc.truncate(0);
fc.position(0);
- bufCurrent.writeInt(FSConstants.LAYOUT_VERSION);
+ doubleBuf.getCurrentBuf().writeInt(FSConstants.LAYOUT_VERSION);
setReadyToFlush();
flush();
}
@@ -144,22 +124,11 @@ class EditLogFileOutputStream extends Ed
// close should have been called after all pending transactions
// have been flushed & synced.
// if already closed, just skip
- if(bufCurrent != null)
- {
- int bufSize = bufCurrent.size();
- if (bufSize != 0) {
- throw new IOException("FSEditStream has " + bufSize
- + " bytes still to be flushed and cannot " + "be closed.");
- }
- bufCurrent.close();
- bufCurrent = null;
- }
-
- if(bufReady != null) {
- bufReady.close();
- bufReady = null;
+ if (doubleBuf != null) {
+ doubleBuf.close();
+ doubleBuf = null;
}
-
+
// remove the last INVALID marker from transaction log.
if (fc != null && fc.isOpen()) {
fc.truncate(fc.position());
@@ -171,8 +140,8 @@ class EditLogFileOutputStream extends Ed
fp = null;
}
} finally {
- IOUtils.cleanup(FSNamesystem.LOG, bufCurrent, bufReady, fc, fp);
- bufCurrent = bufReady = null;
+ IOUtils.cleanup(FSNamesystem.LOG, fc, fp);
+ doubleBuf = null;
fc = null;
fp = null;
}
@@ -194,11 +163,8 @@ class EditLogFileOutputStream extends Ed
*/
@Override
void setReadyToFlush() throws IOException {
- assert bufReady.size() == 0 : "previous data is not flushed yet";
- write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
- DataOutputBuffer tmp = bufReady;
- bufReady = bufCurrent;
- bufCurrent = tmp;
+ doubleBuf.getCurrentBuf().write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
+ doubleBuf.setReadyToFlush();
}
/**
@@ -212,8 +178,7 @@ class EditLogFileOutputStream extends Ed
}
preallocate(); // preallocate file if necessary
- bufReady.writeTo(fp); // write data to file
- bufReady.reset(); // erase all data in the buffer
+ doubleBuf.flushTo(fp);
fc.force(false); // metadata updates not needed because of preallocation
fc.position(fc.position() - 1); // skip back the end-of-file marker
}
@@ -223,7 +188,7 @@ class EditLogFileOutputStream extends Ed
*/
@Override
public boolean shouldForceSync() {
- return bufReady.size() >= initBufferSize;
+ return doubleBuf.shouldForceSync();
}
/**
@@ -232,8 +197,8 @@ class EditLogFileOutputStream extends Ed
@Override
long length() throws IOException {
// file size - header size + size of both buffers
- return fc.size() - EDITS_FILE_HEADER_SIZE_BYTES + bufReady.size()
- + bufCurrent.size();
+ return fc.size() - EDITS_FILE_HEADER_SIZE_BYTES +
+ doubleBuf.countBufferedBytes();
}
// allocate a big chunk of data
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Fri Jul 29 07:10:48 2011
@@ -39,26 +39,27 @@ abstract class EditLogOutputStream imple
}
/**
- * Write edits log record into the stream.
- * The record is represented by operation name and
- * an array of Writable arguments.
+ * Write an edit log operation to the stream.
*
* @param op operation
- * @param txid the transaction ID of this operation
- * @param writables array of Writable arguments
* @throws IOException
*/
- abstract void write(byte op, long txid, Writable ... writables)
- throws IOException;
-
+ abstract void write(FSEditLogOp op) throws IOException;
+
/**
* Write raw data to an edit log. This data should already have
* the transaction ID, checksum, etc included. It is for use
* within the BackupNode when replicating edits from the
* NameNode.
+ *
+ * @param bytes the bytes to write.
+ * @param offset offset in the bytes to write from
+ * @param length number of bytes to write
+ * @throws IOException
*/
- abstract void write(byte[] data, int offset, int length) throws IOException;
-
+ abstract void writeRaw(byte[] bytes, int offset, int length)
+ throws IOException;
+
/**
* Create and initialize underlying persistent edits log storage.
*
@@ -139,26 +140,4 @@ abstract class EditLogOutputStream imple
public String toString() {
return getName();
}
-
- /**
- * Write the given operation to the specified buffer, including
- * the transaction ID and checksum.
- */
- protected static void writeChecksummedOp(
- DataOutputBuffer buf, byte op, long txid, Writable... writables)
- throws IOException {
- int start = buf.getLength();
- buf.write(op);
- buf.writeLong(txid);
- for (Writable w : writables) {
- w.write(buf);
- }
- // write transaction checksum
- int end = buf.getLength();
- Checksum checksum = FSEditLog.getChecksum();
- checksum.reset();
- checksum.update(buf.getData(), start, end-start);
- int sum = (int)checksum.getValue();
- buf.writeInt(sum);
- }
}
Copied: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java (from r1151750, hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java)
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java?p2=hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java&p1=hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java&r1=1151750&r2=1152128&rev=1152128&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java Fri Jul 29 07:10:48 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
import java.io.IOException;
import java.io.OutputStream;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
@@ -35,20 +36,19 @@ import com.google.common.base.Preconditi
*/
class EditsDoubleBuffer {
- private DataOutputBuffer bufCurrent; // current buffer for writing
- private DataOutputBuffer bufReady; // buffer ready for flushing
+ private TxnBuffer bufCurrent; // current buffer for writing
+ private TxnBuffer bufReady; // buffer ready for flushing
private final int initBufferSize;
- private Writer writer;
public EditsDoubleBuffer(int defaultBufferSize) {
initBufferSize = defaultBufferSize;
- bufCurrent = new DataOutputBuffer(initBufferSize);
- bufReady = new DataOutputBuffer(initBufferSize);
- writer = new FSEditLogOp.Writer(bufCurrent);
+ bufCurrent = new TxnBuffer(initBufferSize);
+ bufReady = new TxnBuffer(initBufferSize);
+
}
public void writeOp(FSEditLogOp op) throws IOException {
- writer.writeOp(op);
+ bufCurrent.writeOp(op);
}
void writeRaw(byte[] bytes, int offset, int length) throws IOException {
@@ -71,10 +71,9 @@ class EditsDoubleBuffer {
void setReadyToFlush() {
assert isFlushed() : "previous data not flushed yet";
- DataOutputBuffer tmp = bufReady;
+ TxnBuffer tmp = bufReady;
bufReady = bufCurrent;
bufCurrent = tmp;
- writer = new FSEditLogOp.Writer(bufCurrent);
}
/**
@@ -102,4 +101,50 @@ class EditsDoubleBuffer {
return bufReady.size() + bufCurrent.size();
}
+ /**
+ * @return the transaction ID of the first transaction ready to be flushed
+ */
+ public long getFirstReadyTxId() {
+ assert bufReady.firstTxId > 0;
+ return bufReady.firstTxId;
+ }
+
+ /**
+ * @return the number of transactions that are ready to be flushed
+ */
+ public int countReadyTxns() {
+ return bufReady.numTxns;
+ }
+
+
+ private static class TxnBuffer extends DataOutputBuffer {
+ long firstTxId;
+ int numTxns;
+ private Writer writer;
+
+ public TxnBuffer(int initBufferSize) {
+ super(initBufferSize);
+ writer = new FSEditLogOp.Writer(this);
+ reset();
+ }
+
+ public void writeOp(FSEditLogOp op) throws IOException {
+ if (firstTxId == FSConstants.INVALID_TXID) {
+ firstTxId = op.txid;
+ } else {
+ assert op.txid > firstTxId;
+ }
+ writer.writeOp(op);
+ numTxns++;
+ }
+
+ @Override
+ public DataOutputBuffer reset() {
+ super.reset();
+ firstTxId = FSConstants.INVALID_TXID;
+ numTxns = 0;
+ return this;
+ }
+ }
+
}
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Jul 29 07:10:48 2011
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.zip.Checksum;
+import java.util.zip.CheckedOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -28,7 +29,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -40,7 +40,6 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
@@ -52,6 +51,7 @@ import com.google.common.base.Preconditi
import com.google.common.collect.Lists;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
/**
* FSEditLog maintains a log of the namespace modifications.
@@ -207,7 +207,7 @@ public class FSEditLog {
* Write an operation to the edit log. Do not sync to persistent
* store yet.
*/
- void logEdit(final FSEditLogOpCodes opCode, final Writable ... writables) {
+ void logEdit(final FSEditLogOp op) {
synchronized (this) {
assert state != State.CLOSED;
@@ -219,12 +219,13 @@ public class FSEditLog {
}
long start = beginTransaction();
+ op.setTransactionId(txid);
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (!jas.isActive()) return;
- jas.stream.write(opCode.getOpCode(), txid, writables);
+ jas.stream.write(op);
}
}, "logging edit");
@@ -520,49 +521,45 @@ public class FSEditLog {
* Records the block locations of the last block.
*/
public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
-
- DeprecatedUTF8 nameReplicationPair[] = new DeprecatedUTF8[] {
- new DeprecatedUTF8(path),
- FSEditLog.toLogReplication(newNode.getReplication()),
- FSEditLog.toLogLong(newNode.getModificationTime()),
- FSEditLog.toLogLong(newNode.getAccessTime()),
- FSEditLog.toLogLong(newNode.getPreferredBlockSize())};
- logEdit(OP_ADD,
- new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair),
- new ArrayWritable(Block.class, newNode.getBlocks()),
- newNode.getPermissionStatus(),
- new DeprecatedUTF8(newNode.getClientName()),
- new DeprecatedUTF8(newNode.getClientMachine()));
+ AddOp op = AddOp.getInstance()
+ .setPath(path)
+ .setReplication(newNode.getReplication())
+ .setModificationTime(newNode.getModificationTime())
+ .setAccessTime(newNode.getAccessTime())
+ .setBlockSize(newNode.getPreferredBlockSize())
+ .setBlocks(newNode.getBlocks())
+ .setPermissionStatus(newNode.getPermissionStatus())
+ .setClientName(newNode.getClientName())
+ .setClientMachine(newNode.getClientMachine());
+
+ logEdit(op);
}
/**
* Add close lease record to edit log.
*/
public void logCloseFile(String path, INodeFile newNode) {
- DeprecatedUTF8 nameReplicationPair[] = new DeprecatedUTF8[] {
- new DeprecatedUTF8(path),
- FSEditLog.toLogReplication(newNode.getReplication()),
- FSEditLog.toLogLong(newNode.getModificationTime()),
- FSEditLog.toLogLong(newNode.getAccessTime()),
- FSEditLog.toLogLong(newNode.getPreferredBlockSize())};
- logEdit(OP_CLOSE,
- new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair),
- new ArrayWritable(Block.class, newNode.getBlocks()),
- newNode.getPermissionStatus());
+ CloseOp op = CloseOp.getInstance()
+ .setPath(path)
+ .setReplication(newNode.getReplication())
+ .setModificationTime(newNode.getModificationTime())
+ .setAccessTime(newNode.getAccessTime())
+ .setBlockSize(newNode.getPreferredBlockSize())
+ .setBlocks(newNode.getBlocks())
+ .setPermissionStatus(newNode.getPermissionStatus());
+
+ logEdit(op);
}
/**
* Add create directory record to edit log
*/
public void logMkDir(String path, INode newNode) {
- DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
- new DeprecatedUTF8(path),
- FSEditLog.toLogLong(newNode.getModificationTime()),
- FSEditLog.toLogLong(newNode.getAccessTime())
- };
- logEdit(OP_MKDIR,
- new ArrayWritable(DeprecatedUTF8.class, info),
- newNode.getPermissionStatus());
+ MkdirOp op = MkdirOp.getInstance()
+ .setPath(path)
+ .setTimestamp(newNode.getModificationTime())
+ .setPermissionStatus(newNode.getPermissionStatus());
+ logEdit(op);
}
/**
@@ -570,33 +567,33 @@ public class FSEditLog {
* TODO: use String parameters until just before writing to disk
*/
void logRename(String src, String dst, long timestamp) {
- DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
- new DeprecatedUTF8(src),
- new DeprecatedUTF8(dst),
- FSEditLog.toLogLong(timestamp)};
- logEdit(OP_RENAME_OLD, new ArrayWritable(DeprecatedUTF8.class, info));
+ RenameOldOp op = RenameOldOp.getInstance()
+ .setSource(src)
+ .setDestination(dst)
+ .setTimestamp(timestamp);
+ logEdit(op);
}
/**
* Add rename record to edit log
*/
void logRename(String src, String dst, long timestamp, Options.Rename... options) {
- DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
- new DeprecatedUTF8(src),
- new DeprecatedUTF8(dst),
- FSEditLog.toLogLong(timestamp)};
- logEdit(OP_RENAME,
- new ArrayWritable(DeprecatedUTF8.class, info),
- toBytesWritable(options));
+ RenameOp op = RenameOp.getInstance()
+ .setSource(src)
+ .setDestination(dst)
+ .setTimestamp(timestamp)
+ .setOptions(options);
+ logEdit(op);
}
/**
* Add set replication record to edit log
*/
void logSetReplication(String src, short replication) {
- logEdit(OP_SET_REPLICATION,
- new DeprecatedUTF8(src),
- FSEditLog.toLogReplication(replication));
+ SetReplicationOp op = SetReplicationOp.getInstance()
+ .setPath(src)
+ .setReplication(replication);
+ logEdit(op);
}
/** Add set namespace quota record to edit log
@@ -605,64 +602,69 @@ public class FSEditLog {
* @param quota the directory size limit
*/
void logSetQuota(String src, long nsQuota, long dsQuota) {
- logEdit(OP_SET_QUOTA,
- new DeprecatedUTF8(src),
- new LongWritable(nsQuota), new LongWritable(dsQuota));
+ SetQuotaOp op = SetQuotaOp.getInstance()
+ .setSource(src)
+ .setNSQuota(nsQuota)
+ .setDSQuota(dsQuota);
+ logEdit(op);
}
/** Add set permissions record to edit log */
void logSetPermissions(String src, FsPermission permissions) {
- logEdit(OP_SET_PERMISSIONS, new DeprecatedUTF8(src), permissions);
+ SetPermissionsOp op = SetPermissionsOp.getInstance()
+ .setSource(src)
+ .setPermissions(permissions);
+ logEdit(op);
}
/** Add set owner record to edit log */
void logSetOwner(String src, String username, String groupname) {
- DeprecatedUTF8 u = new DeprecatedUTF8(username == null? "": username);
- DeprecatedUTF8 g = new DeprecatedUTF8(groupname == null? "": groupname);
- logEdit(OP_SET_OWNER, new DeprecatedUTF8(src), u, g);
+ SetOwnerOp op = SetOwnerOp.getInstance()
+ .setSource(src)
+ .setUser(username)
+ .setGroup(groupname);
+ logEdit(op);
}
/**
* concat(trg,src..) log
*/
void logConcat(String trg, String [] srcs, long timestamp) {
- int size = 1 + srcs.length + 1; // trg, srcs, timestamp
- DeprecatedUTF8 info[] = new DeprecatedUTF8[size];
- int idx = 0;
- info[idx++] = new DeprecatedUTF8(trg);
- for(int i=0; i<srcs.length; i++) {
- info[idx++] = new DeprecatedUTF8(srcs[i]);
- }
- info[idx] = FSEditLog.toLogLong(timestamp);
- logEdit(OP_CONCAT_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
+ ConcatDeleteOp op = ConcatDeleteOp.getInstance()
+ .setTarget(trg)
+ .setSources(srcs)
+ .setTimestamp(timestamp);
+ logEdit(op);
}
/**
* Add delete file record to edit log
*/
void logDelete(String src, long timestamp) {
- DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
- new DeprecatedUTF8(src),
- FSEditLog.toLogLong(timestamp)};
- logEdit(OP_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
+ DeleteOp op = DeleteOp.getInstance()
+ .setPath(src)
+ .setTimestamp(timestamp);
+ logEdit(op);
}
/**
* Add generation stamp record to edit log
*/
void logGenerationStamp(long genstamp) {
- logEdit(OP_SET_GENSTAMP, new LongWritable(genstamp));
+ SetGenstampOp op = SetGenstampOp.getInstance()
+ .setGenerationStamp(genstamp);
+ logEdit(op);
}
/**
* Add access time record to edit log
*/
void logTimes(String src, long mtime, long atime) {
- DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
- new DeprecatedUTF8(src),
- FSEditLog.toLogLong(mtime),
- FSEditLog.toLogLong(atime)};
- logEdit(OP_TIMES, new ArrayWritable(DeprecatedUTF8.class, info));
+ TimesOp op = TimesOp.getInstance()
+ .setPath(src)
+ .setModificationTime(mtime)
+ .setAccessTime(atime);
+ logEdit(op);
}
/**
@@ -670,14 +672,13 @@ public class FSEditLog {
*/
void logSymlink(String path, String value, long mtime,
long atime, INodeSymlink node) {
- DeprecatedUTF8 info[] = new DeprecatedUTF8[] {
- new DeprecatedUTF8(path),
- new DeprecatedUTF8(value),
- FSEditLog.toLogLong(mtime),
- FSEditLog.toLogLong(atime)};
- logEdit(OP_SYMLINK,
- new ArrayWritable(DeprecatedUTF8.class, info),
- node.getPermissionStatus());
+ SymlinkOp op = SymlinkOp.getInstance()
+ .setPath(path)
+ .setValue(value)
+ .setModificationTime(mtime)
+ .setAccessTime(atime)
+ .setPermissionStatus(node.getPermissionStatus());
+ logEdit(op);
}
/**
@@ -688,36 +689,40 @@ public class FSEditLog {
*/
void logGetDelegationToken(DelegationTokenIdentifier id,
long expiryTime) {
- logEdit(OP_GET_DELEGATION_TOKEN, id, FSEditLog.toLogLong(expiryTime));
+ GetDelegationTokenOp op = GetDelegationTokenOp.getInstance()
+ .setDelegationTokenIdentifier(id)
+ .setExpiryTime(expiryTime);
+ logEdit(op);
}
void logRenewDelegationToken(DelegationTokenIdentifier id,
long expiryTime) {
- logEdit(OP_RENEW_DELEGATION_TOKEN, id, FSEditLog.toLogLong(expiryTime));
+ RenewDelegationTokenOp op = RenewDelegationTokenOp.getInstance()
+ .setDelegationTokenIdentifier(id)
+ .setExpiryTime(expiryTime);
+ logEdit(op);
}
void logCancelDelegationToken(DelegationTokenIdentifier id) {
- logEdit(OP_CANCEL_DELEGATION_TOKEN, id);
+ CancelDelegationTokenOp op = CancelDelegationTokenOp.getInstance()
+ .setDelegationTokenIdentifier(id);
+ logEdit(op);
}
void logUpdateMasterKey(DelegationKey key) {
- logEdit(OP_UPDATE_MASTER_KEY, key);
+ UpdateMasterKeyOp op = UpdateMasterKeyOp.getInstance()
+ .setDelegationKey(key);
+ logEdit(op);
}
void logReassignLease(String leaseHolder, String src, String newHolder) {
- logEdit(OP_REASSIGN_LEASE, new DeprecatedUTF8(leaseHolder),
- new DeprecatedUTF8(src),
- new DeprecatedUTF8(newHolder));
- }
-
- static private DeprecatedUTF8 toLogReplication(short replication) {
- return new DeprecatedUTF8(Short.toString(replication));
+ ReassignLeaseOp op = ReassignLeaseOp.getInstance()
+ .setLeaseHolder(leaseHolder)
+ .setPath(src)
+ .setNewHolder(newHolder);
+ logEdit(op);
}
- static private DeprecatedUTF8 toLogLong(long timestamp) {
- return new DeprecatedUTF8(Long.toString(timestamp));
- }
-
/**
* @return the number of active (non-failed) journals
*/
@@ -818,7 +823,8 @@ public class FSEditLog {
state = State.IN_SEGMENT;
if (writeHeaderTxn) {
- logEdit(FSEditLogOpCodes.OP_START_LOG_SEGMENT);
+ logEdit(LogSegmentOp.getInstance(
+ FSEditLogOpCodes.OP_START_LOG_SEGMENT));
logSync();
}
}
@@ -833,7 +839,8 @@ public class FSEditLog {
"Bad state: %s", state);
if (writeEndTxn) {
- logEdit(FSEditLogOpCodes.OP_END_LOG_SEGMENT);
+ logEdit(LogSegmentOp.getInstance(
+ FSEditLogOpCodes.OP_END_LOG_SEGMENT));
logSync();
}
@@ -992,7 +999,7 @@ public class FSEditLog {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
- jas.getCurrentStream().write(data, 0, length);
+ jas.getCurrentStream().writeRaw(data, 0, length); // TODO writeRaw
}
}
}, "Logging edit");
@@ -1000,14 +1007,6 @@ public class FSEditLog {
endTransaction(start);
}
- static BytesWritable toBytesWritable(Options.Rename... options) {
- byte[] bytes = new byte[options.length];
- for (int i = 0; i < options.length; i++) {
- bytes[i] = options[i].value();
- }
- return new BytesWritable(bytes);
- }
-
//// Iteration across journals
private interface JournalClosure {
public void apply(JournalAndStream jas) throws IOException;
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Fri Jul 29 07:10:48 2011
@@ -40,14 +40,18 @@ import org.apache.hadoop.hdfs.server.com
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.hdfs.DeprecatedUTF8;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.EOFException;
@@ -63,6 +67,45 @@ public abstract class FSEditLogOp {
long txid;
+  /**
+   * Per-thread cache of reusable op instances, keyed by opcode. The
+   * getInstance() factories on the op classes and the Reader both look ops
+   * up in this map, so an instance obtained from it is shared by every
+   * caller running on the same thread.
+   */
+  @SuppressWarnings("deprecation")
+  private static ThreadLocal<EnumMap<FSEditLogOpCodes, FSEditLogOp>> opInstances =
+    new ThreadLocal<EnumMap<FSEditLogOpCodes, FSEditLogOp>>() {
+      @Override
+      protected EnumMap<FSEditLogOpCodes, FSEditLogOp> initialValue() {
+        EnumMap<FSEditLogOpCodes, FSEditLogOp> instances
+          = new EnumMap<FSEditLogOpCodes, FSEditLogOp>(FSEditLogOpCodes.class);
+        instances.put(OP_ADD, new AddOp());
+        instances.put(OP_CLOSE, new CloseOp());
+        instances.put(OP_SET_REPLICATION, new SetReplicationOp());
+        instances.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
+        instances.put(OP_RENAME_OLD, new RenameOldOp());
+        instances.put(OP_DELETE, new DeleteOp());
+        instances.put(OP_MKDIR, new MkdirOp());
+        instances.put(OP_SET_GENSTAMP, new SetGenstampOp());
+        instances.put(OP_DATANODE_ADD, new DatanodeAddOp());
+        instances.put(OP_DATANODE_REMOVE, new DatanodeRemoveOp());
+        instances.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
+        instances.put(OP_SET_OWNER, new SetOwnerOp());
+        instances.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
+        instances.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
+        instances.put(OP_SET_QUOTA, new SetQuotaOp());
+        instances.put(OP_TIMES, new TimesOp());
+        instances.put(OP_SYMLINK, new SymlinkOp());
+        instances.put(OP_RENAME, new RenameOp());
+        instances.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
+        instances.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
+        instances.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
+        instances.put(OP_CANCEL_DELEGATION_TOKEN,
+                      new CancelDelegationTokenOp());
+        instances.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
+        instances.put(OP_START_LOG_SEGMENT,
+                      new LogSegmentOp(OP_START_LOG_SEGMENT));
+        instances.put(OP_END_LOG_SEGMENT,
+                      new LogSegmentOp(OP_END_LOG_SEGMENT));
+        // InvalidOp.getInstance() looks up OP_INVALID in this map; without
+        // an entry here that factory would hand back null.
+        instances.put(OP_INVALID, new InvalidOp());
+        return instances;
+      }
+  };
+
/**
* Constructor for an EditLog Op. EditLog ops cannot be constructed
* directly, but only through Reader#readOp.
@@ -76,10 +119,14 @@ public abstract class FSEditLogOp {
this.txid = txid;
}
- public abstract void readFields(DataInputStream in, int logVersion)
+ abstract void readFields(DataInputStream in, int logVersion)
throws IOException;
- static class AddCloseOp extends FSEditLogOp {
+ abstract void writeFields(DataOutputStream out)
+ throws IOException;
+
+ @SuppressWarnings("unchecked")
+ static abstract class AddCloseOp extends FSEditLogOp {
int length;
String path;
short replication;
@@ -97,7 +144,71 @@ public abstract class FSEditLogOp {
assert(opCode == OP_ADD || opCode == OP_CLOSE);
}
- public void readFields(DataInputStream in, int logVersion)
+ <T extends AddCloseOp> T setPath(String path) {
+ this.path = path;
+ return (T)this;
+ }
+
+ <T extends AddCloseOp> T setReplication(short replication) {
+ this.replication = replication;
+ return (T)this;
+ }
+
+ <T extends AddCloseOp> T setModificationTime(long mtime) {
+ this.mtime = mtime;
+ return (T)this;
+ }
+
+ <T extends AddCloseOp> T setAccessTime(long atime) {
+ this.atime = atime;
+ return (T)this;
+ }
+
+ <T extends AddCloseOp> T setBlockSize(long blockSize) {
+ this.blockSize = blockSize;
+ return (T)this;
+ }
+
+ <T extends AddCloseOp> T setBlocks(Block[] blocks) {
+ this.blocks = blocks;
+ return (T)this;
+ }
+
+ <T extends AddCloseOp> T setPermissionStatus(PermissionStatus permissions) {
+ this.permissions = permissions;
+ return (T)this;
+ }
+
+ <T extends AddCloseOp> T setClientName(String clientName) {
+ this.clientName = clientName;
+ return (T)this;
+ }
+
+ <T extends AddCloseOp> T setClientMachine(String clientMachine) {
+ this.clientMachine = clientMachine;
+ return (T)this;
+ }
+
+    /**
+     * Serializes this op as: an array of five strings (path, replication,
+     * mtime, atime, block size), the block array, the permission status and,
+     * for OP_ADD only, the client name and client machine.
+     */
+    @Override
+    void writeFields(DataOutputStream out) throws IOException {
+      final DeprecatedUTF8[] fileAttributes = {
+          new DeprecatedUTF8(path),
+          toLogReplication(replication),
+          toLogLong(mtime),
+          toLogLong(atime),
+          toLogLong(blockSize)
+      };
+      final ArrayWritable attributeRecord =
+          new ArrayWritable(DeprecatedUTF8.class, fileAttributes);
+      attributeRecord.write(out);
+
+      final ArrayWritable blockRecord = new ArrayWritable(Block.class, blocks);
+      blockRecord.write(out);
+
+      permissions.write(out);
+
+      // Only OP_ADD carries the client identity; OP_CLOSE stops here.
+      if (this.opCode != OP_ADD) {
+        return;
+      }
+      new DeprecatedUTF8(clientName).write(out);
+      new DeprecatedUTF8(clientMachine).write(out);
+    }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
// versions > 0 support per file replication
// get name and replication
@@ -178,6 +289,26 @@ public abstract class FSEditLogOp {
}
}
+  /** AddCloseOp variant bound to OP_ADD. */
+  static class AddOp extends AddCloseOp {
+    private AddOp() {
+      super(OP_ADD);
+    }
+
+    /** @return the calling thread's cached instance registered for OP_ADD */
+    static AddOp getInstance() {
+      final FSEditLogOp cached = opInstances.get().get(OP_ADD);
+      return (AddOp) cached;
+    }
+  }
+
+  /** AddCloseOp variant bound to OP_CLOSE. */
+  static class CloseOp extends AddCloseOp {
+    private CloseOp() {
+      super(OP_CLOSE);
+    }
+
+    /** @return the calling thread's cached instance registered for OP_CLOSE */
+    static CloseOp getInstance() {
+      final FSEditLogOp cached = opInstances.get().get(OP_CLOSE);
+      return (CloseOp) cached;
+    }
+  }
+
static class SetReplicationOp extends FSEditLogOp {
String path;
short replication;
@@ -186,7 +317,29 @@ public abstract class FSEditLogOp {
super(OP_SET_REPLICATION);
}
- public void readFields(DataInputStream in, int logVersion)
+ static SetReplicationOp getInstance() {
+ return (SetReplicationOp)opInstances.get()
+ .get(OP_SET_REPLICATION);
+ }
+
+ SetReplicationOp setPath(String path) {
+ this.path = path;
+ return this;
+ }
+
+ SetReplicationOp setReplication(short replication) {
+ this.replication = replication;
+ return this;
+ }
+
+    /** Writes the path followed by the replication factor as a decimal string. */
+    @Override
+    void writeFields(DataOutputStream out) throws IOException {
+      final DeprecatedUTF8 encodedPath = new DeprecatedUTF8(path);
+      encodedPath.write(out);
+      // Same encoding the other ops use for a replication value.
+      final DeprecatedUTF8 encodedReplication = toLogReplication(replication);
+      encodedReplication.write(out);
+    }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.path = FSImageSerialization.readString(in);
this.replication = readShort(in);
@@ -203,7 +356,41 @@ public abstract class FSEditLogOp {
super(OP_CONCAT_DELETE);
}
- public void readFields(DataInputStream in, int logVersion)
+ static ConcatDeleteOp getInstance() {
+ return (ConcatDeleteOp)opInstances.get()
+ .get(OP_CONCAT_DELETE);
+ }
+
+ ConcatDeleteOp setTarget(String trg) {
+ this.trg = trg;
+ return this;
+ }
+
+ ConcatDeleteOp setSources(String[] srcs) {
+ this.srcs = srcs;
+ return this;
+ }
+
+ ConcatDeleteOp setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+    /**
+     * Writes one string array laid out as: target, every source in order,
+     * then the timestamp as a decimal string.
+     */
+    @Override
+    void writeFields(DataOutputStream out) throws IOException {
+      // one slot for trg, one per source, one for the timestamp
+      final DeprecatedUTF8[] fields = new DeprecatedUTF8[srcs.length + 2];
+      fields[0] = new DeprecatedUTF8(trg);
+      int next = 1;
+      for (String source : srcs) {
+        fields[next] = new DeprecatedUTF8(source);
+        next++;
+      }
+      fields[next] = toLogLong(timestamp);
+      new ArrayWritable(DeprecatedUTF8.class, fields).write(out);
+    }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.length = in.readInt();
if (length < 3) { // trg, srcs.., timestam
@@ -230,7 +417,37 @@ public abstract class FSEditLogOp {
super(OP_RENAME_OLD);
}
- public void readFields(DataInputStream in, int logVersion)
+ static RenameOldOp getInstance() {
+ return (RenameOldOp)opInstances.get()
+ .get(OP_RENAME_OLD);
+ }
+
+ RenameOldOp setSource(String src) {
+ this.src = src;
+ return this;
+ }
+
+ RenameOldOp setDestination(String dst) {
+ this.dst = dst;
+ return this;
+ }
+
+ RenameOldOp setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+    /** Writes a three-element string array: source, destination, timestamp. */
+    @Override
+    void writeFields(DataOutputStream out) throws IOException {
+      final DeprecatedUTF8[] fields = {
+          new DeprecatedUTF8(src),
+          new DeprecatedUTF8(dst),
+          toLogLong(timestamp)
+      };
+      final ArrayWritable record =
+          new ArrayWritable(DeprecatedUTF8.class, fields);
+      record.write(out);
+    }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.length = in.readInt();
if (this.length != 3) {
@@ -252,9 +469,32 @@ public abstract class FSEditLogOp {
super(OP_DELETE);
}
- public void readFields(DataInputStream in, int logVersion)
- throws IOException {
+ static DeleteOp getInstance() {
+ return (DeleteOp)opInstances.get()
+ .get(OP_DELETE);
+ }
+
+ DeleteOp setPath(String path) {
+ this.path = path;
+ return this;
+ }
+ DeleteOp setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+    /** Writes a two-element string array: path, then timestamp. */
+    @Override
+    void writeFields(DataOutputStream out) throws IOException {
+      final DeprecatedUTF8[] fields = {
+          new DeprecatedUTF8(path),
+          toLogLong(timestamp)
+      };
+      final ArrayWritable record =
+          new ArrayWritable(DeprecatedUTF8.class, fields);
+      record.write(out);
+    }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
+ throws IOException {
this.length = in.readInt();
if (this.length != 2) {
throw new IOException("Incorrect data format. "
@@ -274,8 +514,40 @@ public abstract class FSEditLogOp {
private MkdirOp() {
super(OP_MKDIR);
}
+
+ static MkdirOp getInstance() {
+ return (MkdirOp)opInstances.get()
+ .get(OP_MKDIR);
+ }
- public void readFields(DataInputStream in, int logVersion)
+ MkdirOp setPath(String path) {
+ this.path = path;
+ return this;
+ }
+
+ MkdirOp setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ MkdirOp setPermissionStatus(PermissionStatus permissions) {
+ this.permissions = permissions;
+ return this;
+ }
+
+    /**
+     * Writes a three-element string array (path, mtime, atime) followed by
+     * the permission status. The single timestamp fills both time slots.
+     */
+    @Override
+    void writeFields(DataOutputStream out) throws IOException {
+      final DeprecatedUTF8 encodedPath = new DeprecatedUTF8(path);
+      final DeprecatedUTF8 mtimeField = toLogLong(timestamp);
+      // atime is unused at this time, so the same timestamp is written again
+      final DeprecatedUTF8 atimeField = toLogLong(timestamp);
+      final DeprecatedUTF8[] fields = { encodedPath, mtimeField, atimeField };
+      new ArrayWritable(DeprecatedUTF8.class, fields).write(out);
+      permissions.write(out);
+    }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.length = in.readInt();
@@ -309,32 +581,70 @@ public abstract class FSEditLogOp {
super(OP_SET_GENSTAMP);
}
- public void readFields(DataInputStream in, int logVersion)
+ static SetGenstampOp getInstance() {
+ return (SetGenstampOp)opInstances.get()
+ .get(OP_SET_GENSTAMP);
+ }
+
+ SetGenstampOp setGenerationStamp(long genStamp) {
+ this.genStamp = genStamp;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ new LongWritable(genStamp).write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.genStamp = in.readLong();
}
}
+ @SuppressWarnings("deprecation")
static class DatanodeAddOp extends FSEditLogOp {
- @SuppressWarnings("deprecation")
private DatanodeAddOp() {
super(OP_DATANODE_ADD);
}
- public void readFields(DataInputStream in, int logVersion)
+ static DatanodeAddOp getInstance() {
+ return (DatanodeAddOp)opInstances.get()
+ .get(OP_DATANODE_ADD);
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ throw new IOException("Deprecated, should not write");
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
//Datanodes are not persistent any more.
FSImageSerialization.DatanodeImage.skipOne(in);
}
}
+ @SuppressWarnings("deprecation")
static class DatanodeRemoveOp extends FSEditLogOp {
- @SuppressWarnings("deprecation")
private DatanodeRemoveOp() {
super(OP_DATANODE_REMOVE);
}
- public void readFields(DataInputStream in, int logVersion)
+ static DatanodeRemoveOp getInstance() {
+ return (DatanodeRemoveOp)opInstances.get()
+ .get(OP_DATANODE_REMOVE);
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ throw new IOException("Deprecated, should not write");
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
DatanodeID nodeID = new DatanodeID();
nodeID.readFields(in);
@@ -350,7 +660,29 @@ public abstract class FSEditLogOp {
super(OP_SET_PERMISSIONS);
}
- public void readFields(DataInputStream in, int logVersion)
+ static SetPermissionsOp getInstance() {
+ return (SetPermissionsOp)opInstances.get()
+ .get(OP_SET_PERMISSIONS);
+ }
+
+ SetPermissionsOp setSource(String src) {
+ this.src = src;
+ return this;
+ }
+
+ SetPermissionsOp setPermissions(FsPermission permissions) {
+ this.permissions = permissions;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ new DeprecatedUTF8(src).write(out);
+ permissions.write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.src = FSImageSerialization.readString(in);
this.permissions = FsPermission.read(in);
@@ -366,13 +698,42 @@ public abstract class FSEditLogOp {
super(OP_SET_OWNER);
}
- public void readFields(DataInputStream in, int logVersion)
+ static SetOwnerOp getInstance() {
+ return (SetOwnerOp)opInstances.get()
+ .get(OP_SET_OWNER);
+ }
+
+ SetOwnerOp setSource(String src) {
+ this.src = src;
+ return this;
+ }
+
+ SetOwnerOp setUser(String username) {
+ this.username = username;
+ return this;
+ }
+
+ SetOwnerOp setGroup(String groupname) {
+ this.groupname = groupname;
+ return this;
+ }
+
+    /**
+     * Writes the path, user name and group name as three strings. A null
+     * user or group is written as the empty string.
+     */
+    @Override
+    void writeFields(DataOutputStream out) throws IOException {
+      String user = username;
+      if (user == null) {
+        user = "";
+      }
+      String group = groupname;
+      if (group == null) {
+        group = "";
+      }
+      final DeprecatedUTF8 encodedUser = new DeprecatedUTF8(user);
+      final DeprecatedUTF8 encodedGroup = new DeprecatedUTF8(group);
+      final DeprecatedUTF8 encodedPath = new DeprecatedUTF8(src);
+      encodedPath.write(out);
+      encodedUser.write(out);
+      encodedGroup.write(out);
+    }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.src = FSImageSerialization.readString(in);
this.username = FSImageSerialization.readString_EmptyAsNull(in);
this.groupname = FSImageSerialization.readString_EmptyAsNull(in);
}
-
}
static class SetNSQuotaOp extends FSEditLogOp {
@@ -383,7 +744,18 @@ public abstract class FSEditLogOp {
super(OP_SET_NS_QUOTA);
}
- public void readFields(DataInputStream in, int logVersion)
+ static SetNSQuotaOp getInstance() {
+ return (SetNSQuotaOp)opInstances.get()
+ .get(OP_SET_NS_QUOTA);
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ throw new IOException("Deprecated");
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.src = FSImageSerialization.readString(in);
this.nsQuota = readLongWritable(in);
@@ -397,7 +769,18 @@ public abstract class FSEditLogOp {
super(OP_CLEAR_NS_QUOTA);
}
- public void readFields(DataInputStream in, int logVersion)
+ static ClearNSQuotaOp getInstance() {
+ return (ClearNSQuotaOp)opInstances.get()
+ .get(OP_CLEAR_NS_QUOTA);
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ throw new IOException("Deprecated");
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.src = FSImageSerialization.readString(in);
}
@@ -412,7 +795,35 @@ public abstract class FSEditLogOp {
super(OP_SET_QUOTA);
}
- public void readFields(DataInputStream in, int logVersion)
+ static SetQuotaOp getInstance() {
+ return (SetQuotaOp)opInstances.get()
+ .get(OP_SET_QUOTA);
+ }
+
+ SetQuotaOp setSource(String src) {
+ this.src = src;
+ return this;
+ }
+
+ SetQuotaOp setNSQuota(long nsQuota) {
+ this.nsQuota = nsQuota;
+ return this;
+ }
+
+ SetQuotaOp setDSQuota(long dsQuota) {
+ this.dsQuota = dsQuota;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ new DeprecatedUTF8(src).write(out);
+ new LongWritable(nsQuota).write(out);
+ new LongWritable(dsQuota).write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.src = FSImageSerialization.readString(in);
this.nsQuota = readLongWritable(in);
@@ -430,7 +841,37 @@ public abstract class FSEditLogOp {
super(OP_TIMES);
}
- public void readFields(DataInputStream in, int logVersion)
+ static TimesOp getInstance() {
+ return (TimesOp)opInstances.get()
+ .get(OP_TIMES);
+ }
+
+ TimesOp setPath(String path) {
+ this.path = path;
+ return this;
+ }
+
+ TimesOp setModificationTime(long mtime) {
+ this.mtime = mtime;
+ return this;
+ }
+
+ TimesOp setAccessTime(long atime) {
+ this.atime = atime;
+ return this;
+ }
+
+    /** Writes a three-element string array: path, mtime, atime. */
+    @Override
+    void writeFields(DataOutputStream out) throws IOException {
+      final DeprecatedUTF8[] fields = {
+          new DeprecatedUTF8(path),
+          toLogLong(mtime),
+          toLogLong(atime)
+      };
+      final ArrayWritable record =
+          new ArrayWritable(DeprecatedUTF8.class, fields);
+      record.write(out);
+    }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.length = in.readInt();
if (length != 3) {
@@ -455,7 +896,49 @@ public abstract class FSEditLogOp {
super(OP_SYMLINK);
}
- public void readFields(DataInputStream in, int logVersion)
+ static SymlinkOp getInstance() {
+ return (SymlinkOp)opInstances.get()
+ .get(OP_SYMLINK);
+ }
+
+ SymlinkOp setPath(String path) {
+ this.path = path;
+ return this;
+ }
+
+ SymlinkOp setValue(String value) {
+ this.value = value;
+ return this;
+ }
+
+ SymlinkOp setModificationTime(long mtime) {
+ this.mtime = mtime;
+ return this;
+ }
+
+ SymlinkOp setAccessTime(long atime) {
+ this.atime = atime;
+ return this;
+ }
+
+ SymlinkOp setPermissionStatus(PermissionStatus permissionStatus) {
+ this.permissionStatus = permissionStatus;
+ return this;
+ }
+
+    /**
+     * Writes a four-element string array (path, link value, mtime, atime)
+     * followed by the permission status.
+     */
+    @Override
+    void writeFields(DataOutputStream out) throws IOException {
+      final DeprecatedUTF8[] fields = {
+          new DeprecatedUTF8(path),
+          new DeprecatedUTF8(value),
+          toLogLong(mtime),
+          toLogLong(atime)
+      };
+      final ArrayWritable record =
+          new ArrayWritable(DeprecatedUTF8.class, fields);
+      record.write(out);
+      permissionStatus.write(out);
+    }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.length = in.readInt();
@@ -482,7 +965,43 @@ public abstract class FSEditLogOp {
super(OP_RENAME);
}
- public void readFields(DataInputStream in, int logVersion)
+ static RenameOp getInstance() {
+ return (RenameOp)opInstances.get()
+ .get(OP_RENAME);
+ }
+
+ RenameOp setSource(String src) {
+ this.src = src;
+ return this;
+ }
+
+ RenameOp setDestination(String dst) {
+ this.dst = dst;
+ return this;
+ }
+
+ RenameOp setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ RenameOp setOptions(Rename[] options) {
+ this.options = options;
+ return this;
+ }
+
+    /**
+     * Writes a three-element string array (source, destination, timestamp)
+     * followed by the rename options packed one byte per option.
+     */
+    @Override
+    void writeFields(DataOutputStream out) throws IOException {
+      final DeprecatedUTF8[] fields = {
+          new DeprecatedUTF8(src),
+          new DeprecatedUTF8(dst),
+          toLogLong(timestamp)
+      };
+      final ArrayWritable record =
+          new ArrayWritable(DeprecatedUTF8.class, fields);
+      record.write(out);
+
+      final BytesWritable packedOptions = toBytesWritable(options);
+      packedOptions.write(out);
+    }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.length = in.readInt();
if (this.length != 3) {
@@ -507,6 +1026,14 @@ public abstract class FSEditLogOp {
}
return options;
}
+
+    /**
+     * Packs rename options into a BytesWritable holding one byte per option,
+     * in argument order, each byte being the option's value().
+     */
+    static BytesWritable toBytesWritable(Rename... options) {
+      final byte[] packed = new byte[options.length];
+      int slot = 0;
+      for (Rename option : options) {
+        packed[slot] = option.value();
+        slot++;
+      }
+      return new BytesWritable(packed);
+    }
}
static class ReassignLeaseOp extends FSEditLogOp {
@@ -517,8 +1044,36 @@ public abstract class FSEditLogOp {
private ReassignLeaseOp() {
super(OP_REASSIGN_LEASE);
}
-
- public void readFields(DataInputStream in, int logVersion)
+
+ static ReassignLeaseOp getInstance() {
+ return (ReassignLeaseOp)opInstances.get()
+ .get(OP_REASSIGN_LEASE);
+ }
+
+ ReassignLeaseOp setLeaseHolder(String leaseHolder) {
+ this.leaseHolder = leaseHolder;
+ return this;
+ }
+
+ ReassignLeaseOp setPath(String path) {
+ this.path = path;
+ return this;
+ }
+
+ ReassignLeaseOp setNewHolder(String newHolder) {
+ this.newHolder = newHolder;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ new DeprecatedUTF8(leaseHolder).write(out);
+ new DeprecatedUTF8(path).write(out);
+ new DeprecatedUTF8(newHolder).write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.leaseHolder = FSImageSerialization.readString(in);
this.path = FSImageSerialization.readString(in);
@@ -534,7 +1089,30 @@ public abstract class FSEditLogOp {
super(OP_GET_DELEGATION_TOKEN);
}
- public void readFields(DataInputStream in, int logVersion)
+ static GetDelegationTokenOp getInstance() {
+ return (GetDelegationTokenOp)opInstances.get()
+ .get(OP_GET_DELEGATION_TOKEN);
+ }
+
+ GetDelegationTokenOp setDelegationTokenIdentifier(
+ DelegationTokenIdentifier token) {
+ this.token = token;
+ return this;
+ }
+
+ GetDelegationTokenOp setExpiryTime(long expiryTime) {
+ this.expiryTime = expiryTime;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ token.write(out);
+ toLogLong(expiryTime).write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.token = new DelegationTokenIdentifier();
this.token.readFields(in);
@@ -550,7 +1128,30 @@ public abstract class FSEditLogOp {
super(OP_RENEW_DELEGATION_TOKEN);
}
- public void readFields(DataInputStream in, int logVersion)
+ static RenewDelegationTokenOp getInstance() {
+ return (RenewDelegationTokenOp)opInstances.get()
+ .get(OP_RENEW_DELEGATION_TOKEN);
+ }
+
+ RenewDelegationTokenOp setDelegationTokenIdentifier(
+ DelegationTokenIdentifier token) {
+ this.token = token;
+ return this;
+ }
+
+ RenewDelegationTokenOp setExpiryTime(long expiryTime) {
+ this.expiryTime = expiryTime;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ token.write(out);
+ toLogLong(expiryTime).write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.token = new DelegationTokenIdentifier();
this.token.readFields(in);
@@ -565,7 +1166,24 @@ public abstract class FSEditLogOp {
super(OP_CANCEL_DELEGATION_TOKEN);
}
- public void readFields(DataInputStream in, int logVersion)
+ static CancelDelegationTokenOp getInstance() {
+ return (CancelDelegationTokenOp)opInstances.get()
+ .get(OP_CANCEL_DELEGATION_TOKEN);
+ }
+
+ CancelDelegationTokenOp setDelegationTokenIdentifier(
+ DelegationTokenIdentifier token) {
+ this.token = token;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ token.write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.token = new DelegationTokenIdentifier();
this.token.readFields(in);
@@ -579,7 +1197,23 @@ public abstract class FSEditLogOp {
super(OP_UPDATE_MASTER_KEY);
}
- public void readFields(DataInputStream in, int logVersion)
+ static UpdateMasterKeyOp getInstance() {
+ return (UpdateMasterKeyOp)opInstances.get()
+ .get(OP_UPDATE_MASTER_KEY);
+ }
+
+ UpdateMasterKeyOp setDelegationKey(DelegationKey key) {
+ this.key = key;
+ return this;
+ }
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ key.write(out);
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion)
throws IOException {
this.key = new DelegationKey();
this.key.readFields(in);
@@ -593,10 +1227,39 @@ public abstract class FSEditLogOp {
code == OP_END_LOG_SEGMENT : "Bad op: " + code;
}
+    /**
+     * Returns the calling thread's cached op for a log-segment opcode.
+     *
+     * @param code OP_START_LOG_SEGMENT or OP_END_LOG_SEGMENT, the two
+     *             opcodes for which a LogSegmentOp is registered
+     */
+    static LogSegmentOp getInstance(FSEditLogOpCodes code) {
+      // Any other opcode is registered to a different op class or to none,
+      // which would otherwise show up as a ClassCastException or a null.
+      assert code == OP_START_LOG_SEGMENT ||
+             code == OP_END_LOG_SEGMENT : "Bad op: " + code;
+      return (LogSegmentOp)opInstances.get().get(code);
+    }
+
public void readFields(DataInputStream in, int logVersion)
throws IOException {
// no data stored in these ops yet
}
+
+ @Override
+ void writeFields(DataOutputStream out) throws IOException {
+ // no data stored
+ }
+ }
+
+  /**
+   * Op type bound to OP_INVALID. It carries no fields: writeFields emits
+   * nothing and readFields consumes nothing.
+   */
+  static class InvalidOp extends FSEditLogOp {
+    private InvalidOp() {
+      super(OP_INVALID);
+    }
+
+    /** @return whatever the calling thread's op cache holds for OP_INVALID */
+    static InvalidOp getInstance() {
+      final FSEditLogOp cached = opInstances.get().get(OP_INVALID);
+      return (InvalidOp) cached;
+    }
+
+    @Override
+    void writeFields(DataOutputStream out) throws IOException {
+      // nothing to write
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      // nothing to read
+    }
 }
static private short readShort(DataInputStream in) throws IOException {
@@ -607,6 +1270,14 @@ public abstract class FSEditLogOp {
return Long.parseLong(FSImageSerialization.readString(in));
}
+  /** Encodes a replication factor as its decimal string form. */
+  static private DeprecatedUTF8 toLogReplication(short replication) {
+    final String decimal = Short.toString(replication);
+    return new DeprecatedUTF8(decimal);
+  }
+
+  /** Encodes a long value (e.g. a timestamp) as its decimal string form. */
+  static private DeprecatedUTF8 toLogLong(long timestamp) {
+    final String decimal = Long.toString(timestamp);
+    return new DeprecatedUTF8(decimal);
+  }
+
/**
* A class to read in blocks stored in the old format. The only two
* fields in the block were blockid and length.
@@ -710,13 +1381,43 @@ public abstract class FSEditLogOp {
}
/**
+ * Class for writing editlog ops
+ */
+  public static class Writer {
+    /** Buffer that serialized ops are appended to. */
+    private final DataOutputBuffer buf;
+
+    /**
+     * @param out buffer that every writeOp call appends to
+     */
+    public Writer(DataOutputBuffer out) {
+      this.buf = out;
+    }
+
+    /**
+     * Appends one operation to the buffer: the opcode byte, the transaction
+     * id, the op's own fields, and finally an int checksum computed over
+     * exactly those bytes.
+     *
+     * @param op The operation to write
+     * @throws IOException if an error occurs during writing.
+     */
+    public void writeOp(FSEditLogOp op) throws IOException {
+      // Remember where this record begins so the checksum covers only it.
+      final int recordStart = buf.getLength();
+      buf.writeByte(op.opCode.getOpCode());
+      buf.writeLong(op.txid);
+      op.writeFields(buf);
+      final int recordLength = buf.getLength() - recordStart;
+
+      final Checksum crc = FSEditLog.getChecksum();
+      crc.reset();
+      crc.update(buf.getData(), recordStart, recordLength);
+      buf.writeInt((int)crc.getValue());
+    }
+  }
+
+ /**
* Class for reading editlog ops from a stream
*/
public static class Reader {
private final DataInputStream in;
private final int logVersion;
private final Checksum checksum;
- private EnumMap<FSEditLogOpCodes, FSEditLogOp> opInstances;
+
/**
* Construct the reader
* @param in The stream to read from.
@@ -734,36 +1435,6 @@ public abstract class FSEditLogOp {
}
this.logVersion = logVersion;
this.checksum = checksum;
- opInstances = new EnumMap<FSEditLogOpCodes, FSEditLogOp>(
- FSEditLogOpCodes.class);
- opInstances.put(OP_ADD, new AddCloseOp(OP_ADD));
- opInstances.put(OP_CLOSE, new AddCloseOp(OP_CLOSE));
- opInstances.put(OP_SET_REPLICATION, new SetReplicationOp());
- opInstances.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
- opInstances.put(OP_RENAME_OLD, new RenameOldOp());
- opInstances.put(OP_DELETE, new DeleteOp());
- opInstances.put(OP_MKDIR, new MkdirOp());
- opInstances.put(OP_SET_GENSTAMP, new SetGenstampOp());
- opInstances.put(OP_DATANODE_ADD, new DatanodeAddOp());
- opInstances.put(OP_DATANODE_REMOVE, new DatanodeRemoveOp());
- opInstances.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
- opInstances.put(OP_SET_OWNER, new SetOwnerOp());
- opInstances.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
- opInstances.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
- opInstances.put(OP_SET_QUOTA, new SetQuotaOp());
- opInstances.put(OP_TIMES, new TimesOp());
- opInstances.put(OP_SYMLINK, new SymlinkOp());
- opInstances.put(OP_RENAME, new RenameOp());
- opInstances.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
- opInstances.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
- opInstances.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
- opInstances.put(OP_CANCEL_DELEGATION_TOKEN,
- new CancelDelegationTokenOp());
- opInstances.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
- opInstances.put(OP_START_LOG_SEGMENT,
- new LogSegmentOp(OP_START_LOG_SEGMENT));
- opInstances.put(OP_END_LOG_SEGMENT,
- new LogSegmentOp(OP_END_LOG_SEGMENT));
}
/**
@@ -796,7 +1467,7 @@ public abstract class FSEditLogOp {
return null;
}
- FSEditLogOp op = opInstances.get(opCode);
+ FSEditLogOp op = opInstances.get().get(opCode);
if (op == null) {
throw new IOException("Read invalid opcode " + opCode);
}
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1152128&r1=1152127&r2=1152128&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Jul 29 07:10:48 2011
@@ -295,7 +295,7 @@ public class FSImage implements Closeabl
StorageDirectory sd = it.next();
StorageState curState;
try {
- curState = sd.analyzeStorage(startOpt);
+ curState = sd.analyzeStorage(startOpt, storage);
// sd is locked but not opened
switch(curState) {
case NON_EXISTENT:
@@ -311,7 +311,8 @@ public class FSImage implements Closeabl
}
if (curState != StorageState.NOT_FORMATTED
&& startOpt != StartupOption.ROLLBACK) {
- sd.read(); // read and verify consistency with other directories
+ // read and verify consistency with other directories
+ storage.readProperties(sd);
isFormatted = true;
}
if (startOpt == StartupOption.IMPORT && isFormatted)
@@ -395,7 +396,7 @@ public class FSImage implements Closeabl
try {
// Write the version file, since saveFsImage above only makes the
// fsimage_<txid>, and the directory is otherwise empty.
- sd.write();
+ storage.writeProperties(sd);
File prevDir = sd.getPreviousDir();
File tmpDir = sd.getPreviousTmp();
@@ -433,14 +434,14 @@ public class FSImage implements Closeabl
if (!prevDir.exists()) { // use current directory then
LOG.info("Storage directory " + sd.getRoot()
+ " does not contain previous fs state.");
- sd.read(); // read and verify consistency with other directories
+ // read and verify consistency with other directories
+ storage.readProperties(sd);
continue;
}
- StorageDirectory sdPrev
- = prevState.getStorage().new StorageDirectory(sd.getRoot());
// read and verify consistency of the prev dir
- sdPrev.read(sdPrev.getPreviousVersionFile());
+ prevState.getStorage().readPreviousVersionProperties(sd);
+
if (prevState.getLayoutVersion() != FSConstants.LAYOUT_VERSION) {
throw new IOException(
"Cannot rollback to storage version " +
@@ -604,7 +605,7 @@ public class FSImage implements Closeabl
//
StorageDirectory sdForProperties =
loadPlan.getStorageDirectoryForProperties();
- sdForProperties.read();
+ storage.readProperties(sdForProperties);
File imageFile = loadPlan.getImageFile();
try {
@@ -730,7 +731,6 @@ public class FSImage implements Closeabl
storage.setMostRecentCheckpointTxId(txId);
}
-
/**
* Save the contents of the FS image to the file.
*/