You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by dd...@apache.org on 2013/04/25 19:50:18 UTC

svn commit: r1475872 - in /hbase/trunk: hbase-common/src/main/resources/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ hbase-server/src/test/java/org/apache/hadoop/...

Author: ddas
Date: Thu Apr 25 17:50:17 2013
New Revision: 1475872

URL: http://svn.apache.org/r1475872
Log:
HBASE-5930. Limits the amount of time an edit can live in the memstore.

Modified:
    hbase/trunk/hbase-common/src/main/resources/hbase-default.xml
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java

Modified: hbase/trunk/hbase-common/src/main/resources/hbase-default.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/resources/hbase-default.xml?rev=1475872&r1=1475871&r2=1475872&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/resources/hbase-default.xml (original)
+++ hbase/trunk/hbase-common/src/main/resources/hbase-default.xml Thu Apr 25 17:50:17 2013
@@ -343,6 +343,14 @@
     </description>
   </property>
   <property>
+    <name>hbase.regionserver.optionalcacheflushinterval</name>
+    <value>3600000</value>
+    <description>
+    Amount of time to wait since the last time a region was flushed before
+    invoking an optional cache flush. Default 1 hour.
+    </description>
+  </property>
+  <property>
     <name>hbase.hregion.memstore.flush.size</name>
     <value>134217728</value>
     <description>

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java?rev=1475872&r1=1475871&r2=1475872&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java Thu Apr 25 17:50:17 2013
@@ -32,4 +32,11 @@ public interface FlushRequester {
    * @param region the HRegion requesting the cache flush
    */
   void requestFlush(HRegion region);
+  /**
+   * Tell the listener the cache needs to be flushed after a delay
+   *
+   * @param region the HRegion requesting the cache flush
+   * @param delay after how much time should the flush happen
+   */
+  void requestDelayedFlush(HRegion region, long delay);
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1475872&r1=1475871&r2=1475872&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Apr 25 17:50:17 2013
@@ -347,6 +347,7 @@ public class HRegion implements HeapSize
   final RegionServerServices rsServices;
   private RegionServerAccounting rsAccounting;
   private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
+  private long flushCheckInterval;
   private long blockingMemStoreSize;
   final long threadWakeFrequency;
   // Used to guard closes
@@ -438,6 +439,8 @@ public class HRegion implements HeapSize
       .add(confParam)
       .addStringMap(htd.getConfiguration())
       .addWritableMap(htd.getValues());
+    this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
+        DEFAULT_CACHE_FLUSH_INTERVAL);
     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
                     DEFAULT_ROWLOCK_WAIT_DURATION);
 
@@ -835,6 +838,12 @@ public class HRegion implements HeapSize
 
   private final Object closeLock = new Object();
 
+  /** Conf key for the periodic flush interval */
+  public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL = 
+      "hbase.regionserver.optionalcacheflushinterval";
+  /** Default interval for the memstore flush */
+  public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
+
   /**
    * Close down this HRegion.  Flush the cache unless abort parameter is true,
    * Shut down each HStore, don't service any more calls.
@@ -1323,6 +1332,26 @@ public class HRegion implements HeapSize
   }
 
   /**
+   * Should the memstore be flushed now
+   */
+  boolean shouldFlush() {
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    //if we flushed in the recent past, we don't need to do again now
+    if ((now - getLastFlushTime() < flushCheckInterval)) {
+      return false;
+    }
+    //since we didn't flush in the recent past, flush now if certain conditions
+    //are met. Return true on first such memstore hit.
+    for (Store s : this.getStores().values()) {
+      if (s.timeOfOldestEdit() < now - flushCheckInterval) {
+        // we have an old enough edit in the memstore, flush
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
    * Flush the memstore.
    *
    * Flushing the memstore is a little tricky. We have a lot of updates in the
@@ -4898,7 +4927,7 @@ public class HRegion implements HeapSize
       ClassSize.OBJECT +
       ClassSize.ARRAY +
       38 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
-      (10 * Bytes.SIZEOF_LONG) +
+      (11 * Bytes.SIZEOF_LONG) +
       Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1475872&r1=1475871&r2=1475872&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Apr 25 17:50:17 2013
@@ -235,7 +235,7 @@ public class HRegionServer implements Cl
 
   public static final Log LOG = LogFactory.getLog(HRegionServer.class);
 
-  private final Random rand = new Random();
+  private final Random rand;
 
   /*
    * Strings to be used in forming the exception message for
@@ -357,6 +357,11 @@ public class HRegionServer implements Cl
    */
   Chore compactionChecker;
 
+  /*
+   * Check for flushes
+   */
+  Chore periodicFlusher;
+
   // HLog and HLog roller. log is protected rather than private to avoid
   // eclipse warning when accessed by inner classes
   protected volatile HLog hlog;
@@ -502,6 +507,7 @@ public class HRegionServer implements Cl
       throw new IllegalArgumentException("Failed resolve of " + initialIsa);
     }
 
+    this.rand = new Random(initialIsa.hashCode());
     this.rpcServer = HBaseServerRPC.getServer(AdminProtocol.class, this,
         new Class<?>[]{ClientProtocol.class,
             AdminProtocol.class, HBaseRPCErrorHandler.class,
@@ -682,6 +688,7 @@ public class HRegionServer implements Cl
       ".multiplier", 1000);
     this.compactionChecker = new CompactionChecker(this,
       this.threadWakeFrequency * multiplier, this);
+    this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
     // Health checker thread.
     int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
       HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
@@ -1274,6 +1281,36 @@ public class HRegionServer implements Cl
     }
   }
 
+  class PeriodicMemstoreFlusher extends Chore {
+    final HRegionServer server;
+    final static int RANGE_OF_DELAY = 20000; //millisec
+    final static int MIN_DELAY_TIME = 3000; //millisec
+    public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
+      super(server.getServerName() + "-MemstoreFlusherChore", cacheFlushInterval, server);
+      this.server = server;
+    }
+
+    @Override
+    protected void chore() {
+      for (HRegion r : this.server.onlineRegions.values()) {
+        if (r == null)
+          continue;
+        if (r.shouldFlush()) {
+          FlushRequester requester = server.getFlushRequester();
+          if (requester != null) {
+            long randomDelay = rand.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
+            LOG.info(getName() + " requesting flush for region " + r.getRegionNameAsString() + 
+                " after a delay of " + randomDelay);
+            //Throttle the flushes by putting a delay. If we don't throttle, and there
+            //is a balanced write-load on the regions in a table, we might end up 
+            //overwhelming the filesystem with too many flushes at once.
+            requester.requestDelayedFlush(r, randomDelay);
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Report the status of the server. A server is online once all the startup is
    * completed (setting up filesystem, starting service threads, etc.). This
@@ -1419,6 +1456,8 @@ public class HRegionServer implements Cl
     this.cacheFlusher.start(uncaughtExceptionHandler);
     Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
       ".compactionChecker", uncaughtExceptionHandler);
+    Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), n +
+        ".periodicFlusher", uncaughtExceptionHandler);
     if (this.healthCheckChore != null) {
     Threads
         .setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker",
@@ -1499,7 +1538,8 @@ public class HRegionServer implements Cl
     // Verify that all threads are alive
     if (!(leases.isAlive()
         && cacheFlusher.isAlive() && hlogRoller.isAlive()
-        && this.compactionChecker.isAlive())) {
+        && this.compactionChecker.isAlive())
+        && this.periodicFlusher.isAlive()) {
       stop("One or more threads are no longer alive -- stop");
       return false;
     }
@@ -1662,6 +1702,7 @@ public class HRegionServer implements Cl
    */
   protected void join() {
     Threads.shutdown(this.compactionChecker.getThread());
+    Threads.shutdown(this.periodicFlusher.getThread());
     this.cacheFlusher.join();
     if (this.healthCheckChore != null) {
       Threads.shutdown(this.healthCheckChore.getThread());

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1475872&r1=1475871&r2=1475872&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Thu Apr 25 17:50:17 2013
@@ -446,6 +446,11 @@ public class HStore implements Store {
     }
   }
 
+  @Override
+  public long timeOfOldestEdit() {
+    return memstore.timeOfOldestEdit();
+  }
+
   /**
    * Adds a value to the memstore
    *

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1475872&r1=1475871&r2=1475872&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Thu Apr 25 17:50:17 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.io.HeapSi
 import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * The MemStore holds in-memory modifications to the Store.  Modifications
@@ -90,6 +91,9 @@ public class MemStore implements HeapSiz
   // Used to track own heapSize
   final AtomicLong size;
 
+  // Used to track when to flush
+  volatile long timeOfOldestEdit = Long.MAX_VALUE;
+
   TimeRangeTracker timeRangeTracker;
   TimeRangeTracker snapshotTimeRangeTracker;
 
@@ -166,6 +170,7 @@ public class MemStore implements HeapSiz
           if (allocator != null) {
             this.allocator = new MemStoreLAB(conf, chunkPool);
           }
+          timeOfOldestEdit = Long.MAX_VALUE;
         }
       }
     } finally {
@@ -233,6 +238,28 @@ public class MemStore implements HeapSiz
     }
   }
 
+  long timeOfOldestEdit() {
+    return timeOfOldestEdit;
+  }
+
+  private boolean addToKVSet(KeyValue e) {
+    boolean b = this.kvset.add(e);
+    setOldestEditTimeToNow();
+    return b;
+  }
+
+  private boolean removeFromKVSet(KeyValue e) {
+    boolean b = this.kvset.remove(e);
+    setOldestEditTimeToNow();
+    return b;
+  }
+
+  void setOldestEditTimeToNow() {
+    if (timeOfOldestEdit == Long.MAX_VALUE) {
+      timeOfOldestEdit = EnvironmentEdgeManager.currentTimeMillis();
+    }
+  }
+
   /**
    * Internal version of add() that doesn't clone KVs with the
    * allocator, and doesn't take the lock.
@@ -240,7 +267,7 @@ public class MemStore implements HeapSiz
    * Callers should ensure they already have the read lock taken
    */
   private long internalAdd(final KeyValue toAdd) {
-    long s = heapSizeChange(toAdd, this.kvset.add(toAdd));
+    long s = heapSizeChange(toAdd, addToKVSet(toAdd));
     timeRangeTracker.includeTimestamp(toAdd);
     this.size.addAndGet(s);
     return s;
@@ -288,7 +315,7 @@ public class MemStore implements HeapSiz
       // If the key is in the memstore, delete it. Update this.size.
       found = this.kvset.get(kv);
       if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) {
-        this.kvset.remove(kv);
+        removeFromKVSet(kv);
         long s = heapSizeChange(kv, true);
         this.size.addAndGet(-s);
       }
@@ -307,7 +334,7 @@ public class MemStore implements HeapSiz
     this.lock.readLock().lock();
     try {
       KeyValue toAdd = maybeCloneWithAllocator(delete);
-      s += heapSizeChange(toAdd, this.kvset.add(toAdd));
+      s += heapSizeChange(toAdd, addToKVSet(toAdd));
       timeRangeTracker.includeTimestamp(toAdd);
     } finally {
       this.lock.readLock().unlock();
@@ -606,6 +633,7 @@ public class MemStore implements HeapSiz
             addedSize -= delta;
             this.size.addAndGet(-delta);
             it.remove();
+            setOldestEditTimeToNow();
           } else {
             versionsVisible++;
           }
@@ -941,7 +969,7 @@ public class MemStore implements HeapSiz
   }
 
   public final static long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + (13 * ClassSize.REFERENCE));
+      ClassSize.OBJECT + (13 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
 
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1475872&r1=1475871&r2=1475872&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Thu Apr 25 17:50:17 2013
@@ -326,6 +326,18 @@ class MemStoreFlusher implements FlushRe
     }
   }
 
+  public void requestDelayedFlush(HRegion r, long delay) {
+    synchronized (regionsInQueue) {
+      if (!regionsInQueue.containsKey(r)) {
+        // This entry has some delay
+        FlushRegionEntry fqe = new FlushRegionEntry(r);
+        fqe.requeue(delay);
+        this.regionsInQueue.put(r, fqe);
+        this.flushQueue.add(fqe);
+      }
+    }
+  }
+
   public int getFlushQueueSize() {
     return flushQueue.size();
   }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1475872&r1=1475871&r2=1475872&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Apr 25 17:50:17 2013
@@ -117,6 +117,11 @@ public interface Store extends HeapSize,
   public long add(KeyValue kv);
 
   /**
+   * When was the last edit done in the memstore
+   */
+  long timeOfOldestEdit();
+
+  /**
    * Removes a kv from the memstore. The KeyValue is removed only if its key & memstoreTS match the
    * key & memstoreTS value of the kv parameter.
    * @param kv

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1475872&r1=1475871&r2=1475872&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Thu Apr 25 17:50:17 2013
@@ -872,6 +872,12 @@ public class TestWALReplay {
         throw new RuntimeException("Exception flushing", e);
       }
     }
+
+    @Override
+    public void requestDelayedFlush(HRegion region, long when) {
+      // TODO Auto-generated method stub
+      
+    }
   }
 
   private void addWALEdits (final byte [] tableName, final HRegionInfo hri,

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java?rev=1475872&r1=1475871&r2=1475872&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java Thu Apr 25 17:50:17 2013
@@ -1,19 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
 package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;