You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2010/05/21 20:11:36 UTC
svn commit: r947106 - in /hadoop/hdfs/trunk: ./
src/java/org/apache/hadoop/hdfs/server/namenode/
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Author: hairong
Date: Fri May 21 18:11:35 2010
New Revision: 947106
URL: http://svn.apache.org/viewvc?rev=947106&view=rev
Log:
HDFS-1112. Edit log buffer should not grow unboundedly. Contributed by Hairong Kuang.
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=947106&r1=947105&r2=947106&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Fri May 21 18:11:35 2010
@@ -13,9 +13,11 @@ Trunk (unreleased changes)
HDFS-1061. Memory footprint optimization for INodeFile object.
(Bharath Mundlapudi via jghoman)
- HDFS=1079. Throw exceptions as specified by the AbstractFileSystem
+ HDFS-1079. Throw exceptions as specified by the AbstractFileSystem
in HDFS implemenation and protocols. (suresh)
+ HDFS-1112. Edit log buffer should not grow unboundedly. (hairong)
+
BUG FIXES
HDFS 1021. specify correct server principal for RefreshAuthorizationPolicyProtocol
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=947106&r1=947105&r2=947106&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Fri May 21 18:11:35 2010
@@ -41,6 +41,7 @@ class EditLogFileOutputStream extends Ed
private FileChannel fc; // channel of the file stream for sync
private DataOutputBuffer bufCurrent; // current buffer for writing
private DataOutputBuffer bufReady; // buffer ready for flushing
+ final private int initBufferSize; // initial buffer size
static ByteBuffer fill = ByteBuffer.allocateDirect(512); // preallocation
/**
@@ -55,6 +56,7 @@ class EditLogFileOutputStream extends Ed
EditLogFileOutputStream(File name, int size) throws IOException {
super();
file = name;
+ initBufferSize = size;
bufCurrent = new DataOutputBuffer(size);
bufReady = new DataOutputBuffer(size);
RandomAccessFile rp = new RandomAccessFile(name, "rw");
@@ -146,6 +148,14 @@ class EditLogFileOutputStream extends Ed
}
/**
+ * @return true if the amount of buffered data is at least the initial buffer size
+ */
+ @Override
+ public boolean shouldForceSync() {
+ return bufReady.size() >= initBufferSize;
+ }
+
+ /**
* Return the size of the current edit log including buffered data.
*/
@Override
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=947106&r1=947105&r2=947106&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Fri May 21 18:11:35 2010
@@ -91,6 +91,17 @@ implements JournalStream {
*/
abstract long length() throws IOException;
+ /**
+ * Implements the policy for when to automatically sync the buffered edit log.
+ * The buffered edits can be flushed when the buffer becomes full or
+ * a certain period of time is elapsed.
+ *
+ * @return true if the buffered data should be automatically synced to disk
+ */
+ public boolean shouldForceSync() {
+ return false;
+ }
+
boolean isOperationSupported(byte op) {
return true;
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=947106&r1=947105&r2=947106&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri May 21 18:11:35 2010
@@ -115,6 +115,8 @@ public class FSEditLog {
// is a sync currently running?
private volatile boolean isSyncRunning;
+ // is an automatic sync scheduled?
+ private volatile boolean isAutoSyncScheduled = false;
// these are statistics counters.
private long numTransactions; // number of transactions
@@ -846,31 +848,84 @@ public class FSEditLog {
}
/**
- * Write an operation to the edit log. Do not sync to persistent
- * store yet.
+ * Write an operation to the edit log.
+ * Automatically sync buffered edits to persistent store if it is time
+ * to sync.
*/
- synchronized void logEdit(byte op, Writable ... writables) {
- if(getNumEditStreams() == 0)
- throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
- ArrayList<EditLogOutputStream> errorStreams = null;
- long start = FSNamesystem.now();
- for(EditLogOutputStream eStream : editStreams) {
- FSImage.LOG.debug("loggin edits into " + eStream.getName() + " stream");
- if(!eStream.isOperationSupported(op))
- continue;
- try {
- eStream.write(op, writables);
- } catch (IOException ie) {
- FSImage.LOG.warn("logEdit: removing "+ eStream.getName(), ie);
- if(errorStreams == null)
- errorStreams = new ArrayList<EditLogOutputStream>(1);
- errorStreams.add(eStream);
+ void logEdit(byte op, Writable ... writables) {
+ synchronized (this) {
+ // wait if an automatic sync is scheduled
+ waitIfAutoSyncScheduled();
+
+ if(getNumEditStreams() == 0)
+ throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
+ ArrayList<EditLogOutputStream> errorStreams = null;
+ long start = FSNamesystem.now();
+ for(EditLogOutputStream eStream : editStreams) {
+ FSImage.LOG.debug("loggin edits into " + eStream.getName() + " stream");
+ if(!eStream.isOperationSupported(op))
+ continue;
+ try {
+ eStream.write(op, writables);
+ } catch (IOException ie) {
+ FSImage.LOG.warn("logEdit: removing "+ eStream.getName(), ie);
+ if(errorStreams == null)
+ errorStreams = new ArrayList<EditLogOutputStream>(1);
+ errorStreams.add(eStream);
+ }
}
+ processIOError(errorStreams, true);
+ recordTransaction(start);
+
+ // check if it is time to schedule an automatic sync
+ if (!shouldForceSync()) {
+ return;
+ }
+ isAutoSyncScheduled = true;
}
- processIOError(errorStreams, true);
- recordTransaction(start);
+
+ // sync buffered edit log entries to persistent store
+ logSync();
}
+ /**
+ * Wait if an automatic sync is scheduled
+ * An interrupt ends the wait early; the InterruptedException is not rethrown.
+ */
+ synchronized void waitIfAutoSyncScheduled() {
+ try {
+ while (isAutoSyncScheduled) {
+ this.wait(1000);
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+
+ /**
+ * Signal that an automatic sync scheduling is done if it is scheduled
+ */
+ synchronized void doneWithAutoSyncScheduling() {
+ if (isAutoSyncScheduled) {
+ isAutoSyncScheduled = false;
+ notifyAll();
+ }
+ }
+
+ /**
+ * Check if should automatically sync buffered edits to
+ * persistent store
+ *
+ * @return true if any of the edit streams says that it should sync
+ */
+ private boolean shouldForceSync() {
+ for (EditLogOutputStream eStream : editStreams) {
+ if (eStream.shouldForceSync()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private void recordTransaction(long start) {
// get a new transactionId
txid++;
@@ -935,16 +990,17 @@ public class FSEditLog {
* concurrency with sync() should be synchronized and also call
* waitForSyncToFinish() before assuming they are running alone.
*/
- public void logSync() throws IOException {
+ public void logSync() {
ArrayList<EditLogOutputStream> errorStreams = null;
long syncStart = 0;
// Fetch the transactionId of this thread.
long mytxid = myTransactionId.get().txid;
- EditLogOutputStream streams[] = null;
+ ArrayList<EditLogOutputStream> streams = new ArrayList<EditLogOutputStream>();
boolean sync = false;
try {
synchronized (this) {
+ try {
assert editStreams.size() > 0 : "no editlog streams";
printStatistics(false);
@@ -973,16 +1029,29 @@ public class FSEditLog {
// swap buffers
for(EditLogOutputStream eStream : editStreams) {
- eStream.setReadyToFlush();
+ try {
+ eStream.setReadyToFlush();
+ streams.add(eStream);
+ } catch (IOException ie) {
+ FSNamesystem.LOG.error("Unable to get ready to flush.", ie);
+ //
+ // remember the streams that encountered an error.
+ //
+ if (errorStreams == null) {
+ errorStreams = new ArrayList<EditLogOutputStream>(1);
+ }
+ errorStreams.add(eStream);
+ }
+ }
+ } finally {
+ // Prevent RuntimeException from blocking other log edit write
+ doneWithAutoSyncScheduling();
}
- streams =
- editStreams.toArray(new EditLogOutputStream[editStreams.size()]);
}
// do the sync
long start = FSNamesystem.now();
- for (int idx = 0; idx < streams.length; idx++) {
- EditLogOutputStream eStream = streams[idx];
+ for (EditLogOutputStream eStream : streams) {
try {
eStream.flush();
} catch (IOException ie) {
@@ -1002,6 +1071,7 @@ public class FSEditLog {
if (metrics != null) // Metrics non-null only when used inside name node
metrics.syncs.inc(elapsed);
} finally {
+ // Prevent RuntimeException from blocking other log edit sync
synchronized (this) {
synctxid = syncStart;
if (sync) {
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java?rev=947106&r1=947105&r2=947106&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java Fri May 21 18:11:35 2010
@@ -79,31 +79,25 @@ public class CreateEditsLog {
blocks[iB].setBlockId(currentBlockId++);
}
- try {
-
- INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
- null, replication, 0, blockSize, blocks, p, "", "", null);
- // Append path to filename with information about blockIDs
- String path = "_" + iF + "_B" + blocks[0].getBlockId() +
- "_to_B" + blocks[blocksPerFile-1].getBlockId() + "_";
- String filePath = nameGenerator.getNextFileName("");
- filePath = filePath + path;
- // Log the new sub directory in edits
- if ((iF % nameGenerator.getFilesPerDirectory()) == 0) {
- String currentDir = nameGenerator.getCurrentDir();
- dirInode = new INodeDirectory(p, 0L);
- editLog.logMkDir(currentDir, dirInode);
- }
- editLog.logOpenFile(filePath, inode);
- editLog.logCloseFile(filePath, inode);
+ INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
+ null, replication, 0, blockSize, blocks, p, "", "", null);
+ // Append path to filename with information about blockIDs
+ String path = "_" + iF + "_B" + blocks[0].getBlockId() +
+ "_to_B" + blocks[blocksPerFile-1].getBlockId() + "_";
+ String filePath = nameGenerator.getNextFileName("");
+ filePath = filePath + path;
+ // Log the new sub directory in edits
+ if ((iF % nameGenerator.getFilesPerDirectory()) == 0) {
+ String currentDir = nameGenerator.getCurrentDir();
+ dirInode = new INodeDirectory(p, 0L);
+ editLog.logMkDir(currentDir, dirInode);
+ }
+ editLog.logOpenFile(filePath, inode);
+ editLog.logCloseFile(filePath, inode);
- if (currentBlockId - bidAtSync >= 2000) { // sync every 2K blocks
- editLog.logSync();
- bidAtSync = currentBlockId;
- }
- } catch (IOException e) {
- System.out.println("Creating trascation for file " + iF +
- " encountered exception " + e);
+ if (currentBlockId - bidAtSync >= 2000) { // sync every 2K blocks
+ editLog.logSync();
+ bidAtSync = currentBlockId;
}
}
System.out.println("Created edits log in directory " + edits_dir);
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=947106&r1=947105&r2=947106&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Fri May 21 18:11:35 2010
@@ -64,16 +64,11 @@ public class TestEditLog extends TestCas
FSEditLog editLog = namesystem.getEditLog();
for (int i = 0; i < numTransactions; i++) {
- try {
- INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
- p, replication, blockSize, 0, "", "", null);
- editLog.logOpenFile("/filename" + i, inode);
- editLog.logCloseFile("/filename" + i, inode);
- editLog.logSync();
- } catch (IOException e) {
- System.out.println("Transaction " + i + " encountered exception " +
- e);
- }
+ INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
+ p, replication, blockSize, 0, "", "", null);
+ editLog.logOpenFile("/filename" + i, inode);
+ editLog.logCloseFile("/filename" + i, inode);
+ editLog.logSync();
}
}
}
@@ -82,6 +77,18 @@ public class TestEditLog extends TestCas
* Tests transaction logging in dfs.
*/
public void testEditLog() throws IOException {
+ testEditLog(2048);
+ // force edit buffer to automatically sync on each log of edit log entry
+ testEditLog(1);
+ }
+
+ /**
+ * Test edit log with different initial buffer size
+ *
+ * @param initialSize initial edit log buffer size
+ * @throws IOException
+ */
+ private void testEditLog(int initialSize) throws IOException {
// start a cluster
Configuration conf = new HdfsConfiguration();
@@ -103,7 +110,7 @@ public class TestEditLog extends TestCas
FSEditLog editLog = fsimage.getEditLog();
// set small size of flush buffer
- editLog.setBufferCapacity(2048);
+ editLog.setBufferCapacity(initialSize);
editLog.close();
editLog.open();