You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/08/25 08:57:30 UTC

svn commit: r1377225 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/regionser...

Author: mbautin
Date: Sat Aug 25 06:57:29 2012
New Revision: 1377225

URL: http://svn.apache.org/viewvc?rev=1377225&view=rev
Log:
[HBASE-6508] [89-fb] Filter out HLog edits at log splitting time

Author: avf

Summary:
Filter out edits that have already been flushed from MemStore to HFiles for a specific region. To do so, we:

1) Keep track, in HServerInfo, of the last sequence id flushed for each of the server's regions

2) Keep track, in HMaster, of the flushed sequence ids reported by all region servers

3) Add a method to HMasterRegionInterface that region servers can use to retrieve the latest flushed sequence id for a given region

4) Pass HMasterRegionInterface objects to HLogSplitter / SplitLogWorker

Test Plan: Unit tests (added unit test TestHLogFiltering)

Reviewers: Liyin, aaiyer, Kannan

Reviewed By: Liyin

CC: khemani, JIRA, tedyu

Differential Revision: https://reviews.facebook.net/D4509

Added:
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HServerInfo.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/CodeToClassAndBack.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseMapWritable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HServerInfo.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HServerInfo.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HServerInfo.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HServerInfo.java Sat Aug 25 06:57:29 2012
@@ -25,10 +25,14 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Set;
 import java.util.regex.Pattern;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -69,6 +73,11 @@ public class HServerInfo implements Writ
   private String hostname;
   private String cachedHostnamePort = null;
 
+  // For each region, store the last sequence id that was flushed
+  // from MemStore to an HFile
+  private final SortedMap<byte[], Long> flushedSequenceIdByRegion =
+      new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+
   public HServerInfo() {
     this(new HServerAddress(), 0, "default name");
   }
@@ -110,6 +119,7 @@ public class HServerInfo implements Writ
     this.startCode = other.getStartCode();
     this.load = other.getLoad();
     this.hostname = other.hostname;
+    this.flushedSequenceIdByRegion.putAll(other.flushedSequenceIdByRegion);
   }
 
   public HServerLoad getLoad() {
@@ -137,6 +147,31 @@ public class HServerInfo implements Writ
     return this.hostname;
   }

+  /**
+   * Records the last sequence id flushed from MemStore to an HFile for the
+   * given region, replacing any previously recorded value.
+   */
+  public void setFlushedSequenceIdForRegion(byte[] region, long sequenceId) {
+    flushedSequenceIdByRegion.put(region, sequenceId);
+  }
+
+  /**
+   * @return the last flushed sequence id recorded for the region, or -1 if
+   *         none has been recorded
+   */
+  public long getFlushedSequenceIdForRegion(byte[] region) {
+    Long sequenceId = flushedSequenceIdByRegion.get(region);
+    return (sequenceId == null) ? -1L : sequenceId;
+  }
+
+  /**
+   * @return the live, thread-safe map from region name to last flushed
+   *         sequence id; changes made through it affect this object
+   */
+  public SortedMap<byte[], Long> getFlushedSequenceIdByRegion() {
+    return flushedSequenceIdByRegion;
+  }
+
   /**
    * @return The hostname and port concatenated with a ':' as separator.
    */
@@ -238,6 +273,11 @@ public class HServerInfo implements Writ
     this.load.readFields(in);
     in.readInt();
     this.hostname = in.readUTF();
+    // Replace, don't merge, on a reused instance; the map is serialized last.
+    flushedSequenceIdByRegion.clear();
+    HbaseMapWritable<byte[], Long> sequenceIdsWritable =
+        new HbaseMapWritable<byte[], Long>(flushedSequenceIdByRegion);
+    sequenceIdsWritable.readFields(in);
   }

   public void write(DataOutput out) throws IOException {
@@ -247,6 +287,10 @@ public class HServerInfo implements Writ
     // Still serializing the info port for backward compatibility but it is not used.
     out.writeInt(HConstants.DEFAULT_REGIONSERVER_INFOPORT);
     out.writeUTF(hostname);
+    // The flushed sequence id map is written last, mirroring readFields().
+    HbaseMapWritable<byte[], Long> sequenceIdsWritable =
+        new HbaseMapWritable<byte[], Long>(flushedSequenceIdByRegion);
+    sequenceIdsWritable.write(out);
   }

   public int compareTo(HServerInfo o) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/CodeToClassAndBack.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/CodeToClassAndBack.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/CodeToClassAndBack.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/CodeToClassAndBack.java Sat Aug 25 06:57:29 2012
@@ -45,7 +45,7 @@ public interface CodeToClassAndBack {
   /**
    * Class list for supported classes
    */
-  public Class<?>[] classList = {byte[].class};
+  public Class<?>[] classList = {byte[].class, Long.class};
 
   /**
    * The static loader that is used instead of the static constructor in

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseMapWritable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseMapWritable.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseMapWritable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseMapWritable.java Sat Aug 25 06:57:29 2012
@@ -187,6 +187,8 @@ implements SortedMap<byte[],V>, Configur
       Object value = e.getValue();
       if (value instanceof byte []) {
         Bytes.writeByteArray(out, (byte [])value);
+      } else if (value instanceof Long) {
+        out.writeLong((Long)value);
       } else {
         ((Writable)value).write(out);
       }
@@ -209,6 +211,9 @@ implements SortedMap<byte[],V>, Configur
       if (clazz.equals(byte [].class)) {
         byte [] bytes = Bytes.readByteArray(in);
         value = (V)bytes;
+      } else if (clazz.equals(Long.class)) {
+        Long val = in.readLong();
+        value = (V)val;
       } else {
         Writable w = (Writable)ReflectionUtils.
           newInstance(clazz, getConf());

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java Sat Aug 25 06:57:29 2012
@@ -61,4 +61,18 @@ public interface HMasterRegionInterface 
   public HMsg[] regionServerReport(HServerInfo info, HMsg msgs[],
     HRegionInfo mostLoadedRegions[])
   throws IOException;
+
+
+  /**
+   * Get the sequence id of the last MemStore entry flushed to an HFile for
+   * the specified region, as last reported to the master. Used by region
+   * servers to skip already-flushed edits when splitting logs.
+   *
+   * @param regionName name of the region to look up
+   * @return the last HLog sequence id flushed from MemStore to HFile for
+   *         the region, or -1 if the master has no record for it
+   * @throws IOException if the call to the master fails
+   */
+  public long getLastFlushedSequenceId(byte[] regionName)
+  throws IOException;
 }
\ No newline at end of file

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Sat Aug 25 06:57:29 2012
@@ -35,9 +35,12 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Random;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -164,6 +167,9 @@ public class HMaster extends HasThread i
   private final AtomicBoolean clusterShutdownRequested =
       new AtomicBoolean(false);
 
+  private final SortedMap<byte[], Long> flushedSequenceIdByRegion =
+      new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+
   private final Configuration conf;
   private final Path rootdir;
   private InfoServer infoServer;
@@ -1269,6 +1275,27 @@ public class HMaster extends HasThread i
       this.serverManager.regionServerReport(serverInfo, msgs, mostLoadedRegions));
   }
 
+  /**
+   * Merges a report's flushed sequence ids without lowering recorded values.
+   */
+  void updateLastFlushedSequenceIds(HServerInfo serverInfo) {
+    for (Entry<byte[], Long> entry :
+        serverInfo.getFlushedSequenceIdByRegion().entrySet()) {
+      Long existingValue = flushedSequenceIdByRegion.get(entry.getKey());
+      if (existingValue != null && entry.getValue() < existingValue) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("RegionServer " + serverInfo +
+              " indicates a last flushed sequence id (" + entry.getValue() +
+              ") that is less than the previous last flushed sequence id (" +
+              existingValue + ") for region " +
+              Bytes.toStringBinary(entry.getKey()) + ". Ignoring.");
+        }
+        continue; // Don't let smaller sequence ids override greater ones.
+      }
+      flushedSequenceIdByRegion.put(entry.getKey(), entry.getValue());
+    }
+  }
+
   /**
    * Override if you'd add messages to return to regionserver <code>hsi</code>
    * or to send an exception.
@@ -1539,7 +1566,7 @@ public class HMaster extends HasThread i
     HColumnDescriptor descriptor)
   throws IOException {
     alterTable(tableName, null, Arrays.asList(
-          new Pair<byte [], HColumnDescriptor>(columnName, descriptor)), null);
+        new Pair<byte[], HColumnDescriptor>(columnName, descriptor)), null);
   }
 
   @Override
@@ -2214,6 +2241,14 @@ public class HMaster extends HasThread i
     }
   }
 
+  @Override
+  public long getLastFlushedSequenceId(byte[] regionName) throws IOException {
+    // One get() rather than containsKey() + get(); -1 signals that no flush
+    // has been reported to the master for this region.
+    Long sequenceId = flushedSequenceIdByRegion.get(regionName);
+    return (sequenceId == null) ? -1L : sequenceId;
+  }
+
   String getZKWrapperName() {
     return getServerName();
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Sat Aug 25 06:57:29 2012
@@ -387,6 +387,7 @@ public class ServerManager {
       // will send us one of these messages after it gets MSG_REGIONSERVER_STOP
       return new HMsg [] {HMsg.REGIONSERVER_STOP};
     }
+    this.master.updateLastFlushedSequenceIds(info);
     return processRegionServerAllsWell(info, mostLoadedRegions, msgs);
   }
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Aug 25 06:57:29 2012
@@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.HColumnDe
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
@@ -1436,7 +1437,6 @@ public class HRegion implements HeapSize
       sequenceId = (wal == null)? myseqid :
         wal.startCacheFlush(this.regionInfo.getRegionName());
       completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
-
       for (Store s : stores.values()) {
         storeFlushers.add(s.getStoreFlusher(completeSequenceId));
       }
@@ -1540,6 +1540,13 @@ public class HRegion implements HeapSize
         this.getRegionInfo().isMetaRegion());
     }
 
+    // Update the last flushed sequence id for region
+    if (this.regionServer != null) {
+      this.regionServer.getServerInfo().setFlushedSequenceIdForRegion(
+          getRegionName(),
+          completeSequenceId);
+    }
+
     // C. Finally notify anyone waiting on memstore to clear:
     // e.g. checkResources().
     synchronized (this) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat Aug 25 06:57:29 2012
@@ -59,6 +59,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.cli.CommandLine;
@@ -193,7 +194,7 @@ public class HRegionServer implements HR
   // If false, the file system has become unavailable
   protected volatile boolean fsOk;
 
-  protected HServerInfo serverInfo;
+  protected volatile HServerInfo serverInfo;
   protected final Configuration conf;
 
   private final ServerConnection connection;
@@ -336,6 +337,9 @@ public class HRegionServer implements HR
 
   /** Regionserver launched by the main method. Not used in tests. */
   private static HRegionServer mainRegionServer;
+  /** Keep a reference to the current active master for use by HLogSplitter. */
+  private final AtomicReference<HMasterRegionInterface> masterRef =
+      new AtomicReference<HMasterRegionInterface>();
 
   /**
    * Starts a HRegionServer at the default location
@@ -1466,7 +1470,7 @@ public class HRegionServer implements HR
     // Create the log splitting worker and start it
     this.splitLogWorker = new SplitLogWorker(this.zooKeeperWrapper,
         this.getConfiguration(), this.serverInfo.getServerName(),
-        logCloseThreadPool);
+        logCloseThreadPool, masterRef);
     splitLogWorker.start();
     LOG.info("HRegionServer started at: " +
       this.serverInfo.getServerAddress().toString());
@@ -1651,6 +1655,7 @@ public class HRegionServer implements HR
       }
     }
     this.hbaseMaster = master;
+    masterRef.set(hbaseMaster);
     return true;
   }
 
@@ -2849,10 +2854,12 @@ public class HRegionServer implements HR
    * @return the removed HRegion, or null if the HRegion was not in onlineRegions.
    */
   HRegion removeFromOnlineRegions(HRegionInfo hri) {
+    byte[] regionName = hri.getRegionName();
+    serverInfo.getFlushedSequenceIdByRegion().remove(regionName);
     this.lock.writeLock().lock();
     HRegion toReturn = null;
     try {
-      toReturn = onlineRegions.remove(Bytes.mapKey(hri.getRegionName()));
+      toReturn = onlineRegions.remove(Bytes.mapKey(regionName));
     } finally {
       this.lock.writeLock().unlock();
     }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Sat Aug 25 06:57:29 2012
@@ -27,6 +27,7 @@ import java.io.InterruptedIOException;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,6 +35,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ipc.HMasterInterface;
+import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -87,7 +90,6 @@ public class SplitLogWorker implements R
   private boolean workerInGrabTask = false;
   protected ZooKeeperWrapper watcher;
 
-
   public SplitLogWorker(ZooKeeperWrapper watcher, Configuration conf,
       String serverName, TaskExecutor executor) {
     this.watcher = watcher;
@@ -97,7 +99,8 @@ public class SplitLogWorker implements R
   }
 
   public SplitLogWorker(ZooKeeperWrapper watcher, final Configuration conf,
-      final String serverName, final ExecutorService logCloseThreadPool) {
+      final String serverName, final ExecutorService logCloseThreadPool,
+      final AtomicReference<HMasterRegionInterface> masterRef) {
     this(watcher, conf, serverName, new TaskExecutor () {
       @Override
       public Status exec(String filename, CancelableProgressable p) {
@@ -128,7 +131,7 @@ public class SplitLogWorker implements R
           String tmpname =
             ZKSplitLog.getSplitLogDirTmpComponent(serverName, filename);
           if (HLogSplitter.splitLogFileToTemp(rootdir, tmpname,
-              st, fs, conf, p, logCloseThreadPool) == false) {
+              st, fs, conf, p, logCloseThreadPool, masterRef.get()) == false) {
             return Status.PREEMPTED;
           }
         } catch (InterruptedIOException iioe) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Sat Aug 25 06:57:29 2012
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.net.ConnectException;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -47,6 +48,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.ipc.HMasterInterface;
+import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
@@ -82,11 +85,12 @@ public class HLogSplitter {
 
   // Thread pool for closing LogWriters in parallel
   protected final ExecutorService logCloseThreadPool;
+  // For checking the latest flushed sequence id
+  protected final HMasterRegionInterface master;
 
   // If an exception is thrown by one of the other threads, it will be
   // stored here.
   protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
-
   // Wait/notify for when data has been produced by the reader thread,
   // consumed by the reader thread, or an exception occurred
   Object dataAvailable = new Object();
@@ -136,13 +140,15 @@ public class HLogSplitter {
   }
 
   public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
-      Path oldLogDir, FileSystem fs, ExecutorService logCloseThreadPool) {
+      Path oldLogDir, FileSystem fs, ExecutorService logCloseThreadPool,
+      HMasterRegionInterface master) {
     this.conf = conf;
     this.rootDir = rootDir;
     this.srcDir = srcDir;
     this.oldLogDir = oldLogDir;
     this.fs = fs;
     this.logCloseThreadPool = logCloseThreadPool;
+    this.master = master;
   }
 
   /**
@@ -166,9 +172,10 @@ public class HLogSplitter {
   static public boolean splitLogFileToTemp(Path rootDir, String tmpname,
       FileStatus logfile, FileSystem fs,
       Configuration conf, CancelableProgressable reporter,
-      ExecutorService logCloseThreadPool) throws IOException {
+      ExecutorService logCloseThreadPool, HMasterRegionInterface master)
+      throws IOException {
     HLogSplitter s = new HLogSplitter(conf, rootDir, null, null /* oldLogDir */,
-        fs, logCloseThreadPool);
+        fs, logCloseThreadPool, master);
     return s.splitLogFileToTemp(logfile, tmpname, reporter);
   }
 
@@ -217,11 +224,36 @@ public class HLogSplitter {
       status.markComplete("Failed: reporter.progress asked us to terminate");
       return false;
     }
+    Map<byte[], Long> lastFlushedSequenceIds =
+        new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
     int editsCount = 0;
+    int editsSkipped = 0;
     Entry entry;
     try {
       while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) {
         byte[] region = entry.getKey().getRegionName();
+        long lastFlushedSequenceId = -1L;
+        if (master != null) {
+          Long cachedSequenceId = lastFlushedSequenceIds.get(region);
+          if (cachedSequenceId == null) {
+            try {
+              cachedSequenceId = master.getLastFlushedSequenceId(region);
+            } catch (ConnectException e) {
+              // Fall back to not filtering this region, and cache the
+              // fallback so an unreachable master is asked once per region.
+              cachedSequenceId = -1L;
+              LOG.warn("Unable to connect to the master to check " +
+                  "the last flushed sequence id", e);
+            }
+            lastFlushedSequenceIds.put(region, cachedSequenceId);
+          }
+          lastFlushedSequenceId = cachedSequenceId;
+        }
+        // Skip edits that the region has already flushed to HFiles.
+        if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
+          editsSkipped++;
+          continue;
+        }
         Object o = logWriters.get(region);
         if (o == BAD_WRITER) {
           continue;
@@ -241,7 +273,8 @@ public class HLogSplitter {
         wap.w.append(entry);
         editsCount++;
         if (editsCount % interval == 0) {
-          status.setStatus("Split " + editsCount + " edits");
+          status.setStatus("Split " + editsCount +
+              " edits, skipped " + editsSkipped + " edits.");
           long t1 = EnvironmentEdgeManager.currentTimeMillis();
           if ((t1 - last_report_at) > period) {
             last_report_at = t;

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestSerialization.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestSerialization.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestSerialization.java Sat Aug 25 06:57:29 2012
@@ -138,10 +138,15 @@ public class TestSerialization extends H
    */
   public void testServerInfo() throws Exception {
     HServerInfo hsi = new HServerInfo(new HServerAddress("0.0.0.0:123"), -1, "default name");
+    byte[] region = Bytes.toBytes("region");
+    hsi.setFlushedSequenceIdForRegion(region, 0xfaceb);
     byte [] b = Writables.getBytes(hsi);
     HServerInfo deserializedHsi =
       (HServerInfo)Writables.getWritable(b, new HServerInfo());
     assertTrue(hsi.equals(deserializedHsi));
+    // Check the round-tripped copy, not the original, with expected first.
+    assertEquals(0xfaceb,
+        deserializedHsi.getFlushedSequenceIdForRegion(region));
   }

   public void testPut() throws Exception{

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java?rev=1377225&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java Sat Aug 25 06:57:29 2012
@@ -0,0 +1,196 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks that sequence ids flushed on region servers are reported to, and
+ * served back by, the master.
+ */
+public class TestHLogFiltering {
+
+  private static final Log LOG = LogFactory.getLog(TestHLogFiltering.class);
+
+  private static final int NUM_MASTERS = 1;
+  private static final int NUM_RS = 3;
+
+  /** Longest time to wait for region server reports to reach the master. */
+  private static final long REPORT_TIMEOUT_MS = 60000;
+  /** Pause between successive checks of the master's sequence ids. */
+  private static final long POLL_INTERVAL_MS = 500;
+
+  private static final byte[] TABLE_NAME = Bytes.toBytes("TestHLogFiltering");
+  private static final byte[] CF1 = Bytes.toBytes("MyCF1");
+  private static final byte[] CF2 = Bytes.toBytes("MyCF2");
+  private static final byte[][] FAMILIES = { CF1, CF2 };
+
+  private final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @Before
+  public void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
+    fillTable();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Creates the test table and writes four rounds of random puts and
+   * deletes to each of its 100 rows.
+   */
+  private void fillTable() throws IOException, InterruptedException {
+    HTable table = TEST_UTIL.createTable(TABLE_NAME, FAMILIES, 3,
+        Bytes.toBytes("row0"), Bytes.toBytes("row99"), NUM_RS);
+    Random rand = new Random(19387129L);
+    for (int iStoreFile = 0; iStoreFile < 4; ++iStoreFile) {
+      for (int iRow = 0; iRow < 100; ++iRow) {
+        final byte[] row = Bytes.toBytes("row" + iRow);
+        Put put = new Put(row);
+        Delete del = new Delete(row);
+        for (int iCol = 0; iCol < 10; ++iCol) {
+          final byte[] cf = rand.nextBoolean() ? CF1 : CF2;
+          final long ts = rand.nextInt();
+          final byte[] qual = Bytes.toBytes("col" + iCol);
+          if (rand.nextBoolean()) {
+            final byte[] value = Bytes.toBytes("value_for_row_" + iRow +
+                "_cf_" + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" +
+                ts + "_random_" + rand.nextLong());
+            put.add(cf, qual, ts, value);
+          } else if (rand.nextDouble() < 0.8) {
+            del.deleteColumn(cf, qual, ts);
+          } else {
+            del.deleteColumns(cf, qual, ts);
+          }
+        }
+        table.put(put);
+        table.delete(del);
+        table.flushCommits();
+      }
+    }
+    TEST_UTIL.waitUntilAllRegionsAssigned(NUM_RS);
+  }
+
+  @Test
+  public void testFlushedSequenceIdsSentToHMaster()
+  throws IOException, InterruptedException {
+    SortedMap<byte[], Long> allFlushedSequenceIds =
+        new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+    for (int i = 0; i < NUM_RS; ++i) {
+      SortedMap<byte[], Long> flushedSequenceIds = flushAllRegions(i);
+      assertTrue(flushedSequenceIds.size() > 0);
+      allFlushedSequenceIds.putAll(flushedSequenceIds);
+    }
+    HMasterRegionInterface master = TEST_UTIL.getMiniHBaseCluster().getMaster();
+    // Sequence ids reach the master with periodic region server reports, so
+    // poll for them rather than sleeping for a fixed interval.
+    waitForMasterToCatchUp(master, allFlushedSequenceIds);
+    int regionsChecked = 0;
+    for (int i = 0; i < NUM_RS; ++i) {
+      for (byte[] regionName : getRegionsByServer(i)) {
+        if (allFlushedSequenceIds.containsKey(regionName)) {
+          assertEquals((long)allFlushedSequenceIds.get(regionName),
+              master.getLastFlushedSequenceId(regionName));
+          regionsChecked++;
+        }
+      }
+    }
+    // Fail rather than pass vacuously if no region was compared.
+    assertTrue(regionsChecked > 0);
+  }
+
+  /**
+   * Polls the master until it returns every expected flushed sequence id or
+   * REPORT_TIMEOUT_MS elapses; the caller asserts the final values.
+   */
+  private void waitForMasterToCatchUp(HMasterRegionInterface master,
+      SortedMap<byte[], Long> expected)
+  throws IOException, InterruptedException {
+    long deadline = System.currentTimeMillis() + REPORT_TIMEOUT_MS;
+    while (System.currentTimeMillis() < deadline) {
+      boolean allReported = true;
+      for (Map.Entry<byte[], Long> entry : expected.entrySet()) {
+        if (master.getLastFlushedSequenceId(entry.getKey()) !=
+            entry.getValue().longValue()) {
+          allReported = false;
+          break;
+        }
+      }
+      if (allReported) {
+        return;
+      }
+      Thread.sleep(POLL_INTERVAL_MS);
+    }
+    LOG.warn("Timed out waiting for flushed sequence ids on the master");
+  }
+
+  private List<byte[]> getRegionsByServer(int rsId) throws IOException {
+    List<byte[]> regionNames = Lists.newArrayList();
+    HRegionServer hrs = getRegionServer(rsId);
+    for (HRegionInfo hri : hrs.getRegionsAssignment()) {
+      regionNames.add(hri.getRegionName());
+    }
+    return regionNames;
+  }
+
+  private HRegionServer getRegionServer(int rsId) {
+    return TEST_UTIL.getMiniHBaseCluster().getRegionServer(rsId);
+  }
+
+  /**
+   * Flushes every region on the given region server and returns that
+   * server's live map of region name to last flushed sequence id.
+   */
+  private SortedMap<byte[], Long> flushAllRegions(int rsId)
+  throws IOException {
+    HRegionServer hrs = getRegionServer(rsId);
+    for (byte[] regionName : getRegionsByServer(rsId)) {
+      hrs.flushRegion(regionName);
+    }
+    return hrs.getServerInfo().getFlushedSequenceIdByRegion();
+  }
+}

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1377225&r1=1377224&r2=1377225&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Sat Aug 25 06:57:29 2012
@@ -725,7 +725,7 @@ public class TestHLogSplit {
     FileStatus logfile = fs.listStatus(hlogDir)[0];
     fs.initialize(fs.getUri(), conf);
     HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
-        reporter, logCloseThreadPool);
+        reporter, logCloseThreadPool, null);
     HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
         logfile.getPath().toString(), conf);
 
@@ -751,7 +751,7 @@ public class TestHLogSplit {
     fs.delete(regiondir, true);
 
     HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
-        reporter, logCloseThreadPool);
+        reporter, logCloseThreadPool, null);
     HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
         logfile.getPath().toString(), conf);
     // This test passes if there are no exceptions when
@@ -769,7 +769,7 @@ public class TestHLogSplit {
     fs.initialize(fs.getUri(), conf);
 
     HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
-        reporter, logCloseThreadPool);
+        reporter, logCloseThreadPool, null);
     HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
         logfile.getPath().toString(), conf);
     Path tdir = HTableDescriptor.getTableDir(hbaseDir, TABLE_NAME);
@@ -787,7 +787,7 @@ public class TestHLogSplit {
     fs.initialize(fs.getUri(), conf);
 
     HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
-        reporter, logCloseThreadPool);
+        reporter, logCloseThreadPool, null);
     HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
         logfile.getPath().toString(), conf);
     for (String region : regions) {
@@ -807,7 +807,7 @@ public class TestHLogSplit {
 
     fs.initialize(fs.getUri(), conf);
     HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
-        reporter, logCloseThreadPool);
+        reporter, logCloseThreadPool, null);
     HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
         logfile.getPath().toString(), conf);