Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2008/08/16 02:06:06 UTC
svn commit: r686420 - in /hadoop/core/trunk: ./
src/core/org/apache/hadoop/io/
src/hdfs/org/apache/hadoop/hdfs/server/datanode/
src/hdfs/org/apache/hadoop/hdfs/server/namenode/
src/test/org/apache/hadoop/hdfs/server/namenode/
Author: shv
Date: Fri Aug 15 17:06:05 2008
New Revision: 686420
URL: http://svn.apache.org/viewvc?rev=686420&view=rev
Log:
HADOOP-3905. Create generic interfaces for edit log streams. Contributed by Konstantin Shvachko.
Added:
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EdlitLogInputStream.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/io/DataOutputBuffer.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
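In short, this change splits FSEditLog's private file-stream inner class into
a pair of generic abstractions, EditLogOutputStream and EditLogInputStream,
with file-based implementations nested in FSEditLog. The write path now follows
a fixed sequence: buffer the record, swap buffers, then flush and sync. Below
is a minimal same-package sketch of a helper driving that sequence; the helper
itself is illustrative and not part of this patch:

package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;

import org.apache.hadoop.io.Writable;

class EditLogWriteSketch {
  // Illustrative only: mirrors the sequence FSEditLog uses in the diff below.
  static void logAndSync(EditLogOutputStream out, byte op, Writable... args)
      throws IOException {
    out.write(op, args);      // append the record to the current buffer
    out.setReadyToFlush();    // swap current/ready buffers (cheap, done under a lock)
    out.flush();              // flushAndSync() the ready buffer, collect sync metrics
  }
}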
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=686420&r1=686419&r2=686420&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Aug 15 17:06:05 2008
@@ -197,6 +197,8 @@
HADOOP-3935. Split out inner classes from DataNode.java. (johan)
+ HADOOP-3905. Create generic interfaces for edit log streams. (shv)
+
OPTIMIZATIONS
HADOOP-3556. Removed lock contention in MD5Hash by changing the
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/io/DataOutputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/DataOutputBuffer.java?rev=686420&r1=686419&r2=686420&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/DataOutputBuffer.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/DataOutputBuffer.java Fri Aug 15 17:06:05 2008
@@ -44,7 +44,6 @@
private static class Buffer extends ByteArrayOutputStream {
public byte[] getData() { return buf; }
public int getLength() { return count; }
- public void reset() { count = 0; }
public Buffer() {
super();
@@ -101,4 +100,9 @@
public void write(DataInput in, int length) throws IOException {
buffer.write(in, length);
}
+
+ /** Write the buffered data to the given output stream. */
+ public void writeTo(OutputStream out) throws IOException {
+ buffer.writeTo(out);
+ }
}
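The new writeTo method copies the buffered bytes to any OutputStream without
exposing the internal array. A small self-contained example follows; the file
name and the int record are made up for illustration:

import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.DataOutputBuffer;

public class WriteToDemo {
  public static void main(String[] args) throws IOException {
    DataOutputBuffer buf = new DataOutputBuffer();
    buf.writeInt(42);                          // buffer a record in memory
    FileOutputStream out = new FileOutputStream("demo.bin");
    try {
      buf.writeTo(out);                        // new method: copy buffered bytes out
    } finally {
      out.close();
    }
    buf.reset();                               // discard buffered data, keep capacity
  }
}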
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=686420&r1=686419&r2=686420&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Aug 15 17:06:05 2008
@@ -484,7 +484,7 @@
* 2) to receive a registrationID
* issued by the namenode to recognize registered datanodes.
*
- * @see FSNamesystem#registerDatanode(DatanodeRegistration,String)
+ * @see FSNamesystem#registerDatanode(DatanodeRegistration)
* @throws IOException
*/
private void register() throws IOException {
Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=686420&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Fri Aug 15 17:06:05 2008
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A generic abstract class to support journaling of the edits log into
+ * persistent storage.
+ */
+abstract class EditLogOutputStream extends OutputStream {
+ // these are statistics counters
+ private long numSync; // number of syncs to disk
+ private long totalTimeSync; // total time to sync
+
+ EditLogOutputStream() throws IOException {
+ numSync = totalTimeSync = 0;
+ }
+
+ /**
+ * Get the name of this stream.
+ *
+ * @return name of the stream
+ */
+ abstract String getName();
+
+ /** {@inheritDoc} */
+ abstract public void write(int b) throws IOException;
+
+ /**
+ * Write an edits log record into the stream.
+ * The record is represented by an operation name and
+ * an array of Writable arguments.
+ *
+ * @param op operation
+ * @param writables array of Writable arguments
+ * @throws IOException
+ */
+ abstract void write(byte op, Writable ... writables) throws IOException;
+
+ /**
+ * Create and initialize a new edits log storage.
+ *
+ * @throws IOException
+ */
+ abstract void create() throws IOException;
+
+ /** {@inheritDoc} */
+ abstract public void close() throws IOException;
+
+ /**
+ * All data that has been written to the stream so far will be flushed.
+ * New data can still be written to the stream while flushing is performed.
+ */
+ abstract void setReadyToFlush() throws IOException;
+
+ /**
+ * Flush and sync all data that is ready to be flushed (see
+ * {@link #setReadyToFlush()}) into the underlying persistent store.
+ * @throws IOException
+ */
+ abstract protected void flushAndSync() throws IOException;
+
+ /**
+ * Flush data to persistent store.
+ * Collect sync metrics.
+ */
+ public void flush() throws IOException {
+ numSync++;
+ long start = FSNamesystem.now();
+ flushAndSync();
+ long end = FSNamesystem.now();
+ totalTimeSync += (end - start);
+ }
+
+ /**
+ * Return the size of the current edits log.
+ */
+ abstract long length() throws IOException;
+
+ /**
+ * Returns the time the edits log stream was last modified.
+ */
+ abstract long lastModified();
+
+ /**
+ * Return total time spent in {@link #flushAndSync()}
+ */
+ long getTotalSyncTime() {
+ return totalTimeSync;
+ }
+
+ /**
+ * Return number of calls to {@link #flushAndSync()}
+ */
+ long getNumSync() {
+ return numSync;
+ }
+}
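To make the contract concrete, here is a toy in-memory implementation. It is
not part of the patch; it assumes it lives in the namenode package so it can
see the package-private methods, and it keeps a single buffer, so
setReadyToFlush() has nothing to swap:

package org.apache.hadoop.hdfs.server.namenode;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;

class InMemoryEditLogOutputStream extends EditLogOutputStream {
  private DataOutputBuffer bufCurrent = new DataOutputBuffer();
  private final ByteArrayOutputStream store = new ByteArrayOutputStream();
  private long lastModifiedTime;

  InMemoryEditLogOutputStream() throws IOException {
    super();
  }

  String getName() { return "in-memory"; }

  public void write(int b) throws IOException {
    bufCurrent.write(b);
  }

  void write(byte op, Writable... writables) throws IOException {
    write(op);
    for (Writable w : writables) {
      w.write(bufCurrent);                 // each argument serializes itself
    }
  }

  void create() { store.reset(); }

  public void close() { }

  void setReadyToFlush() { }               // single buffer: nothing to swap

  protected void flushAndSync() throws IOException {
    bufCurrent.writeTo(store);             // "sync" to the in-memory store
    bufCurrent.reset();
    lastModifiedTime = System.currentTimeMillis();
  }

  long length() { return store.size() + bufCurrent.getLength(); }

  long lastModified() { return lastModifiedTime; }
}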
Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EdlitLogInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EdlitLogInputStream.java?rev=686420&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EdlitLogInputStream.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/EdlitLogInputStream.java Fri Aug 15 17:06:05 2008
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A generic abstract class to support reading edits log data from
+ * persistent storage.
+ *
+ * It should stream bytes from the storage exactly as they were written
+ * into the {@link EditLogOutputStream}.
+ */
+abstract class EditLogInputStream extends InputStream {
+ /**
+ * Get the name of this stream.
+ *
+ * @return name of the stream
+ */
+ abstract String getName();
+
+ /** {@inheritDoc} */
+ public abstract int available() throws IOException;
+
+ /** {@inheritDoc} */
+ public abstract int read() throws IOException;
+
+ /** {@inheritDoc} */
+ public abstract void close() throws IOException;
+
+ /**
+ * Return the size of the current edits log.
+ */
+ abstract long length() throws IOException;
+}
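The file-based counterpart, EditLogFileInputStream, is added to FSEditLog in
the diff below; the loader wraps whichever implementation it gets in a
DataInputStream. A compact same-package sketch of the read side (the file name
is assumed):

package org.apache.hadoop.hdfs.server.namenode;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;

import org.apache.hadoop.hdfs.server.namenode.FSEditLog.EditLogFileInputStream;

public class ReadEditsSketch {
  public static void main(String[] args) throws IOException {
    EditLogInputStream edits = new EditLogFileInputStream(new File("edits"));
    DataInputStream in = new DataInputStream(new BufferedInputStream(edits));
    try {
      int logVersion = in.readInt();         // layout version written by create()
      System.out.println("version = " + logVersion +
                         ", length = " + edits.length());
    } finally {
      in.close();                            // also closes the underlying edits stream
    }
  }
}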
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=686420&r1=686419&r2=686420&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Aug 15 17:06:05 2008
@@ -21,8 +21,6 @@
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
@@ -97,123 +95,148 @@
}
};
- static class EditLogOutputStream {
- private FileChannel fc;
- private FileOutputStream fp;
- private DataOutputStream od;
- private DataOutputStream od1;
- private DataOutputStream od2;
- private ByteArrayOutputStream buf1;
- private ByteArrayOutputStream buf2;
- private int bufSize;
-
- // these are statistics counters
- private long numSync; // number of syncs to disk
- private long totalTimeSync; // total time to sync
-
- EditLogOutputStream(File name) throws IOException {
- bufSize = sizeFlushBuffer;
- buf1 = new ByteArrayOutputStream(bufSize);
- buf2 = new ByteArrayOutputStream(bufSize);
- od1 = new DataOutputStream(buf1);
- od2 = new DataOutputStream(buf2);
- od = od1; // start with first buffer
+ /**
+ * An implementation of the abstract class {@link EditLogOutputStream},
+ * which stores edits in a local file.
+ */
+ static private class EditLogFileOutputStream extends EditLogOutputStream {
+ private File file;
+ private FileOutputStream fp; // file stream for storing edit logs
+ private FileChannel fc; // channel of the file stream for sync
+ private DataOutputBuffer bufCurrent; // current buffer for writing
+ private DataOutputBuffer bufReady; // buffer ready for flushing
+
+ EditLogFileOutputStream(File name) throws IOException {
+ super();
+ file = name;
+ bufCurrent = new DataOutputBuffer(sizeFlushBuffer);
+ bufReady = new DataOutputBuffer(sizeFlushBuffer);
fp = new FileOutputStream(name, true); // open for append
fc = fp.getChannel();
- numSync = totalTimeSync = 0;
}
- // returns the current output stream
- DataOutputStream getOutputStream() {
- return od;
- }
-
- void flushAndSync() throws IOException {
- this.flush();
- fc.force(true);
+ @Override
+ String getName() {
+ return file.getPath();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void write(int b) throws IOException {
+ bufCurrent.write(b);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ void write(byte op, Writable ... writables) throws IOException {
+ write(op);
+ for(Writable w : writables) {
+ w.write(bufCurrent);
+ }
}
+ /**
+ * Create an empty edits log file.
+ */
+ @Override
void create() throws IOException {
fc.truncate(0);
- od.writeInt(FSConstants.LAYOUT_VERSION);
- flushAndSync();
+ bufCurrent.writeInt(FSConstants.LAYOUT_VERSION);
+ setReadyToFlush();
+ flush();
}
- // flush current buffer
- private void flush() throws IOException {
- ByteArrayOutputStream buf = getBuffer();
- if (buf.size() == 0) {
- return; // no data to flush
- }
- buf.writeTo(fp); // write data to file
- buf.reset(); // erase all data in buf
- }
-
- void close() throws IOException {
+ @Override
+ public void close() throws IOException {
// close should have been called after all pending transactions
// have been flushed & synced.
- if (getBufSize() != 0) {
- throw new IOException("FSEditStream has " + getBufSize() +
+ int bufSize = bufCurrent.size();
+ if (bufSize != 0) {
+ throw new IOException("FSEditStream has " + bufSize +
" bytes still to be flushed and cannot " +
- "closed.");
+ "be closed.");
}
- od.close();
+ bufCurrent.close();
+ bufReady.close();
fp.close();
- buf1 = buf2 = null;
- od = od1 = od2 = null;
+ bufCurrent = bufReady = null;
}
- // returns the amount of data in the buffer
- int getBufSize() {
- return getBuffer().size();
+ /**
+ * All data that has been written to the stream so far will be flushed.
+ * New data can still be written to the stream while flushing is performed.
+ */
+ @Override
+ void setReadyToFlush() {
+ assert bufReady.size() == 0 : "previous data is not flushed yet";
+ DataOutputBuffer tmp = bufReady;
+ bufReady = bufCurrent;
+ bufCurrent = tmp;
}
- // get the current buffer
- private ByteArrayOutputStream getBuffer() {
- if (od == od1) {
- return buf1;
- } else {
- return buf2;
- }
+ /**
+ * Flush the ready buffer to the persistent store.
+ * bufCurrent is not flushed as it accumulates new log records,
+ * while bufReady is flushed and synced.
+ */
+ @Override
+ protected void flushAndSync() throws IOException {
+ bufReady.writeTo(fp); // write data to file
+ bufReady.reset(); // erase all data in the buffer
+ fc.force(true); // sync to persistent store
}
- //
- // Flush current buffer to output stream, swap buffers
- // This is protected by the flushLock.
- //
- void swap() {
- if (od == od1) {
- od = od2;
- } else {
- od = od1;
- }
+ /**
+ * Return the size of the current edit log including buffered data.
+ */
+ @Override
+ long length() throws IOException {
+ // file size + size of both buffers
+ return fc.size() + bufReady.size() + bufCurrent.size();
+ }
+
+ /**
+ * Returns the time the edits log file was last modified.
+ */
+ @Override
+ long lastModified() {
+ return file.lastModified();
}
+ }
- //
- // Flush old buffer to persistent store
- //
- void flushAndSyncOld() throws IOException {
- numSync++;
- ByteArrayOutputStream oldbuf;
- if (od == od1) {
- oldbuf = buf2;
- } else {
- oldbuf = buf1;
- }
- long start = FSNamesystem.now();
- oldbuf.writeTo(fp); // write data to file
- oldbuf.reset(); // erase all data in buf
- fc.force(true); // sync to persistent store
- long end = FSNamesystem.now();
- totalTimeSync += (end - start);
+ static class EditLogFileInputStream extends EditLogInputStream {
+ private File file;
+ private FileInputStream fStream;
+
+ EditLogFileInputStream(File name) throws IOException {
+ file = name;
+ fStream = new FileInputStream(name);
+ }
+
+ @Override
+ String getName() {
+ return file.getPath();
}
- long getTotalSyncTime() {
- return totalTimeSync;
+ @Override
+ public int available() throws IOException {
+ return fStream.available();
}
- long getNumSync() {
- return numSync;
+ @Override
+ public int read() throws IOException {
+ return fStream.read();
+ }
+
+ @Override
+ public void close() throws IOException {
+ fStream.close();
+ }
+
+ @Override
+ long length() throws IOException {
+ // just the file size; the input side has no buffers
+ return file.length();
}
}
@@ -258,24 +281,24 @@
for (int idx = 0; idx < size; idx++) {
File eFile = getEditFile(idx);
try {
- EditLogOutputStream eStream = new EditLogOutputStream(eFile);
+ EditLogOutputStream eStream = new EditLogFileOutputStream(eFile);
editStreams.add(eStream);
} catch (IOException e) {
FSNamesystem.LOG.warn("Unable to open edit log file " + eFile);
fsimage.processIOError(idx);
- idx--;
+ idx--;
}
}
}
public synchronized void createEditLogFile(File name) throws IOException {
- EditLogOutputStream eStream = new EditLogOutputStream(name);
+ EditLogOutputStream eStream = new EditLogFileOutputStream(name);
eStream.create();
eStream.close();
}
/**
- * Create edits.new if non existant.
+ * Create edits.new if non existent.
*/
synchronized void createNewIfMissing() throws IOException {
for (int idx = 0; idx < getNumStorageDirs(); idx++) {
@@ -286,7 +309,7 @@
}
/**
- * Shutdown the filestore
+ * Shutdown the file store.
*/
public synchronized void close() throws IOException {
while (isSyncRunning) {
@@ -304,7 +327,8 @@
for (int idx = 0; idx < editStreams.size(); idx++) {
EditLogOutputStream eStream = editStreams.get(idx);
try {
- eStream.flushAndSync();
+ eStream.setReadyToFlush();
+ eStream.flush();
eStream.close();
} catch (IOException e) {
processIOError(idx);
@@ -384,7 +408,7 @@
* This is where we apply edits that we've been writing to disk all
* along.
*/
- int loadFSEdits(File edits) throws IOException {
+ static int loadFSEdits(EditLogInputStream edits) throws IOException {
FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
FSDirectory fsDir = fsNamesys.dir;
int numEdits = 0;
@@ -399,275 +423,272 @@
numOpOther = 0;
long startTime = FSNamesystem.now();
- if (edits != null) {
- DataInputStream in = new DataInputStream(new BufferedInputStream(
- new FileInputStream(edits)));
+ DataInputStream in = new DataInputStream(new BufferedInputStream(edits));
+ try {
+ // Read log file version. Could be missing.
+ in.mark(4);
+ // If the edits log is larger than 2GB, available() returns negative
+ // values, so we avoid calling it and probe with readByte() instead
+ boolean available = true;
try {
- // Read log file version. Could be missing.
- in.mark(4);
- // If edits log is greater than 2G, available method will return negative
- // numbers, so we avoid having to call available
- boolean available = true;
+ logVersion = in.readByte();
+ } catch (EOFException e) {
+ available = false;
+ }
+ if (available) {
+ in.reset();
+ logVersion = in.readInt();
+ if (logVersion < FSConstants.LAYOUT_VERSION) // future version
+ throw new IOException(
+ "Unexpected version of the file system log file: "
+ + logVersion + ". Current version = "
+ + FSConstants.LAYOUT_VERSION + ".");
+ }
+ assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
+ "Unsupported version " + logVersion;
+
+ while (true) {
+ long timestamp = 0;
+ long mtime = 0;
+ long blockSize = 0;
+ byte opcode = -1;
try {
- logVersion = in.readByte();
+ opcode = in.readByte();
} catch (EOFException e) {
- available = false;
+ break; // no more transactions
}
- if (available) {
- in.reset();
- logVersion = in.readInt();
- if (logVersion < FSConstants.LAYOUT_VERSION) // future version
- throw new IOException(
- "Unexpected version of the file system log file: "
- + logVersion + ". Current version = "
- + FSConstants.LAYOUT_VERSION + ".");
- }
- assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
- "Unsupported version " + logVersion;
-
- while (true) {
- long timestamp = 0;
- long mtime = 0;
- long blockSize = 0;
- byte opcode = -1;
- try {
- opcode = in.readByte();
- } catch (EOFException e) {
- break; // no more transactions
+ numEdits++;
+ switch (opcode) {
+ case OP_ADD:
+ case OP_CLOSE: {
+ // versions > 0 support per file replication
+ // get name and replication
+ int length = in.readInt();
+ if (-7 == logVersion && length != 3||
+ logVersion < -7 && length != 4) {
+ throw new IOException("Incorrect data format." +
+ " logVersion is " + logVersion +
+ " but writables.length is " +
+ length + ". ");
}
- numEdits++;
- switch (opcode) {
- case OP_ADD:
- case OP_CLOSE: {
- // versions > 0 support per file replication
- // get name and replication
- int length = in.readInt();
- if (-7 == logVersion && length != 3||
- logVersion < -7 && length != 4) {
- throw new IOException("Incorrect data format." +
- " logVersion is " + logVersion +
- " but writables.length is " +
- length + ". ");
- }
- path = FSImage.readString(in);
- short replication = adjustReplication(readShort(in));
- mtime = readLong(in);
- if (logVersion < -7) {
- blockSize = readLong(in);
- }
- // get blocks
- Block blocks[] = null;
- if (logVersion <= -14) {
- blocks = readBlocks(in);
- } else {
- BlockTwo oldblk = new BlockTwo();
- int num = in.readInt();
- blocks = new Block[num];
- for (int i = 0; i < num; i++) {
- oldblk.readFields(in);
- blocks[i] = new Block(oldblk.blkid, oldblk.len,
- Block.GRANDFATHER_GENERATION_STAMP);
- }
- }
-
- // Older versions of HDFS does not store the block size in inode.
- // If the file has more than one block, use the size of the
- // first block as the blocksize. Otherwise use the default
- // block size.
- if (-8 <= logVersion && blockSize == 0) {
- if (blocks.length > 1) {
- blockSize = blocks[0].getNumBytes();
- } else {
- long first = ((blocks.length == 1)? blocks[0].getNumBytes(): 0);
- blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
- }
- }
-
- PermissionStatus permissions = fsNamesys.getUpgradePermission();
- if (logVersion <= -11) {
- permissions = PermissionStatus.read(in);
+ path = FSImage.readString(in);
+ short replication = adjustReplication(readShort(in));
+ mtime = readLong(in);
+ if (logVersion < -7) {
+ blockSize = readLong(in);
+ }
+ // get blocks
+ Block blocks[] = null;
+ if (logVersion <= -14) {
+ blocks = readBlocks(in);
+ } else {
+ BlockTwo oldblk = new BlockTwo();
+ int num = in.readInt();
+ blocks = new Block[num];
+ for (int i = 0; i < num; i++) {
+ oldblk.readFields(in);
+ blocks[i] = new Block(oldblk.blkid, oldblk.len,
+ Block.GRANDFATHER_GENERATION_STAMP);
}
+ }
- // clientname, clientMachine and block locations of last block.
- if (opcode == OP_ADD && logVersion <= -12) {
- clientName = FSImage.readString(in);
- clientMachine = FSImage.readString(in);
- if (-13 <= logVersion) {
- readDatanodeDescriptorArray(in);
- }
+ // Older versions of HDFS did not store the block size in the inode.
+ // If the file has more than one block, use the size of the
+ // first block as the block size. Otherwise use the default
+ // block size.
+ if (-8 <= logVersion && blockSize == 0) {
+ if (blocks.length > 1) {
+ blockSize = blocks[0].getNumBytes();
} else {
- clientName = "";
- clientMachine = "";
- }
-
- // The open lease transaction re-creates a file if necessary.
- // Delete the file if it already exists.
- if (FSNamesystem.LOG.isDebugEnabled()) {
- FSNamesystem.LOG.debug(opcode + ": " + path +
- " numblocks : " + blocks.length +
- " clientHolder " + clientName +
- " clientMachine " + clientMachine);
+ long first = ((blocks.length == 1)? blocks[0].getNumBytes(): 0);
+ blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
}
+ }
+
+ PermissionStatus permissions = fsNamesys.getUpgradePermission();
+ if (logVersion <= -11) {
+ permissions = PermissionStatus.read(in);
+ }
- old = fsDir.unprotectedDelete(path, mtime);
-
- // add to the file tree
- INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
- path, permissions,
- blocks, replication,
- mtime, blockSize);
- if (opcode == OP_ADD) {
- numOpAdd++;
- //
- // Replace current node with a INodeUnderConstruction.
- // Recreate in-memory lease record.
- //
- INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
- node.getLocalNameBytes(),
- node.getReplication(),
- node.getModificationTime(),
- node.getPreferredBlockSize(),
- node.getBlocks(),
- node.getPermissionStatus(),
- clientName,
- clientMachine,
- null);
- fsDir.replaceNode(path, node, cons);
- fsNamesys.leaseManager.addLease(cons.clientName, path);
- } else if (opcode == OP_CLOSE) {
- //
- // Remove lease if it exists.
- //
- if (old.isUnderConstruction()) {
- INodeFileUnderConstruction cons = (INodeFileUnderConstruction)
- old;
- fsNamesys.leaseManager.removeLease(cons.clientName, path);
- }
+ // clientname, clientMachine and block locations of last block.
+ if (opcode == OP_ADD && logVersion <= -12) {
+ clientName = FSImage.readString(in);
+ clientMachine = FSImage.readString(in);
+ if (-13 <= logVersion) {
+ readDatanodeDescriptorArray(in);
}
- break;
- }
- case OP_SET_REPLICATION: {
- numOpSetRepl++;
- path = FSImage.readString(in);
- short replication = adjustReplication(readShort(in));
- fsDir.unprotectedSetReplication(path, replication, null);
- break;
- }
- case OP_RENAME: {
- numOpRename++;
- int length = in.readInt();
- if (length != 3) {
- throw new IOException("Incorrect data format. "
- + "Mkdir operation.");
- }
- String s = FSImage.readString(in);
- String d = FSImage.readString(in);
- timestamp = readLong(in);
- DFSFileInfo dinfo = fsDir.getFileInfo(d);
- fsDir.unprotectedRenameTo(s, d, timestamp);
- fsNamesys.changeLease(s, d, dinfo);
- break;
+ } else {
+ clientName = "";
+ clientMachine = "";
}
- case OP_DELETE: {
- numOpDelete++;
- int length = in.readInt();
- if (length != 2) {
- throw new IOException("Incorrect data format. "
- + "delete operation.");
- }
- path = FSImage.readString(in);
- timestamp = readLong(in);
- old = fsDir.unprotectedDelete(path, timestamp);
- if (old != null && old.isUnderConstruction()) {
- INodeFileUnderConstruction cons = (INodeFileUnderConstruction)old;
- fsNamesys.leaseManager.removeLease(cons.clientName, path);
- }
- break;
+
+ // The open lease transaction re-creates a file if necessary.
+ // Delete the file if it already exists.
+ if (FSNamesystem.LOG.isDebugEnabled()) {
+ FSNamesystem.LOG.debug(opcode + ": " + path +
+ " numblocks : " + blocks.length +
+ " clientHolder " + clientName +
+ " clientMachine " + clientMachine);
}
- case OP_MKDIR: {
- numOpMkDir++;
- PermissionStatus permissions = fsNamesys.getUpgradePermission();
- int length = in.readInt();
- if (length != 2) {
- throw new IOException("Incorrect data format. "
- + "Mkdir operation.");
- }
- path = FSImage.readString(in);
- timestamp = readLong(in);
- if (logVersion <= -11) {
- permissions = PermissionStatus.read(in);
+ old = fsDir.unprotectedDelete(path, mtime);
+
+ // add to the file tree
+ INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
+ path, permissions,
+ blocks, replication,
+ mtime, blockSize);
+ if (opcode == OP_ADD) {
+ numOpAdd++;
+ //
+ // Replace current node with a INodeUnderConstruction.
+ // Recreate in-memory lease record.
+ //
+ INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
+ node.getLocalNameBytes(),
+ node.getReplication(),
+ node.getModificationTime(),
+ node.getPreferredBlockSize(),
+ node.getBlocks(),
+ node.getPermissionStatus(),
+ clientName,
+ clientMachine,
+ null);
+ fsDir.replaceNode(path, node, cons);
+ fsNamesys.leaseManager.addLease(cons.clientName, path);
+ } else if (opcode == OP_CLOSE) {
+ //
+ // Remove lease if it exists.
+ //
+ if (old.isUnderConstruction()) {
+ INodeFileUnderConstruction cons = (INodeFileUnderConstruction)
+ old;
+ fsNamesys.leaseManager.removeLease(cons.clientName, path);
}
- fsDir.unprotectedMkdir(path, permissions, timestamp);
- break;
- }
- case OP_SET_GENSTAMP: {
- numOpSetGenStamp++;
- long lw = in.readLong();
- fsDir.namesystem.setGenerationStamp(lw);
- break;
- }
- case OP_DATANODE_ADD: {
- numOpOther++;
- FSImage.DatanodeImage nodeimage = new FSImage.DatanodeImage();
- nodeimage.readFields(in);
- //Datnodes are not persistent any more.
- break;
}
- case OP_DATANODE_REMOVE: {
- numOpOther++;
- DatanodeID nodeID = new DatanodeID();
- nodeID.readFields(in);
- //Datanodes are not persistent any more.
- break;
+ break;
+ }
+ case OP_SET_REPLICATION: {
+ numOpSetRepl++;
+ path = FSImage.readString(in);
+ short replication = adjustReplication(readShort(in));
+ fsDir.unprotectedSetReplication(path, replication, null);
+ break;
+ }
+ case OP_RENAME: {
+ numOpRename++;
+ int length = in.readInt();
+ if (length != 3) {
+ throw new IOException("Incorrect data format. "
+ + "Rename operation.");
}
- case OP_SET_PERMISSIONS: {
- numOpSetPerm++;
- if (logVersion > -11)
- throw new IOException("Unexpected opcode " + opcode
- + " for version " + logVersion);
- fsDir.unprotectedSetPermission(
- FSImage.readString(in), FsPermission.read(in));
- break;
+ String s = FSImage.readString(in);
+ String d = FSImage.readString(in);
+ timestamp = readLong(in);
+ DFSFileInfo dinfo = fsDir.getFileInfo(d);
+ fsDir.unprotectedRenameTo(s, d, timestamp);
+ fsNamesys.changeLease(s, d, dinfo);
+ break;
+ }
+ case OP_DELETE: {
+ numOpDelete++;
+ int length = in.readInt();
+ if (length != 2) {
+ throw new IOException("Incorrect data format. "
+ + "delete operation.");
}
- case OP_SET_OWNER: {
- numOpSetOwner++;
- if (logVersion > -11)
- throw new IOException("Unexpected opcode " + opcode
- + " for version " + logVersion);
- fsDir.unprotectedSetOwner(FSImage.readString(in),
- FSImage.readString(in), FSImage.readString(in));
- break;
+ path = FSImage.readString(in);
+ timestamp = readLong(in);
+ old = fsDir.unprotectedDelete(path, timestamp);
+ if (old != null && old.isUnderConstruction()) {
+ INodeFileUnderConstruction cons = (INodeFileUnderConstruction)old;
+ fsNamesys.leaseManager.removeLease(cons.clientName, path);
}
- case OP_SET_QUOTA: {
- if (logVersion > -16) {
- throw new IOException("Unexpected opcode " + opcode
- + " for version " + logVersion);
- }
- fsDir.unprotectedSetQuota(FSImage.readString(in),
- readLongWritable(in) );
- break;
+ break;
+ }
+ case OP_MKDIR: {
+ numOpMkDir++;
+ PermissionStatus permissions = fsNamesys.getUpgradePermission();
+ int length = in.readInt();
+ if (length != 2) {
+ throw new IOException("Incorrect data format. "
+ + "Mkdir operation.");
}
- case OP_CLEAR_QUOTA: {
- if (logVersion > -16) {
- throw new IOException("Unexpected opcode " + opcode
- + " for version " + logVersion);
- }
- fsDir.unprotectedClearQuota(FSImage.readString(in));
- break;
+ path = FSImage.readString(in);
+ timestamp = readLong(in);
+
+ if (logVersion <= -11) {
+ permissions = PermissionStatus.read(in);
}
- default: {
- throw new IOException("Never seen opcode " + opcode);
+ fsDir.unprotectedMkdir(path, permissions, timestamp);
+ break;
+ }
+ case OP_SET_GENSTAMP: {
+ numOpSetGenStamp++;
+ long lw = in.readLong();
+ fsDir.namesystem.setGenerationStamp(lw);
+ break;
+ }
+ case OP_DATANODE_ADD: {
+ numOpOther++;
+ FSImage.DatanodeImage nodeimage = new FSImage.DatanodeImage();
+ nodeimage.readFields(in);
+ //Datanodes are not persistent any more.
+ break;
+ }
+ case OP_DATANODE_REMOVE: {
+ numOpOther++;
+ DatanodeID nodeID = new DatanodeID();
+ nodeID.readFields(in);
+ //Datanodes are not persistent any more.
+ break;
+ }
+ case OP_SET_PERMISSIONS: {
+ numOpSetPerm++;
+ if (logVersion > -11)
+ throw new IOException("Unexpected opcode " + opcode
+ + " for version " + logVersion);
+ fsDir.unprotectedSetPermission(
+ FSImage.readString(in), FsPermission.read(in));
+ break;
+ }
+ case OP_SET_OWNER: {
+ numOpSetOwner++;
+ if (logVersion > -11)
+ throw new IOException("Unexpected opcode " + opcode
+ + " for version " + logVersion);
+ fsDir.unprotectedSetOwner(FSImage.readString(in),
+ FSImage.readString(in), FSImage.readString(in));
+ break;
+ }
+ case OP_SET_QUOTA: {
+ if (logVersion > -16) {
+ throw new IOException("Unexpected opcode " + opcode
+ + " for version " + logVersion);
}
+ fsDir.unprotectedSetQuota(FSImage.readString(in),
+ readLongWritable(in) );
+ break;
+ }
+ case OP_CLEAR_QUOTA: {
+ if (logVersion > -16) {
+ throw new IOException("Unexpected opcode " + opcode
+ + " for version " + logVersion);
}
+ fsDir.unprotectedClearQuota(FSImage.readString(in));
+ break;
+ }
+ default: {
+ throw new IOException("Never seen opcode " + opcode);
+ }
}
- } finally {
- in.close();
}
- FSImage.LOG.info("Edits file " + edits.getName()
- + " of size " + edits.length() + " edits # " + numEdits
- + " loaded in " + (FSNamesystem.now()-startTime)/1000 + " seconds.");
+ } finally {
+ in.close();
}
+ FSImage.LOG.info("Edits file " + edits.getName()
+ + " of size " + edits.length() + " edits # " + numEdits
+ + " loaded in " + (FSNamesystem.now()-startTime)/1000 + " seconds.");
if (FSImage.LOG.isDebugEnabled()) {
FSImage.LOG.debug("numOpAdd = " + numOpAdd + " numOpClose = " + numOpClose
@@ -718,11 +739,7 @@
for (int idx = 0; idx < editStreams.size(); idx++) {
EditLogOutputStream eStream = editStreams.get(idx);
try {
- DataOutputStream od = eStream.getOutputStream();
- od.write(op);
- for(Writable w : writables) {
- w.write(od);
- }
+ eStream.write(op, writables);
} catch (IOException ie) {
processIOError(idx);
}
@@ -747,7 +764,7 @@
//
// Sync all modifications done by this thread.
//
- public void logSync() {
+ public void logSync() throws IOException {
ArrayList<EditLogOutputStream> errorStreams = null;
long syncStart = 0;
@@ -781,7 +798,7 @@
// swap buffers
for (int idx = 0; idx < editStreams.size(); idx++) {
EditLogOutputStream eStream = editStreams.get(idx);
- eStream.swap();
+ eStream.setReadyToFlush();
}
}
@@ -790,7 +807,7 @@
for (int idx = 0; idx < editStreams.size(); idx++) {
EditLogOutputStream eStream = editStreams.get(idx);
try {
- eStream.flushAndSyncOld();
+ eStream.flush();
} catch (IOException ie) {
//
// remember the streams that encountered an error.
@@ -972,14 +989,13 @@
assert(getNumStorageDirs() == editStreams.size());
long size = 0;
for (int idx = 0; idx < getNumStorageDirs(); idx++) {
- EditLogOutputStream eStream = editStreams.get(idx);
- assert(size == 0 ||
- size == getEditFile(idx).length() + eStream.getBufSize());
- size = getEditFile(idx).length() + eStream.getBufSize();
+ long curSize = editStreams.get(idx).length();
+ assert (size == 0 || size == curSize) : "All streams must be the same length";
+ size = curSize;
}
return size;
}
-
+
/**
* Closes the current edit log and opens edits.new.
* Returns the lastModified time of the edits log.
@@ -1006,7 +1022,7 @@
//
for (int idx = 0; idx < getNumStorageDirs(); idx++) {
try {
- EditLogOutputStream eStream = new EditLogOutputStream(getEditNewFile(idx));
+ EditLogFileOutputStream eStream = new EditLogFileOutputStream(getEditNewFile(idx));
eStream.create();
editStreams.add(eStream);
} catch (IOException e) {
@@ -1063,7 +1079,7 @@
* Returns the timestamp of the edit log
*/
synchronized long getFsEditTime() {
- return getEditFile(0).lastModified();
+ return editStreams.get(0).lastModified();
}
// sets the initial capacity of the flush buffer.
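The mechanism the new EditLogFileOutputStream keeps from the old code is worth
spelling out: writers keep appending to bufCurrent while a single syncing
thread writes and forces bufReady, so the expensive sync happens outside the
lock that writers contend on. A stripped-down standalone rendering of that
pattern (class and method names are invented for illustration):

import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

public class DoubleBuffer {
  private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream();
  private ByteArrayOutputStream bufReady = new ByteArrayOutputStream();

  // called by writers, under the log's monitor lock
  synchronized void write(byte[] record) throws IOException {
    bufCurrent.write(record);
  }

  // called under the same lock: the swap is cheap, so writers stall only briefly
  synchronized void setReadyToFlush() {
    assert bufReady.size() == 0 : "previous data is not flushed yet";
    ByteArrayOutputStream tmp = bufReady;
    bufReady = bufCurrent;
    bufCurrent = tmp;
  }

  // called outside the lock: only the syncing thread touches bufReady
  void flushAndSync(FileOutputStream fp) throws IOException {
    bufReady.writeTo(fp);                    // write data to file
    bufReady.reset();                        // erase all data in the buffer
    fp.getChannel().force(true);             // sync to persistent store
  }
}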
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=686420&r1=686419&r2=686420&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Aug 15 17:06:05 2008
@@ -53,6 +53,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog.EditLogFileInputStream;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
@@ -857,10 +858,16 @@
*/
int loadFSEdits(StorageDirectory sd) throws IOException {
int numEdits = 0;
- numEdits = editLog.loadFSEdits(getImageFile(sd, NameNodeFile.EDITS));
+ EditLogFileInputStream edits =
+ new EditLogFileInputStream(getImageFile(sd, NameNodeFile.EDITS));
+ numEdits = FSEditLog.loadFSEdits(edits);
+ edits.close();
File editsNew = getImageFile(sd, NameNodeFile.EDITS_NEW);
- if (editsNew.exists())
- numEdits += editLog.loadFSEdits(editsNew);
+ if (editsNew.exists() && editsNew.length() > 0) {
+ edits = new EditLogFileInputStream(editsNew);
+ numEdits += FSEditLog.loadFSEdits(edits);
+ edits.close();
+ }
return numEdits;
}
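Since loadFSEdits is now static and takes an EditLogInputStream, edits no
longer have to come from a local file. As a sketch, a hypothetical same-package
in-memory stream like the one below would be enough to replay a byte array of
edits:

package org.apache.hadoop.hdfs.server.namenode;

import java.io.ByteArrayInputStream;

class ByteArrayEditLogInputStream extends EditLogInputStream {
  private final byte[] data;
  private final ByteArrayInputStream in;

  ByteArrayEditLogInputStream(byte[] data) {
    this.data = data;
    this.in = new ByteArrayInputStream(data);
  }

  String getName() { return "byte array of length " + data.length; }

  public int available() { return in.available(); }

  public int read() { return in.read(); }

  public void close() { }

  long length() { return data.length; }
}

// usage (hypothetical): int n = FSEditLog.loadFSEdits(new ByteArrayEditLogInputStream(bytes));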
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=686420&r1=686419&r2=686420&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Fri Aug 15 17:06:05 2008
@@ -30,7 +30,7 @@
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
-
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog.EditLogFileInputStream;
/**
* This class tests the creation and validation of a checkpoint.
@@ -139,7 +139,7 @@
for (int i = 0; i < numdirs; i++) {
File editFile = fsimage.getEditFile(i);
System.out.println("Verifying file: " + editFile);
- int numEdits = editLog.loadFSEdits(editFile);
+ int numEdits = FSEditLog.loadFSEdits(new EditLogFileInputStream(editFile));
int numLeases = FSNamesystem.getFSNamesystem().leaseManager.countLease();
System.out.println("Number of outstanding leases " + numLeases);
assertEquals(0, numLeases);