Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2007/10/10 01:48:46 UTC

svn commit: r583323 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/dfs/FSEditLog.java src/java/org/apache/hadoop/dfs/NameNode.java src/java/org/apache/hadoop/dfs/NameNodeMetrics.java src/test/org/apache/hadoop/dfs/TestEditLog.java

Author: dhruba
Date: Tue Oct  9 16:48:46 2007
New Revision: 583323

URL: http://svn.apache.org/viewvc?rev=583323&view=rev
Log:
HADOOP-1942. Increase the concurrency of transaction logging to
the edits log. Reduce the number of syncs by double-buffering the
changes to the transaction log. (Dhruba Borthakur)
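
In other words, writers append edits to an in-memory buffer, and the syncing
thread swaps buffers under the lock so that the expensive force() to disk
happens outside it; a single fsync then covers every transaction that
accumulated in the buffer. A minimal sketch of that scheme, using hypothetical
names (this is not the committed FSEditLog code; it assumes a single syncing
thread at a time, which the real code guards with isSyncRunning):

import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

class DoubleBufferedLog {
  private ByteArrayOutputStream current = new ByteArrayOutputStream(512 * 1024);
  private ByteArrayOutputStream old = new ByteArrayOutputStream(512 * 1024);
  private final FileOutputStream fp;

  DoubleBufferedLog(FileOutputStream fp) {
    this.fp = fp;
  }

  // Writers append to the current buffer under the lock; no disk I/O here.
  synchronized void write(byte[] record) {
    current.write(record, 0, record.length);
  }

  // Swap buffers under the lock, then flush the old buffer and force it
  // to disk outside the lock, so writers are not blocked by the sync.
  void sync() throws IOException {
    ByteArrayOutputStream toFlush;
    synchronized (this) {
      toFlush = current;
      current = old;
      old = toFlush;
    }
    toFlush.writeTo(fp);            // one write covers many transactions
    toFlush.reset();
    fp.getChannel().force(true);    // and one fsync covers them too
  }
}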


Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java   (with props)
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java   (with props)
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=583323&r1=583322&r2=583323&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Oct  9 16:48:46 2007
@@ -362,6 +362,10 @@
 
     HADOOP-1971.  Warn when job does not specify a jar. (enis via cutting)
 
+    HADOOP-1942. Increase the concurrency of transaction logging to
+    the edits log. Reduce the number of syncs by double-buffering the
+    changes to the transaction log. (Dhruba Borthakur)
+
 
 Release 0.14.2 - 2007-10-09
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?rev=583323&r1=583322&r2=583323&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Tue Oct  9 16:48:46 2007
@@ -20,6 +20,8 @@
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
@@ -27,6 +29,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.lang.Math;
+import java.nio.channels.FileChannel;
 
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.UTF8;
@@ -45,33 +48,167 @@
  //the following two are used only for backward compatibility:
   @Deprecated private static final byte OP_DATANODE_ADD = 5;
   @Deprecated private static final byte OP_DATANODE_REMOVE = 6;
+  private static int sizeFlushBuffer = 512*1024;
 
   private ArrayList<EditLogOutputStream> editStreams = null;
   private FSImage fsimage = null;
 
-  private long lastModificationTime;
-  private long lastSyncTime;
-  
-  static class EditLogOutputStream extends DataOutputStream {
+  // a monotonically increasing counter that represents transactionIds.
+  private long txid = 0;
+
+  // stores the last synced transactionId.
+  private long synctxid = 0;
+
+  // the time of printing the statistics to the log file.
+  private long lastPrintTime;
+
+  // is a sync currently running?
+  private boolean isSyncRunning;
+
+  // these are statistics counters.
+  private long numTransactions;        // number of transactions
+  private long totalTimeTransactions;  // total time for all transactions
+  private NameNodeMetrics metrics;
+
+  private static class TransactionId {
+    public long txid;
+
+    TransactionId(long value) {
+      this.txid = value;
+    }
+  }
+
+  // stores each thread's most recently logged transactionId.
+  private static final ThreadLocal<TransactionId> myTransactionId = new ThreadLocal<TransactionId>() {
+    protected synchronized TransactionId initialValue() {
+      return new TransactionId(Long.MAX_VALUE);
+    }
+  };
+
+  static class EditLogOutputStream {
+    private FileChannel fc;
+    private FileOutputStream fp;
+    private DataOutputStream od;
+    private DataOutputStream od1;
+    private DataOutputStream od2;
+    private ByteArrayOutputStream buf1;
+    private ByteArrayOutputStream buf2;
+    private int bufSize;
+
+    // these are statistics counters
+    private long numSync;        // number of syncs to disk
+    private long totalTimeSync;  // total time to sync
+
     EditLogOutputStream(File name) throws IOException {
-      super(new FileOutputStream(name, true)); // open for append
+      bufSize = sizeFlushBuffer;
+      buf1 = new ByteArrayOutputStream(bufSize);
+      buf2 = new ByteArrayOutputStream(bufSize);
+      od1 = new DataOutputStream(buf1);
+      od2 = new DataOutputStream(buf2);
+      od = od1;                              // start with first buffer
+      fp = new FileOutputStream(name, true); // open for append
+      fc = fp.getChannel();
+      numSync = totalTimeSync = 0;
+    }
+
+    // returns the current output stream
+    DataOutputStream getOutputStream() {
+      return od;
     }
 
     void flushAndSync() throws IOException {
-      ((FileOutputStream)out).getChannel().force(true);
+      this.flush();
+      fc.force(true);
     }
 
     void create() throws IOException {
-      ((FileOutputStream)out).getChannel().truncate(0);
-      writeInt(FSConstants.LAYOUT_VERSION);
+      fc.truncate(0);
+      od.writeInt(FSConstants.LAYOUT_VERSION);
       flushAndSync();
     }
+
+    // flush current buffer
+    private void flush() throws IOException {
+      ByteArrayOutputStream buf = getBuffer();
+      if (buf.size() == 0) {
+        return;                // no data to flush
+      }
+      buf.writeTo(fp);         // write data to file
+      buf.reset();             // erase all data in buf
+    }
+
+    void close() throws IOException {
+      // close should be called only after all pending transactions
+      // have been flushed and synced.
+      if (getBufSize() != 0) {
+        throw new IOException("FSEditStream has " + getBufSize() +
+                              " bytes still to be flushed and cannot " +
+                              "be closed.");
+      } 
+      od.close();
+      fp.close();
+      buf1 = buf2 = null;
+      od = od1 = od2 = null;
+    }
+
+    // returns the amount of data in the buffer
+    int getBufSize() {
+      return getBuffer().size();
+    }
+
+    // get the current buffer
+    private ByteArrayOutputStream getBuffer() {
+      if (od == od1) {
+        return buf1;
+      } else {
+        return buf2;
+      }
+    }
+
+    //
+    // Swap the current buffer with the old one. The caller is
+    // expected to hold the FSEditLog lock while calling this.
+    //
+    void swap() {
+      if (od == od1) {
+        od = od2;
+      } else {
+        od = od1;
+      }
+    }
+
+    //
+    // Flush old buffer to persistent store
+    //
+    void flushAndSyncOld() throws IOException {
+      numSync++;
+      ByteArrayOutputStream oldbuf;
+      if (od == od1) {
+        oldbuf = buf2;
+      } else {
+        oldbuf = buf1;
+      }
+      long start = FSNamesystem.now();
+      oldbuf.writeTo(fp);         // write data to file
+      oldbuf.reset();             // erase all data in buf
+      fc.force(true);             // sync to persistent store
+      long end = FSNamesystem.now();
+      totalTimeSync += (end - start);
+    }
+
+    long getTotalSyncTime() {
+      return totalTimeSync;
+    }
+
+    long getNumSync() {
+      return numSync;
+    }
   }
 
   FSEditLog(FSImage image) {
     fsimage = image;
-    lastModificationTime = 0;
-    lastSyncTime = 0;
+    isSyncRunning = false;
+    metrics = NameNode.getNameNodeMetrics();
   }
 
   private File getEditFile(int idx) {
@@ -101,6 +238,7 @@
    * @throws IOException
    */
   synchronized void open() throws IOException {
+    numTransactions = totalTimeTransactions = 0;
     int size = getNumStorageDirs();
     if (editStreams == null)
       editStreams = new ArrayList<EditLogOutputStream>(size);
@@ -138,9 +276,18 @@
    * Shutdown the filestore
    */
   synchronized void close() throws IOException {
+    while (isSyncRunning) {
+      try {
+        wait(1000);
+      } catch (InterruptedException ie) { 
+      }
+    }
     if (editStreams == null) {
       return;
     }
+    printStatistics(true);
+    numTransactions = totalTimeTransactions = 0;
+
     for (int idx = 0; idx < editStreams.size(); idx++) {
       EditLogOutputStream eStream = editStreams.get(idx);
       try {
@@ -175,6 +322,38 @@
   }
 
   /**
+   * The specified streams have encountered IO errors. Remove them
+   * from logging new transactions.
+   */
+  private void processIOError(ArrayList<EditLogOutputStream> errorStreams) {
+    if (errorStreams == null) {
+      return;                       // nothing to do
+    }
+    for (int idx = 0; idx < errorStreams.size(); idx++) {
+      EditLogOutputStream eStream = errorStreams.get(idx);
+      int j = 0;
+      for (j = 0; j < editStreams.size(); j++) {
+        if (editStreams.get(j) == eStream) {
+          break;
+        }
+      }
+      if (j == editStreams.size()) {
+          FSNamesystem.LOG.error("Unable to find sync log on which " +
+                                 "IO error occurred. " +
+                                 "Fatal Error.");
+          Runtime.getRuntime().exit(-1);
+      }
+      try {
+        processIOError(idx);         
+      } catch (IOException e) {
+        FSNamesystem.LOG.error("Unable to sync edit log. " +
+                               "Fatal Error.");
+        Runtime.getRuntime().exit(-1);
+      }
+    }
+  }
+
+  /**
    * check if ANY edits.new log exists
    */
   boolean existsNew() throws IOException {
@@ -425,65 +604,140 @@
    */
   synchronized void logEdit(byte op, Writable w1, Writable w2) {
     assert this.getNumEditStreams() > 0 : "no editlog streams";
+    long start = FSNamesystem.now();
     for (int idx = 0; idx < editStreams.size(); idx++) {
-      EditLogOutputStream eStream;
-      synchronized (eStream = editStreams.get(idx)) {
+      EditLogOutputStream eStream = editStreams.get(idx);
+      try {
+        DataOutputStream od = eStream.getOutputStream();
+        od.write(op);
+        if (w1 != null) {
+          w1.write(od);
+        }
+        if (w2 != null) {
+          w2.write(od);
+        }
+      } catch (IOException ie) {
         try {
-          eStream.write(op);
-          if (w1 != null) {
-            w1.write(eStream);
-          }
-          if (w2 != null) {
-            w2.write(eStream);
-          }
-        } catch (IOException ie) {
-          try {
-            processIOError(idx);         
-          } catch (IOException e) {
-            FSNamesystem.LOG.error("Unable to append to edit log. " +
-                                   "Fatal Error.");
-            Runtime.getRuntime().exit(-1);
-          }
+          processIOError(idx);         
+        } catch (IOException e) {
+          FSNamesystem.LOG.error("Unable to append to edit log. " +
+                                 "Fatal Error.");
+          Runtime.getRuntime().exit(-1);
         }
       }
     }
+    // get a new transactionId
+    txid++;
+
     //
-    // record the time when new data was written to the edits log
+    // record the transactionId when new data was written to the edits log
     //
-    lastModificationTime = System.currentTimeMillis();
+    TransactionId id = (TransactionId)myTransactionId.get();
+    id.txid = txid;
+
+    // update statistics
+    long end = FSNamesystem.now();
+    numTransactions++;
+    totalTimeTransactions += (end-start);
+    metrics.incrNumTransactions(1, (int)(end-start));
   }
 
   //
-  // flush all data of the Edits log into persistent store
+  // Sync all modifications done by this thread.
   //
-  synchronized void logSync() {
-    assert this.getNumEditStreams() > 0 : "no editlog streams";
+  void logSync() {
+    ArrayList<EditLogOutputStream> errorStreams = null;
+    long syncStart = 0;
+
+    // Fetch the transactionId of this thread. 
+    TransactionId id = (TransactionId)myTransactionId.get();
+    long mytxid = id.txid;
+
+    synchronized (this) {
+      assert this.getNumEditStreams() > 0 : "no editlog streams";
+      printStatistics(false);
 
-    //
-    // If data was generated before the beginning of the last sync time
-    // then there is nothing to flush
-    //
-    if (lastModificationTime < lastSyncTime) {
-      return;
+      // if somebody is already syncing, then wait
+      while (mytxid > synctxid && isSyncRunning) {
+        try {
+          wait(1000);
+        } catch (InterruptedException ie) { 
+        }
+      }
+
+      //
+      // If this transaction was already flushed, then nothing to do
+      //
+      if (mytxid <= synctxid) {
+        return;
+      }
+   
+      // now, this thread will do the sync
+      syncStart = txid;
+      isSyncRunning = true;   
+
+      // swap buffers
+      for (int idx = 0; idx < editStreams.size(); idx++) {
+        EditLogOutputStream eStream = editStreams.get(idx);
+        eStream.swap();
+      }
     }
-    lastSyncTime = System.currentTimeMillis();
 
+    // do the sync
+    long start = FSNamesystem.now();
     for (int idx = 0; idx < editStreams.size(); idx++) {
-      EditLogOutputStream eStream;
-      synchronized (eStream = editStreams.get(idx)) {
-        try {
-          eStream.flushAndSync();
-        } catch (IOException ie) {
-          try {
-            processIOError(idx);         
-          } catch (IOException e) {
-            FSNamesystem.LOG.error("Unable to sync edit log. " +
-                                   "Fatal Error.");
-            Runtime.getRuntime().exit(-1);
-          }
+      EditLogOutputStream eStream = editStreams.get(idx);
+      try {
+        eStream.flushAndSyncOld();
+      } catch (IOException ie) {
+        //
+        // remember the streams that encountered an error.
+        //
+        if (errorStreams == null) {
+          errorStreams = new ArrayList<EditLogOutputStream>(1);
         }
+        errorStreams.add(eStream);
+        FSNamesystem.LOG.error("Unable to sync edit log. " +
+                               "Fatal Error.");
       }
     }
+    long elapsed = FSNamesystem.now() - start;
+
+    synchronized (this) {
+       processIOError(errorStreams);
+       synctxid = syncStart;
+       isSyncRunning = false;
+       this.notifyAll();
+    }
+
+    metrics.incrSyncs(1, (int)elapsed);
+  }
+
+  //
+  // print statistics at most once a minute, unless forced.
+  //
+  private void printStatistics(boolean force) {
+    long now = FSNamesystem.now();
+    if (now < lastPrintTime + 60000 && !force) {
+      return;
+    }
+    if (editStreams == null) {
+      return;
+    }
+    lastPrintTime = now;
+    StringBuffer buf = new StringBuffer();
+
+    buf.append("Number of transactions: " + numTransactions +
+               " Total time for transactions(ms): " + 
+               totalTimeTransactions);
+    buf.append(" Number of syncs: " + editStreams.get(0).getNumSync());
+    buf.append(" SyncTimes(ms): ");
+    for (int idx = 0; idx < editStreams.size(); idx++) {
+      EditLogOutputStream eStream = editStreams.get(idx);
+      buf.append(eStream.getTotalSyncTime());
+      buf.append(" ");
+    }
+    FSNamesystem.LOG.info(buf);
   }
 
   /** 
@@ -561,10 +815,10 @@
     assert(getNumStorageDirs() == editStreams.size());
     long size = 0;
     for (int idx = 0; idx < getNumStorageDirs(); idx++) {
-      synchronized (editStreams.get(idx)) {
-        assert(size == 0 || size == getEditFile(idx).length());
-        size = getEditFile(idx).length();
-      }
+      EditLogOutputStream eStream = editStreams.get(idx);
+      assert(size == 0 || 
+             size == getEditFile(idx).length() + eStream.getBufSize());
+      size = getEditFile(idx).length() + eStream.getBufSize();
     }
     return size;
   }
@@ -653,5 +907,10 @@
    */
   synchronized long getFsEditTime() throws IOException {
     return getEditFile(0).lastModified();
+  }
+
+  // sets the initial capacity of the flush buffer.
+  static void setBufferCapacity(int size) {
+    sizeFlushBuffer = size;
   }
 }
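
Taken together, logEdit() and logSync() above implement a group commit: each
writer stamps its transaction id into a thread-local at logEdit() time, and
logSync() performs the force only if no concurrent sync has already covered
that id; otherwise it waits for the in-flight sync or returns immediately. A
stripped-down sketch of just that coordination, with hypothetical names
(buffer swapping and I/O error handling elided):

class GroupCommitLog {
  private long txid = 0;       // assigned under the lock in logEdit()
  private long synctxid = 0;   // highest txid known to be on disk
  private boolean isSyncRunning = false;

  // remembers, per thread, the txid of that thread's last edit
  private final ThreadLocal<long[]> myTxid = new ThreadLocal<long[]>() {
    protected long[] initialValue() { return new long[] { Long.MAX_VALUE }; }
  };

  synchronized void logEdit(byte[] record) {
    // ... append record to the current in-memory buffer ...
    myTxid.get()[0] = ++txid;
  }

  void logSync() throws InterruptedException {
    long mytxid = myTxid.get()[0];
    long syncStart;
    synchronized (this) {
      // wait out a sync that is in flight but does not yet cover us
      while (mytxid > synctxid && isSyncRunning) {
        wait(1000);
      }
      if (mytxid <= synctxid) {
        return;               // somebody else's sync already covered us
      }
      syncStart = txid;       // this sync will cover everything so far
      isSyncRunning = true;
      // ... swap buffers here, still under the lock ...
    }

    // ... flush the old buffer and force it to disk, outside the lock ...

    synchronized (this) {
      synctxid = syncStart;
      isSyncRunning = false;
      notifyAll();            // wake writers waiting on this sync
    }
  }
}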

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=583323&r1=583322&r2=583323&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Tue Oct  9 16:48:46 2007
@@ -31,12 +31,6 @@
 import java.util.Collection;
 import java.util.Iterator;
 
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.Updater;
-import org.apache.hadoop.metrics.jvm.JvmMetrics;
-
 /**********************************************************
  * NameNode serves as both directory namespace manager and
  * "inode table" for the Hadoop DFS.  There is a single NameNode
@@ -105,62 +99,11 @@
     format(conf, false);
   }
 
-  private static class NameNodeMetrics implements Updater {
-    private final MetricsRecord metricsRecord;
-    
-    private int numFilesCreated = 0;
-    private int numFilesOpened = 0;
-    private int numFilesRenamed = 0;
-    private int numFilesListed = 0;
-      
-    NameNodeMetrics(Configuration conf) {
-      String sessionId = conf.get("session.id");
-      // Initiate Java VM metrics
-      JvmMetrics.init("NameNode", sessionId);
-      // Create a record for NameNode metrics
-      MetricsContext metricsContext = MetricsUtil.getContext("dfs");
-      metricsRecord = MetricsUtil.createRecord(metricsContext, "namenode");
-      metricsRecord.setTag("sessionId", sessionId);
-      metricsContext.registerUpdater(this);
-    }
-      
-    /**
-     * Since this object is a registered updater, this method will be called
-     * periodically, e.g. every 5 seconds.
-     */
-    public void doUpdates(MetricsContext unused) {
-      synchronized (this) {
-        metricsRecord.incrMetric("files_created", numFilesCreated);
-        metricsRecord.incrMetric("files_opened", numFilesOpened);
-        metricsRecord.incrMetric("files_renamed", numFilesRenamed);
-        metricsRecord.incrMetric("files_listed", numFilesListed);
-              
-        numFilesCreated = 0;
-        numFilesOpened = 0;
-        numFilesRenamed = 0;
-        numFilesListed = 0;
-      }
-      metricsRecord.update();
-    }
-      
-    synchronized void createFile() {
-      ++numFilesCreated;
-    }
-      
-    synchronized void openFile() {
-      ++numFilesOpened;
-    }
-      
-    synchronized void renameFile() {
-      ++numFilesRenamed;
-    }
-      
-    synchronized void listFile(int nfiles) {
-      numFilesListed += nfiles;
-    }
+  static NameNodeMetrics myMetrics;
+
+  public static NameNodeMetrics getNameNodeMetrics() {
+    return myMetrics;
   }
-    
-  private NameNodeMetrics myMetrics;
     
   /**
    * Initialize the server

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java?rev=583323&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java Tue Oct  9 16:48:46 2007
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+
+class NameNodeMetrics implements Updater {
+    private final MetricsRecord metricsRecord;
+    
+    private int numFilesCreated = 0;
+    private int numFilesOpened = 0;
+    private int numFilesRenamed = 0;
+    private int numFilesListed = 0;
+
+    private int numTransactions = 0;
+    private int totalTimeTransactionsLogMemory = 0;
+    private int numSyncs = 0;
+    private int totalTimeSyncs = 0;
+      
+    NameNodeMetrics(Configuration conf) {
+      String sessionId = conf.get("session.id");
+      // Initiate Java VM metrics
+      JvmMetrics.init("NameNode", sessionId);
+      // Create a record for NameNode metrics
+      MetricsContext metricsContext = MetricsUtil.getContext("dfs");
+      metricsRecord = MetricsUtil.createRecord(metricsContext, "namenode");
+      metricsRecord.setTag("sessionId", sessionId);
+      metricsContext.registerUpdater(this);
+    }
+      
+    /**
+     * Since this object is a registered updater, this method will be called
+     * periodically, e.g. every 5 seconds.
+     */
+    public void doUpdates(MetricsContext unused) {
+      synchronized (this) {
+        metricsRecord.incrMetric("files_created", numFilesCreated);
+        metricsRecord.incrMetric("files_opened", numFilesOpened);
+        metricsRecord.incrMetric("files_renamed", numFilesRenamed);
+        metricsRecord.incrMetric("files_listed", numFilesListed);
+        metricsRecord.incrMetric("num_transactions", numTransactions);
+        metricsRecord.incrMetric("avg_time_transactions_memory", 
+                                 getAverageTimeTransaction());
+        metricsRecord.incrMetric("num_syncs", numSyncs);
+        metricsRecord.incrMetric("avg_time_transactions_sync", 
+                                 getAverageTimeSync());
+              
+        numFilesCreated = 0;
+        numFilesOpened = 0;
+        numFilesRenamed = 0;
+        numFilesListed = 0;
+        numTransactions = 0;
+        totalTimeTransactionsLogMemory = 0;
+        numSyncs = 0;
+        totalTimeSyncs = 0;
+      }
+      metricsRecord.update();
+    }
+      
+    synchronized void createFile() {
+      ++numFilesCreated;
+    }
+      
+    synchronized void openFile() {
+      ++numFilesOpened;
+    }
+      
+    synchronized void renameFile() {
+      ++numFilesRenamed;
+    }
+      
+    synchronized void listFile(int nfiles) {
+      numFilesListed += nfiles;
+    }
+
+    synchronized void incrNumTransactions(int count, int time) {
+      numTransactions += count;
+      totalTimeTransactionsLogMemory += time;
+    }
+
+    synchronized void incrSyncs(int count, int time) {
+      numSyncs += count;
+      totalTimeSyncs += time;
+    }
+
+    synchronized private int getAverageTimeTransaction() {
+      if (numTransactions == 0) {
+        return 0;
+      }
+      return totalTimeTransactionsLogMemory/numTransactions;
+    }
+
+    synchronized private int getAverageTimeSync() {
+      if (numSyncs == 0) {
+        return 0;
+      }
+      return totalTimeSyncs/numSyncs;
+    }
+}

Propchange: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java?rev=583323&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java Tue Oct  9 16:48:46 2007
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+
+
+/**
+ * This class tests concurrent transaction logging to the edit log.
+ */
+public class TestEditLog extends TestCase {
+  static final int numDatanodes = 1;
+
+  // This test creates numThreads threads, each of which performs
+  // numberTransactions transactions concurrently.
+  int numberTransactions = 1000;
+  int numThreads = 100;
+
+  //
+  // an object that does a bunch of transactions
+  //
+  static class Transactions implements Runnable {
+    FSEditLog editLog;
+    int numTransactions;
+    short replication = 3;
+    long blockSize = 64;
+
+    Transactions(FSEditLog editlog, int num) {
+      editLog = editlog;
+      numTransactions = num;
+    }
+
+    // add a bunch of transactions.
+    public void run() {
+      for (int i = 0; i < numTransactions; i++) {
+        INodeFile inode = new INodeFile(0, replication, 0, blockSize);
+        editLog.logCreateFile("/filename" + i, inode);
+        editLog.logSync();
+      }
+    }
+  }
+
+  /**
+   * Tests transaction logging in dfs.
+   */
+  public void testEditLog() throws IOException {
+
+    // start a cluster 
+
+    Collection<File> namedirs = null;
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(0, conf, numDatanodes, 
+                                                true, true, null, null);
+    cluster.waitActive();
+    FileSystem fileSys = cluster.getFileSystem();
+    int numdirs = 0;
+
+    try {
+      namedirs = cluster.getNameDirs();
+    } finally {
+      fileSys.close();
+      cluster.shutdown();
+    }
+
+    for (Iterator it = namedirs.iterator(); it.hasNext(); ) {
+      File dir = (File)it.next();
+      System.out.println(dir);
+      numdirs++;
+    }
+
+    FSImage fsimage = new FSImage(namedirs);
+    FSEditLog editLog = fsimage.getEditLog();
+
+    // set small size of flush buffer
+    editLog.setBufferCapacity(2048);
+    editLog.close();
+    editLog.open();
+  
+    // Create threads and make them run transactions concurrently.
+    Thread threadId[] = new Thread[numThreads];
+    for (int i = 0; i < numThreads; i++) {
+      Transactions trans = new Transactions(editLog, numberTransactions);
+      threadId[i] = new Thread(trans, "TransactionThread-" + i);
+      threadId[i].start();
+    }
+
+    // wait for all the transaction threads to finish
+    for (int i = 0; i < numThreads; i++) {
+      try {
+        threadId[i].join();
+      } catch (InterruptedException e) {
+        i--;      // retry 
+      }
+    } 
+    
+    editLog.close();
+
+    // Verify that we can read in all the transactions that we have written.
+    // If there were any corruptions, it is likely that the reading in
+    // of these transactions will throw an exception.
+    //
+    for (int i = 0; i < numdirs; i++) {
+      File editFile = fsimage.getEditFile(i);
+      System.out.println("Verifying file: " + editFile);
+      int numEdits = editLog.loadFSEdits(editFile);
+      assertTrue("Verification for " + editFile + " failed. " +
+                 "Expected " + (numThreads * numberTransactions) + " transactions. "+
+                 "Found " + numEdits + " transactions.",
+                 numEdits == numThreads * numberTransactions);
+
+    }
+  }
+}

Propchange: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL