You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/07/11 01:18:16 UTC

svn commit: r793145 - in /hadoop/hbase/trunk: CHANGES.txt src/java/org/apache/hadoop/hbase/regionserver/HLog.java

Author: stack
Date: Fri Jul 10 23:18:15 2009
New Revision: 793145

URL: http://svn.apache.org/viewvc?rev=793145&view=rev
Log:
HBASE-1470 hbase and HADOOP-4379, dhruba's flush/sync

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=793145&r1=793144&r2=793145&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Fri Jul 10 23:18:15 2009
@@ -468,6 +468,7 @@
    HBASE-1643  ScanDeleteTracker takes comparator but it unused
    HBASE-1603  MR failed "RetriesExhaustedException: Trying to contact region server
                Some server for region TestTable..." -- deubugging
+   HBASE-1470  hbase and HADOOP-4379, dhruba's flush/sync
 
   OPTIMIZATIONS
    HBASE-1412  Change values for delete column and column family in KeyValue

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=793145&r1=793144&r2=793145&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Fri Jul 10 23:18:15 2009
@@ -23,6 +23,7 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -63,6 +64,7 @@
 import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.SequenceFile.Reader;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.fs.FSDataOutputStream;
 
 /**
  * HLog stores all the edits to the HStore.
@@ -112,7 +114,10 @@
   private final int flushlogentries;
   private final AtomicInteger unflushedEntries = new AtomicInteger(0);
   private volatile long lastLogFlushTime;
-  
+  private final boolean append;
+  private final Method syncfs;
+  private final static Object [] NO_ARGS = new Object []{};
+
   /*
    * Current log file.
    */
@@ -213,6 +218,21 @@
       ", flushlogentries=" + this.flushlogentries +
       ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
     rollWriter();
+    // Test if syncfs is available.
+    this.append = conf.getBoolean("dfs.support.append", false);
+    Method m = null;
+    if (this.append) {
+      try {
+        m = this.writer.getClass().getMethod("syncFs", new Class<?> []{});
+        LOG.debug("Using syncFs--hadoop-4379");
+      } catch (SecurityException e) {
+        throw new IOException("Failed test for syncfs", e);
+      } catch (NoSuchMethodException e) {
+        // This can happen
+        LOG.info("syncFs--hadoop-4379 not available" );
+      }
+    }
+    this.syncfs = m;
   }
 
   /**
@@ -585,7 +605,15 @@
 
   public void sync() throws IOException {
     lastLogFlushTime = System.currentTimeMillis();
-    this.writer.sync();
+    if (this.append && syncfs != null) {
+      try {
+        this.syncfs.invoke(this.writer, NO_ARGS);
+      } catch (Exception e) {
+        throw new IOException("Reflection", e);
+      }
+    } else {
+      this.writer.sync();
+    }
     this.unflushedEntries.set(0);
   }
 
@@ -821,15 +849,16 @@
           step * DEFAULT_NUMBER_CONCURRENT_LOG_READS +
           DEFAULT_NUMBER_CONCURRENT_LOG_READS;
         for (int i = (step * 10); i < endIndex; i++) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
-                ": " + logfiles[i].getPath() + 
-                ", length=" + logfiles[i].getLen());
-          }
           // Check for possibly empty file. With appends, currently Hadoop 
           // reports a zero length even if the file has been sync'd. Revisit if
           // HADOOP-4751 is committed.
           long length = logfiles[i].getLen();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
+              ": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
+          }
+          boolean append = conf.getBoolean("dfs.support.append", false);
+          recoverLog(fs, logfiles[i].getPath(), append);
           SequenceFile.Reader in = null;
           int count = 0;
           try {
@@ -853,10 +882,10 @@
                 key = new HLogKey();
                 val = new KeyValue();
               }
-              LOG.debug("Pushed " + count + " entries from " +
+              LOG.debug("Pushed=" + count + " entries from " +
                 logfiles[i].getPath());
             } catch (IOException e) {
-              LOG.debug("IOE Pushed " + count + " entries from " +
+              LOG.debug("IOE Pushed=" + count + " entries from " +
                 logfiles[i].getPath());
               e = RemoteExceptionHandler.checkIOException(e);
               if (!(e instanceof EOFException)) {
@@ -866,7 +895,7 @@
             }
           } catch (IOException e) {
             if (length <= 0) {
-              LOG.warn("Empty hlog, continuing: " + logfiles[i]);
+              LOG.warn("Empty hlog, continuing: " + logfiles[i] + " count=" + count, e);
               continue;
             }
             throw e;
@@ -943,9 +972,6 @@
                       fs.delete(oldlogfile, true);
                     }
                   }
-                  if (wap == null) {
-                    throw new NullPointerException();
-                  }
                   wap.w.append(logEntry.getKey(), logEntry.getEdit());
                   count++;
                 }
@@ -1030,6 +1056,40 @@
   public static String getHLogDirectoryName(HServerInfo info) {
     return getHLogDirectoryName(HServerInfo.getServerName(info));
   }
+
+  /*
+   * Recover the lease on a log that a crashed writer left open.
+   * If append is set, open the log in append mode and close it again so a
+   * subsequent reader can see up to the last sync.  Retries once a second
+   * for as long as the open fails.
+   * @param fs filesystem holding the log
+   * @param p path of the log to recover
+   * @param append value of dfs.support.append; when false this is a no-op
+   * @throws IOException if interrupted while waiting on lease recovery
+   */
+  private static void recoverLog(final FileSystem fs, final Path p,
+      final boolean append) throws IOException {
+    if (!append) {
+      return;
+    }
+    while (true) {
+      try {
+        FSDataOutputStream out = fs.append(p);
+        out.close();
+        break;
+      } catch (IOException e) {
+        LOG.info("Failed open for append, waiting on lease recovery: " + p, e);
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ex) {
+          // Restore the flag and give up; swallowing it made the loop unkillable.
+          Thread.currentThread().interrupt();
+          throw new java.io.InterruptedIOException("Interrupted recovering " + p);
+        }
+      }
+    }
+    LOG.info("Past out lease recovery");
+  }
   
   /**
    * Construct the HLog directory name