You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2009/10/21 02:38:25 UTC

svn commit: r827858 - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/regionserver/ src/java/org/apache/hadoop/hbase/regionserver/wal/ src/test/org/apache/hadoop/hbase/regionserver/wal/

Author: rawson
Date: Wed Oct 21 00:38:25 2009
New Revision: 827858

URL: http://svn.apache.org/viewvc?rev=827858&view=rev
Log:
hdfs-1915

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HConstants.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=827858&r1=827857&r2=827858&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Wed Oct 21 00:38:25 2009
@@ -63,7 +63,7 @@
                Filters client-side
    HBASE-1890  hbase-1506 where assignment is done at regionserver doesn't
                 work
-   HBASE-1889  ClassNotFoundException on trunk for REST
+   HBASE-1889 ClassNotFoundException on trunk for REST
    HBASE-1905  Remove unused config. hbase.hstore.blockCache.blockSize
    HBASE-1906  FilterList of prefix and columnvalue not working properly with
                deletes and multiple values
@@ -73,6 +73,8 @@
                Purtell)
    HBASE-1916  FindBugs and javac warnings cleanup
    HBASE-1908  ROOT not reassigned if only one regionserver left
+   HBASE-1915  HLog.sync is called way too often, needs to be only called 1x per
+               RPC
 
   IMPROVEMENTS
    HBASE-1760  Cleanup TODOs in HTable

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HConstants.java?rev=827858&r1=827857&r2=827858&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HConstants.java Wed Oct 21 00:38:25 2009
@@ -223,7 +223,7 @@
   /**
    * Max length a row can have because of the limitation in TFile.
    */
-  static final int MAX_ROW_LENGTH = 1024*64;
+  static final int MAX_ROW_LENGTH = Short.MAX_VALUE;
   
   /** When we encode strings, we always specify UTF8 encoding */
   static final String UTF8_ENCODING = "UTF-8";

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=827858&r1=827857&r2=827858&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Oct 21 00:38:25 2009
@@ -1760,6 +1760,8 @@
     try {
       cacheFlusher.reclaimMemStoreMemory();
       region.put(put, getLockFromId(put.getLockId()));
+
+      this.hlog.sync(region.getRegionInfo().isMetaRegion());
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
@@ -1770,8 +1772,11 @@
     // Count of Puts processed.
     int i = 0;
     checkOpen();
+    boolean isMetaRegion = false;
     try {
       HRegion region = getRegion(regionName);
+      isMetaRegion = region.getRegionInfo().isMetaRegion();
+      
       this.cacheFlusher.reclaimMemStoreMemory();
       Integer[] locks = new Integer[puts.length];
       for (i = 0; i < puts.length; i++) {
@@ -1779,16 +1784,22 @@
         locks[i] = getLockFromId(puts[i].getLockId());
         region.put(puts[i], locks[i]);
       }
+
     } catch (WrongRegionException ex) {
       LOG.debug("Batch puts: " + i, ex);
-      return i;
     } catch (NotServingRegionException ex) {
-      return i;
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
     // All have been processed successfully.
-    return -1;
+
+    this.hlog.sync(isMetaRegion);
+
+    if (i == puts.length) {
+      return -1;
+    } else {
+      return i;
+    }
   }
 
   /**
@@ -1814,8 +1825,11 @@
     HRegion region = getRegion(regionName);
     try {
       cacheFlusher.reclaimMemStoreMemory();
-      return region.checkAndPut(row, family, qualifier, value, put,
+      boolean retval = region.checkAndPut(row, family, qualifier, value, put,
           getLockFromId(put.getLockId()), true);
+
+      this.hlog.sync(region.getRegionInfo().isMetaRegion());
+      return retval;
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
@@ -1962,8 +1976,9 @@
       Integer lid = getLockFromId(delete.getLockId());
       HRegion region = getRegion(regionName);
       region.delete(delete, lid, writeToWAL);
-    } catch(WrongRegionException ex) {
-      // ignore
+
+      this.hlog.sync(region.getRegionInfo().isMetaRegion());
+    } catch (WrongRegionException ex) {
     } catch (NotServingRegionException ex) {
       // ignore
     } catch (Throwable t) {
@@ -1976,11 +1991,14 @@
     // Count of Deletes processed.
     int i = 0;
     checkOpen();
+    boolean isMetaRegion = false;
     try {
       boolean writeToWAL = true;
       this.cacheFlusher.reclaimMemStoreMemory();
       Integer[] locks = new Integer[deletes.length];
       HRegion region = getRegion(regionName);
+      isMetaRegion = region.getRegionInfo().isMetaRegion();
+
       for (i = 0; i < deletes.length; i++) {
         this.requestCount.incrementAndGet();
         locks[i] = getLockFromId(deletes[i].getLockId());
@@ -1994,6 +2012,8 @@
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
+
+    this.hlog.sync(isMetaRegion);
     // All have been processed successfully.
     return -1;
   }
@@ -2438,8 +2458,12 @@
     requestCount.incrementAndGet();
     try {
       HRegion region = getRegion(regionName);
-      return region.incrementColumnValue(row, family, qualifier, amount, 
+      long retval = region.incrementColumnValue(row, family, qualifier, amount,
           writeToWAL);
+
+      this.hlog.sync(region.getRegionInfo().isMetaRegion());
+
+      return retval;
     } catch (IOException e) {
       checkFileSystem();
       throw e;

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java?rev=827858&r1=827857&r2=827858&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java Wed Oct 21 00:38:25 2009
@@ -22,6 +22,7 @@
 
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -51,7 +52,11 @@
   protected void chore() {
     HLog hlog = log.get();
     if (hlog != null) {
-      hlog.optionalSync();
+      try {
+        hlog.sync(true); // force a flush
+      } catch (IOException e) {
+        LOG.error("LogFlusher got exception while syncing: " + e);
+      }
     }
   }
 }
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=827858&r1=827857&r2=827858&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Wed Oct 21 00:38:25 2009
@@ -354,7 +354,6 @@
         }
         this.numEntries.set(0);
         this.editsSize.set(0);
-        updateLock.notifyAll();
       }
     } finally {
       this.cacheFlushLock.unlock();
@@ -600,7 +599,6 @@
           LOG.debug("closing hlog writer in " + this.dir.toString());
         }
         this.writer.close();
-        updateLock.notifyAll();
       }
     } finally {
       cacheFlushLock.unlock();
@@ -657,8 +655,9 @@
       this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
       boolean sync = regionInfo.isMetaRegion() || regionInfo.isRootRegion();
       doWrite(logKey, logEdit, sync, logKey.getWriteTime());
+
+      this.unflushedEntries.incrementAndGet();
       this.numEntries.incrementAndGet();
-      updateLock.notifyAll();
     }
     if (this.editsSize.get() > this.logrollsize) {
       if (listener != null) {
@@ -710,7 +709,9 @@
         doWrite(logKey, kv, sync, now);
         this.numEntries.incrementAndGet();
       }
-      updateLock.notifyAll();
+
+      // Only count 1 row as an unflushed entry.
+      this.unflushedEntries.incrementAndGet();
     }
     if (this.editsSize.get() > this.logrollsize) {
         requestLogRoll();
@@ -718,40 +719,45 @@
   }
 
   public void sync() throws IOException {
-    lastLogFlushTime = System.currentTimeMillis();
-    if (this.append && syncfs != null) {
-      try {
-        this.syncfs.invoke(this.writer, NO_ARGS);
-      } catch (Exception e) {
-        throw new IOException("Reflection", e);
-      }
-    } else {
-      this.writer.sync();
-      // Above is sequencefile.writer sync.  It doesn't actually synce the
-      // backing stream.  Need to do the below to do that.
-      if (this.writer_out != null) this.writer_out.sync();
-    }
-    this.unflushedEntries.set(0);
-  }
-
-  public void optionalSync() {
-    if (!this.closed) {
-      long now = System.currentTimeMillis();
-      synchronized (updateLock) {
-        if (((now - this.optionalFlushInterval) > this.lastLogFlushTime) &&
-            this.unflushedEntries.get() > 0) {
-          try {
-            sync();
-          } catch (IOException e) {
-            LOG.error("Error flushing hlog", e);
+    sync(false);
+  }
+
+  /**
+   * Multiple threads will call sync() at the same time, only the winner
+   * will actually flush if there is any race or build up.
+   *
+   * @param force sync regardless (for meta updates) if there is data
+   * @throws IOException
+   */
+  public void sync(boolean force) throws IOException {
+    synchronized (this.updateLock) {
+      if (this.closed)
+        return;
+
+      if (this.unflushedEntries.get() == 0)
+        return; // win
+
+      if (force || this.unflushedEntries.get() > this.flushlogentries) {
+        try {
+          lastLogFlushTime = System.currentTimeMillis();
+          if (this.append && syncfs != null) {
+            try {
+              this.syncfs.invoke(this.writer, NO_ARGS);
+            } catch (Exception e) {
+              throw new IOException("Reflection", e);
+            }
+          } else {
+            this.writer.sync();
+            if (this.writer_out != null)
+              this.writer_out.sync();
           }
+          this.unflushedEntries.set(0);
+        } catch (IOException e) {
+          LOG.fatal("Could not append. Requesting close of hlog", e);
+          requestLogRoll();
+          throw e;
         }
       }
-      long took = System.currentTimeMillis() - now;
-      if (took > 1000) {
-        LOG.warn(Thread.currentThread().getName() + " took " + took +
-          "ms optional sync'ing hlog; editcount=" + this.numEntries.get());
-      }
     }
   }
 
@@ -770,9 +776,6 @@
     try {
       this.editsSize.addAndGet(logKey.heapSize() + logEdit.heapSize());
       this.writer.append(logKey, logEdit);
-      if (sync || this.unflushedEntries.incrementAndGet() >= flushlogentries) {
-        sync();
-      }
       long took = System.currentTimeMillis() - now;
       if (took > 1000) {
         LOG.warn(Thread.currentThread().getName() + " took " + took +
@@ -858,7 +861,6 @@
         if (seq != null && logSeqId >= seq.longValue()) {
           this.lastSeqWritten.remove(regionName);
         }
-        updateLock.notifyAll();
       }
     } finally {
       this.cacheFlushLock.unlock();

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=827858&r1=827857&r2=827858&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Wed Oct 21 00:38:25 2009
@@ -48,6 +48,7 @@
     this.conf.setBoolean("dfs.support.append", true);
     // Make block sizes small.
     this.conf.setInt("dfs.blocksize", 1024 * 1024);
+    this.conf.setInt("hbase.regionserver.flushlogentries", 1);
     cluster = new MiniDFSCluster(conf, 3, true, (String[])null);
     // Set the hbase.rootdir to be the home directory in mini dfs.
     this.conf.set(HConstants.HBASE_DIR,
@@ -125,8 +126,6 @@
     assertEquals(bytes.length, read);
     out.close();
     in.close();
-    // To be sure, set our flush to be at 100 edits.
-    this.conf.setInt("hbase.regionserver.flushlogentries", 100);
     Path subdir = new Path(this.dir, "hlogdir");
     HLog wal = new HLog(this.fs, subdir, this.conf, null);
     final int total = 20;