You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2013/01/23 06:28:51 UTC

svn commit: r1437274 - in /hbase/trunk: hbase-common/src/main/java/org/apache/hadoop/hbase/util/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/

Author: tedyu
Date: Wed Jan 23 05:28:51 2013
New Revision: 1437274

URL: http://svn.apache.org/viewvc?rev=1437274&view=rev
Log:
HBASE-6466 Revert, TestLogRolling failed twice on trunk build


Modified:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1437274&r1=1437273&r2=1437274&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java Wed Jan 23 05:28:51 2013
@@ -208,30 +208,16 @@ public class Threads {
   }
 
   /**
-   * Same as {#newDaemonThreadFactory(String, UncaughtExceptionHandler)},
-   * without setting the exception handler.
-   */
-  public static ThreadFactory newDaemonThreadFactory(final String prefix) {
-    return newDaemonThreadFactory(prefix, null);
-  }
-
-  /**
-   * Get a named {@link ThreadFactory} that just builds daemon threads.
+   * Get a named {@link ThreadFactory} that just builds daemon threads
    * @param prefix name prefix for all threads created from the factory
-   * @param handler unhandles exception handler to set for all threads
-   * @return a thread factory that creates named, daemon threads with
-   *         the supplied exception handler and normal priority
+   * @return a thread factory that creates named, daemon threads
    */
-  public static ThreadFactory newDaemonThreadFactory(final String prefix,
-      final UncaughtExceptionHandler handler) {
+  public static ThreadFactory newDaemonThreadFactory(final String prefix) {
     final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
     return new ThreadFactory() {
       @Override
       public Thread newThread(Runnable r) {
         Thread t = namedFactory.newThread(r);
-        if (handler != null) {
-          t.setUncaughtExceptionHandler(handler);
-        }
         if (!t.isDaemon()) {
           t.setDaemon(true);
         }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1437274&r1=1437273&r2=1437274&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Jan 23 05:28:51 2013
@@ -1531,7 +1531,8 @@ public class  HRegionServer implements C
 
     Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
         uncaughtExceptionHandler);
-    this.cacheFlusher.start(uncaughtExceptionHandler);
+    Threads.setDaemonThreadRunning(this.cacheFlusher.getThread(), n + ".cacheFlusher",
+      uncaughtExceptionHandler);
     Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
       ".compactionChecker", uncaughtExceptionHandler);
     if (this.healthCheckChore != null) {
@@ -1789,7 +1790,7 @@ public class  HRegionServer implements C
    */
   protected void join() {
     Threads.shutdown(this.compactionChecker.getThread());
-    this.cacheFlusher.join();
+    Threads.shutdown(this.cacheFlusher.getThread());
     if (this.healthCheckChore != null) {
       Threads.shutdown(this.healthCheckChore.getThread());
     }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1437274&r1=1437273&r2=1437274&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Wed Jan 23 05:28:51 2013
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.management.ManagementFactory;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
@@ -30,10 +29,10 @@ import java.util.SortedMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,7 +44,6 @@ import org.apache.hadoop.hbase.RemoteExc
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 import org.cliffc.high_scale_lib.Counter;
 
@@ -61,7 +59,7 @@ import com.google.common.base.Preconditi
  * @see FlushRequester
  */
 @InterfaceAudience.Private
-class MemStoreFlusher implements FlushRequester {
+class MemStoreFlusher extends HasThread implements FlushRequester {
   static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
   // These two data members go together.  Any entry in the one must have
   // a corresponding entry in the other.
@@ -73,8 +71,8 @@ class MemStoreFlusher implements FlushRe
 
   private final long threadWakeFrequency;
   private final HRegionServer server;
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  private final Object blockSignal = new Object();
+  private final ReentrantLock lock = new ReentrantLock();
+  private final Condition flushOccurred = lock.newCondition();
 
   protected final long globalMemStoreLimit;
   protected final long globalMemStoreLimitLowMark;
@@ -89,9 +87,6 @@ class MemStoreFlusher implements FlushRe
   private long blockingWaitTime;
   private final Counter updatesBlockedMsHighWater = new Counter();
 
-  private FlushHandler[] flushHandlers = null;
-  private int handlerCount;
-
   /**
    * @param conf
    * @param server
@@ -116,7 +111,6 @@ class MemStoreFlusher implements FlushRe
       conf.getInt("hbase.hstore.blockingStoreFiles", 7);
     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
       90000);
-    this.handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
     LOG.info("globalMemStoreLimit=" +
       StringUtils.humanReadableInt(this.globalMemStoreLimit) +
       ", globalMemStoreLimitLowMark=" +
@@ -219,59 +213,64 @@ class MemStoreFlusher implements FlushRe
     return true;
   }
 
-  private class FlushHandler extends HasThread {
-    @Override
-    public void run() {
-      while (!server.isStopped()) {
-        FlushQueueEntry fqe = null;
-        try {
-          wakeupPending.set(false); // allow someone to wake us up again
-          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-          if (fqe == null || fqe instanceof WakeupFlushThread) {
-            if (isAboveLowWaterMark()) {
-              LOG.debug("Flush thread woke up because memory above low water="
-                  + StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
-              if (!flushOneForGlobalPressure()) {
-                // Wasn't able to flush any region, but we're above low water mark
-                // This is unlikely to happen, but might happen when closing the
-                // entire server - another thread is flushing regions. We'll just
-                // sleep a little bit to avoid spinning, and then pretend that
-                // we flushed one, so anyone blocked will check again
+  @Override
+  public void run() {
+    while (!this.server.isStopped()) {
+      FlushQueueEntry fqe = null;
+      try {
+        wakeupPending.set(false); // allow someone to wake us up again
+        fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+        if (fqe == null || fqe instanceof WakeupFlushThread) {
+          if (isAboveLowWaterMark()) {
+            LOG.debug("Flush thread woke up because memory above low water=" +
+              StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
+            if (!flushOneForGlobalPressure()) {
+              // Wasn't able to flush any region, but we're above low water mark
+              // This is unlikely to happen, but might happen when closing the
+              // entire server - another thread is flushing regions. We'll just
+              // sleep a little bit to avoid spinning, and then pretend that
+              // we flushed one, so anyone blocked will check again
+              lock.lock();
+              try {
                 Thread.sleep(1000);
-                wakeUpIfBlocking();
+                flushOccurred.signalAll();
+              } finally {
+                lock.unlock();
               }
-              // Enqueue another one of these tokens so we'll wake up again
-              wakeupFlushThread();
             }
-            continue;
+            // Enqueue another one of these tokens so we'll wake up again
+            wakeupFlushThread();
           }
-          FlushRegionEntry fre = (FlushRegionEntry) fqe;
-          if (!flushRegion(fre)) {
-            break;
-          }
-        } catch (InterruptedException ex) {
-          continue;
-        } catch (ConcurrentModificationException ex) {
           continue;
-        } catch (Exception ex) {
-          LOG.error("Cache flusher failed for entry " + fqe, ex);
-          if (!server.checkFileSystem()) {
-            break;
-          }
+        }
+        FlushRegionEntry fre = (FlushRegionEntry)fqe;
+        if (!flushRegion(fre)) {
+          break;
+        }
+      } catch (InterruptedException ex) {
+        continue;
+      } catch (ConcurrentModificationException ex) {
+        continue;
+      } catch (Exception ex) {
+        LOG.error("Cache flusher failed for entry " + fqe, ex);
+        if (!server.checkFileSystem()) {
+          break;
         }
       }
-      synchronized (regionsInQueue) {
-        regionsInQueue.clear();
-        flushQueue.clear();
-      }
+    }
+    this.regionsInQueue.clear();
+    this.flushQueue.clear();
 
-      // Signal anyone waiting, so they see the close flag
-      wakeUpIfBlocking();
-      LOG.info(getName() + " exiting");
+    // Signal anyone waiting, so they see the close flag
+    lock.lock();
+    try {
+      flushOccurred.signalAll();
+    } finally {
+      lock.unlock();
     }
+    LOG.info(getName() + " exiting");
   }
 
-
   private void wakeupFlushThread() {
     if (wakeupPending.compareAndSet(false, true)) {
       flushQueue.add(new WakeupFlushThread());
@@ -288,10 +287,6 @@ class MemStoreFlusher implements FlushRe
           continue;
         }
 
-        if (region.writestate.flushing || !region.writestate.writesEnabled) {
-          continue;
-        }
-
         if (checkStoreFileCount && isTooManyStoreFiles(region)) {
           continue;
         }
@@ -337,41 +332,11 @@ class MemStoreFlusher implements FlushRe
    * Only interrupt once it's done with a run through the work loop.
    */
   void interruptIfNecessary() {
-    lock.writeLock().lock();
+    lock.lock();
     try {
-      for (FlushHandler flushHander : flushHandlers) {
-        if (flushHander != null) flushHander.interrupt();
-      }
+      this.interrupt();
     } finally {
-      lock.writeLock().unlock();
-    }
-  }
-
-  synchronized void start(UncaughtExceptionHandler eh) {
-    ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
-        server.getServerName().toString() + "-MemStoreFlusher", eh);
-    flushHandlers = new FlushHandler[handlerCount];
-    for (int i = 0; i < flushHandlers.length; i++) {
-      flushHandlers[i] = new FlushHandler();
-      flusherThreadFactory.newThread(flushHandlers[i]);
-      flushHandlers[i].start();
-    }
-  }
-
-  boolean isAlive() {
-    for (FlushHandler flushHander : flushHandlers) {
-      if (flushHander != null && flushHander.isAlive()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  void join() {
-    for (FlushHandler flushHander : flushHandlers) {
-      if (flushHander != null) {
-        Threads.shutdown(flushHander.getThread());
-      }
+      lock.unlock();
     }
   }
 
@@ -400,8 +365,7 @@ class MemStoreFlusher implements FlushRe
             "store files; delaying flush up to " + this.blockingWaitTime + "ms");
           if (!this.server.compactSplitThread.requestSplit(region)) {
             try {
-              this.server.compactSplitThread.requestCompaction(region, Thread
-                  .currentThread().getName());
+              this.server.compactSplitThread.requestCompaction(region, getName());
             } catch (IOException e) {
               LOG.error(
                 "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
@@ -440,8 +404,8 @@ class MemStoreFlusher implements FlushRe
         // emergencyFlush, then item was removed via a flushQueue.poll.
         flushQueue.remove(fqe);
      }
+     lock.lock();
     }
-    lock.readLock().lock();
     try {
       boolean shouldCompact = region.flushcache();
       // We just want to check the size
@@ -449,7 +413,7 @@ class MemStoreFlusher implements FlushRe
       if (shouldSplit) {
         this.server.compactSplitThread.requestSplit(region);
       } else if (shouldCompact) {
-        server.compactSplitThread.requestCompaction(region, Thread.currentThread().getName());
+        server.compactSplitThread.requestCompaction(region, getName());
       }
 
     } catch (DroppedSnapshotException ex) {
@@ -468,18 +432,15 @@ class MemStoreFlusher implements FlushRe
         return false;
       }
     } finally {
-      lock.readLock().unlock();
-      wakeUpIfBlocking();
+      try {
+        flushOccurred.signalAll();
+      } finally {
+        lock.unlock();
+      }
     }
     return true;
   }
 
-  private void wakeUpIfBlocking() {
-    synchronized (blockSignal) {
-      blockSignal.notifyAll();
-    }
-  }
-
   private boolean isTooManyStoreFiles(HRegion region) {
     for (Store hstore : region.stores.values()) {
       if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
@@ -497,12 +458,12 @@ class MemStoreFlusher implements FlushRe
    */
   public void reclaimMemStoreMemory() {
     if (isAboveHighWaterMark()) {
-      long start = System.currentTimeMillis();
-      synchronized (this.blockSignal) {
+      lock.lock();
+      try {
         boolean blocked = false;
         long startTime = 0;
         while (isAboveHighWaterMark() && !server.isStopped()) {
-          if (!blocked) {
+          if(!blocked){
             startTime = EnvironmentEdgeManager.currentTimeMillis();
             LOG.info("Blocking updates on " + server.toString() +
             ": the global memstore size " +
@@ -515,12 +476,10 @@ class MemStoreFlusher implements FlushRe
           try {
             // we should be able to wait forever, but we've seen a bug where
             // we miss a notify, so put a 5 second bound on it at least.
-            blockSignal.wait(5 * 1000);
+            flushOccurred.await(5, TimeUnit.SECONDS);
           } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
           }
-          long took = System.currentTimeMillis() - start;
-          LOG.warn("Memstore is above high water mark and block " + took + "ms");
         }
         if(blocked){
           final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
@@ -529,6 +488,8 @@ class MemStoreFlusher implements FlushRe
           }
           LOG.info("Unblocking updates for server " + server.toString());
         }
+      } finally {
+        lock.unlock();
       }
     } else if (isAboveLowWaterMark()) {
       wakeupFlushThread();

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1437274&r1=1437273&r2=1437274&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java Wed Jan 23 05:28:51 2013
@@ -42,9 +42,6 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -111,7 +108,7 @@ import org.apache.hadoop.util.StringUtil
 @InterfaceAudience.Private
 class FSHLog implements HLog, Syncable {
   static final Log LOG = LogFactory.getLog(FSHLog.class);
-
+  
   private final FileSystem fs;
   private final Path rootDir;
   private final Path dir;
@@ -132,7 +129,7 @@ class FSHLog implements HLog, Syncable {
   private WALCoprocessorHost coprocessorHost;
 
   private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer
-  // Minimum tolerable replicas, if the actual value is lower than it,
+  // Minimum tolerable replicas, if the actual value is lower than it, 
   // rollWriter will be triggered
   private int minTolerableReplication;
   private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
@@ -244,10 +241,10 @@ class FSHLog implements HLog, Syncable {
   public FSHLog(final FileSystem fs, final Path root, final String logDir,
                 final Configuration conf)
   throws IOException {
-    this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
+    this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, 
         conf, null, true, null, false);
   }
-
+  
   /**
    * Constructor.
    *
@@ -314,7 +311,7 @@ class FSHLog implements HLog, Syncable {
    * @throws IOException
    */
   public FSHLog(final FileSystem fs, final Path root, final String logDir,
-      final String oldLogDir, final Configuration conf,
+      final String oldLogDir, final Configuration conf, 
       final List<WALActionsListener> listeners,
       final boolean failIfLogDirExists, final String prefix, boolean forMeta)
   throws IOException {
@@ -325,15 +322,15 @@ class FSHLog implements HLog, Syncable {
     this.oldLogDir = new Path(this.rootDir, oldLogDir);
     this.forMeta = forMeta;
     this.conf = conf;
-
+   
     if (listeners != null) {
       for (WALActionsListener i: listeners) {
         registerWALActionsListener(i);
       }
     }
-
+    
     this.failIfLogDirExists = failIfLogDirExists;
-
+    
     this.blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
         getDefaultBlockSize());
     // Roll at 95% of block size.
@@ -341,7 +338,7 @@ class FSHLog implements HLog, Syncable {
     this.logrollsize = (long)(this.blocksize * multi);
     this.optionalFlushInterval =
       conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
-
+    
     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
     this.minTolerableReplication = conf.getInt(
         "hbase.regionserver.hlog.tolerable.lowreplication",
@@ -351,9 +348,9 @@ class FSHLog implements HLog, Syncable {
     this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
     this.closeErrorsTolerated = conf.getInt(
         "hbase.regionserver.logroll.errors.tolerated", 0);
-
+    
     this.logSyncerThread = new LogSyncer(this.optionalFlushInterval);
-
+    
     LOG.info("HLog configuration: blocksize=" +
       StringUtils.byteDesc(this.blocksize) +
       ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
@@ -378,7 +375,7 @@ class FSHLog implements HLog, Syncable {
     }
     // rollWriter sets this.hdfs_out if it can.
     rollWriter();
-
+    
     // handle the reflection necessary to call getNumCurrentReplicas()
     this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
 
@@ -388,7 +385,7 @@ class FSHLog implements HLog, Syncable {
 
     this.metrics = new MetricsWAL();
   }
-
+  
   // use reflection to search for getDefaultBlockSize(Path f)
   // if the method doesn't exist, fall back to using getDefaultBlockSize()
   private long getDefaultBlockSize() throws IOException {
@@ -481,7 +478,7 @@ class FSHLog implements HLog, Syncable {
    * @return The wrapped stream our writer is using; its not the
    * writer's 'out' FSDatoOutputStream but the stream that this 'out' wraps
    * (In hdfs its an instance of DFSDataOutputStream).
-   *
+   * 
    * usage: see TestLogRolling.java
    */
   OutputStream getOutputStream() {
@@ -572,7 +569,7 @@ class FSHLog implements HLog, Syncable {
   /**
    * This method allows subclasses to inject different writers without having to
    * extend other methods like rollWriter().
-   *
+   * 
    * @param fs
    * @param path
    * @param conf
@@ -945,7 +942,7 @@ class FSHLog implements HLog, Syncable {
       }
       // Sync if catalog region, and if not then check if that table supports
       // deferred log flushing
-      if (doSync &&
+      if (doSync && 
           (info.isMetaRegion() ||
           !htd.isDeferredLogFlush())) {
         // sync txn to file system
@@ -955,14 +952,14 @@ class FSHLog implements HLog, Syncable {
     }
 
   @Override
-  public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits,
+  public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits, 
     UUID clusterId, final long now, HTableDescriptor htd)
     throws IOException {
     return append(info, tableName, edits, clusterId, now, htd, false);
   }
 
   @Override
-  public long append(HRegionInfo info, byte [] tableName, WALEdit edits,
+  public long append(HRegionInfo info, byte [] tableName, WALEdit edits, 
     UUID clusterId, final long now, HTableDescriptor htd)
     throws IOException {
     return append(info, tableName, edits, clusterId, now, htd, true);
@@ -984,8 +981,8 @@ class FSHLog implements HLog, Syncable {
 
     // List of pending writes to the HLog. There corresponds to transactions
     // that have not yet returned to the client. We keep them cached here
-    // instead of writing them to HDFS piecemeal, because the HDFS write
-    // method is pretty heavyweight as far as locking is concerned. The
+    // instead of writing them to HDFS piecemeal, because the HDFS write 
+    // method is pretty heavyweight as far as locking is concerned. The 
     // goal is to increase the batchsize for writing-to-hdfs as well as
     // sync-to-hdfs, so that we can get better system throughput.
     private List<Entry> pendingWrites = new LinkedList<Entry>();
@@ -1072,7 +1069,7 @@ class FSHLog implements HLog, Syncable {
       // See HBASE-4387, HBASE-5623, HBASE-7329.
       tempWriter = this.writer;
     }
-    // if the transaction that we are interested in is already
+    // if the transaction that we are interested in is already 
     // synced, then return immediately.
     if (txid <= this.syncedTillHere) {
       return;
@@ -1080,7 +1077,7 @@ class FSHLog implements HLog, Syncable {
     try {
       long doneUpto;
       long now = EnvironmentEdgeManager.currentTimeMillis();
-      // First flush all the pending writes to HDFS. Then
+      // First flush all the pending writes to HDFS. Then 
       // issue the sync to HDFS. If sync is successful, then update
       // syncedTillHere to indicate that transactions till this
       // number has been successfully synced.
@@ -1347,13 +1344,13 @@ class FSHLog implements HLog, Syncable {
 
   /**
    * Get the directory we are making logs in.
-   *
+   * 
    * @return dir
    */
   protected Path getDir() {
     return dir;
   }
-
+  
   static Path getHLogArchivePath(Path oldLogDir, Path p) {
     return new Path(oldLogDir, p.getName());
   }
@@ -1391,7 +1388,7 @@ class FSHLog implements HLog, Syncable {
         conf, baseDir, p, oldLogDir, fs);
     logSplitter.splitLog();
   }
-
+  
   @Override
   public WALCoprocessorHost getCoprocessorHost() {
     return coprocessorHost;