Posted to commits@hbase.apache.org by zh...@apache.org on 2018/03/07 10:14:44 UTC

[08/33] hbase git commit: HBASE-20115 Reimplement serial replication based on the new replication storage layer

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7b86839/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
new file mode 100644
index 0000000..31c3ac7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Holds a batch of WAL entries to replicate, along with some statistics
+ */
+@InterfaceAudience.Private
+class WALEntryBatch {
+  private List<Entry> walEntries;
+  // last WAL that was read
+  private Path lastWalPath;
+  // position in WAL of last entry in this batch
+  private long lastWalPosition = 0;
+  // number of distinct row keys in this batch
+  private int nbRowKeys = 0;
+  // number of HFiles
+  private int nbHFiles = 0;
+  // heap size of data we need to replicate
+  private long heapSize = 0;
+  // the last sequence id for each region, tracked if the table has serial replication scope
+  private Map<String, Long> lastSeqIds = new HashMap<>();
+
+  /**
+   * @param lastWalPath Path of the WAL the last entry in this batch was read from
+   */
+  WALEntryBatch(int maxNbEntries, Path lastWalPath) {
+    this.walEntries = new ArrayList<>(maxNbEntries);
+    this.lastWalPath = lastWalPath;
+  }
+
+  public void addEntry(Entry entry) {
+    walEntries.add(entry);
+  }
+
+  /**
+   * @return the WAL Entries.
+   */
+  public List<Entry> getWalEntries() {
+    return walEntries;
+  }
+
+  /**
+   * @return the path of the last WAL that was read.
+   */
+  public Path getLastWalPath() {
+    return lastWalPath;
+  }
+
+  /**
+   * @return the position in the last WAL that was read.
+   */
+  public long getLastWalPosition() {
+    return lastWalPosition;
+  }
+
+  public void setLastWalPosition(long lastWalPosition) {
+    this.lastWalPosition = lastWalPosition;
+  }
+
+  public int getNbEntries() {
+    return walEntries.size();
+  }
+
+  /**
+   * @return the number of distinct row keys in this batch
+   */
+  public int getNbRowKeys() {
+    return nbRowKeys;
+  }
+
+  /**
+   * @return the number of HFiles in this batch
+   */
+  public int getNbHFiles() {
+    return nbHFiles;
+  }
+
+  /**
+   * @return total number of operations in this batch
+   */
+  public int getNbOperations() {
+    return getNbRowKeys() + getNbHFiles();
+  }
+
+  /**
+   * @return the heap size of this batch
+   */
+  public long getHeapSize() {
+    return heapSize;
+  }
+
+  /**
+   * @return the last sequence id for each region if the table has serial replication scope
+   */
+  public Map<String, Long> getLastSeqIds() {
+    return lastSeqIds;
+  }
+
+  public void incrementNbRowKeys(int increment) {
+    nbRowKeys += increment;
+  }
+
+  public void incrementNbHFiles(int increment) {
+    nbHFiles += increment;
+  }
+
+  public void incrementHeapSize(long increment) {
+    heapSize += increment;
+  }
+
+  public void setLastSeqId(String region, long sequenceId) {
+    lastSeqIds.put(region, sequenceId);
+  }
+}
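
A minimal sketch of how a reader loop might fill one of these batches. This is
not the actual ReplicationSourceWALReader logic; the entry source, the
one-row-per-entry assumption and the position bookkeeping are stand-ins.

    // Hypothetical sketch only: filling a WALEntryBatch while draining a WAL.
    static WALEntryBatch fillBatch(List<Entry> pendingEntries, Path walPath,
        long endPosition) {
      WALEntryBatch batch = new WALEntryBatch(pendingEntries.size(), walPath);
      for (Entry entry : pendingEntries) {
        batch.addEntry(entry);
        batch.incrementNbRowKeys(1); // assume one distinct row per entry
        batch.incrementHeapSize(entry.getEdit().heapSize());
        if (entry.getKey().hasSerialReplicationScope()) {
          // remember the last sequence id per region for serial replication
          batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
            entry.getKey().getSequenceId());
        }
      }
      batch.setLastWalPosition(endPosition);
      return batch;
    }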

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7b86839/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 7c83c0c..bcab9b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -21,29 +21,26 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.NoSuchElementException;
 import java.util.OptionalLong;
 import java.util.concurrent.PriorityBlockingQueue;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
@@ -102,16 +99,18 @@ class WALEntryStream implements Closeable {
   }
 
   /**
-   * @return the next WAL entry in this stream
-   * @throws IOException
-   * @throws NoSuchElementException if no more entries in the stream.
+   * Returns the next WAL entry in this stream without advancing, or null if exhausted.
+   */
+  public Entry peek() throws IOException {
+    return hasNext() ? currentEntry : null;
+  }
+
+  /**
+   * Returns the next WAL entry in this stream and advances the stream; null if exhausted.
    */
   public Entry next() throws IOException {
-    if (!hasNext()) {
-      throw new NoSuchElementException();
-    }
-    Entry save = currentEntry;
-    currentEntry = null; // gets reloaded by hasNext()
+    Entry save = peek();
+    currentEntry = null;
     return save;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7b86839/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
index ca64172..ab48313 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
@@ -168,6 +168,14 @@ public class FSTableDescriptors implements TableDescriptors {
                     // Disable blooms for meta.  Needs work.  Seems to mess w/ getClosestOrBefore.
                     .setBloomFilterType(BloomType.NONE)
                     .build())
+            .addColumnFamily(ColumnFamilyDescriptorBuilder
+                    .newBuilder(HConstants.REPLICATION_BARRIER_FAMILY)
+                    .setMaxVersions(HConstants.ALL_VERSIONS)
+                    .setInMemory(true)
+                    .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
+                    // Disable blooms for meta.  Needs work.  Seems to mess w/ getClosestOrBefore.
+                    .setBloomFilterType(BloomType.NONE)
+                    .build())
             .addCoprocessor("org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint",
                     null, Coprocessor.PRIORITY_SYSTEM, null);
   }
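
hbase:meta now carries a rep_barrier family alongside info and table. Reading
the barriers back should need nothing beyond the standard client API; a sketch,
assuming barriers are stored under HConstants.SEQNUM_QUALIFIER as the new
TestSerialReplicationChecker below does:

    // Sketch: fetch every replication barrier cell stored on a region's row in
    // hbase:meta. The helper name is made up.
    static List<Long> readBarriers(Connection conn, byte[] regionName)
        throws IOException {
      try (Table meta = conn.getTable(TableName.META_TABLE_NAME)) {
        Get get = new Get(regionName)
          .addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER)
          .readAllVersions();
        List<Long> barriers = new ArrayList<>();
        for (Cell cell : meta.get(get).rawCells()) {
          barriers.add(Bytes.toLong(CellUtil.cloneValue(cell)));
        }
        return barriers;
      }
    }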

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7b86839/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index c0b72aa..b106a31 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -18,13 +18,7 @@
  */
 package org.apache.hadoop.hbase.util;
 
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
-import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
-
 import edu.umd.cs.findbugs.annotations.CheckForNull;
-
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.EOFException;
@@ -54,7 +48,6 @@ import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -71,9 +64,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.HFileLink;
@@ -81,8 +72,6 @@ import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
@@ -94,6 +83,17 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
+import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
 
 /**
  * Utility methods for interacting with the underlying file system.
@@ -1028,6 +1028,10 @@ public abstract class FSUtils extends CommonFSUtils {
     return regionDirs;
   }
 
+  public static Path getRegionDir(Path tableDir, RegionInfo region) {
+    return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
+  }
+
   /**
    * Filter for all dirs that are legal column family names.  This is generally used for colfam
    * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.
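
The new getRegionDir helper resolves a region's directory under its table
directory, mapping read replicas back to the primary replica's encoded name via
ServerRegionReplicaUtil. Typical use (paths shown are illustrative):

    // Sketch: locate a region's data directory on the filesystem.
    Path tableDir = FSUtils.getTableDir(rootDir, region.getTable());
    Path regionDir = FSUtils.getRegionDir(tableDir, region);
    // -> <rootdir>/data/<namespace>/<table>/<encoded-region-name>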

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7b86839/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
index c1a77ee..ac23d1d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
@@ -415,10 +415,16 @@ public class WALKeyImpl implements WALKey {
     this.replicationScope = replicationScope;
   }
 
-  public void serializeReplicationScope(boolean serialize) {
-    if (!serialize) {
-      setReplicationScope(null);
+  public void clearReplicationScope() {
+    setReplicationScope(null);
+  }
+
+  public boolean hasSerialReplicationScope() {
+    if (replicationScope == null || replicationScope.isEmpty()) {
+      return false;
     }
+    return replicationScope.values().stream()
+      .anyMatch(scope -> scope.intValue() == HConstants.REPLICATION_SCOPE_SERIAL);
   }
 
   /**
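
hasSerialReplicationScope answers true as soon as any family in the key's scope
map is marked REPLICATION_SCOPE_SERIAL. A small sketch of the behavior, assuming
an existing WALKeyImpl named key:

    // Sketch: one serially scoped family is enough to flip the check.
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(Bytes.toBytes("f1"), HConstants.REPLICATION_SCOPE_GLOBAL);
    key.setReplicationScope(scopes);
    assert !key.hasSerialReplicationScope();
    scopes.put(Bytes.toBytes("f2"), HConstants.REPLICATION_SCOPE_SERIAL);
    assert key.hasSerialReplicationScope();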

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7b86839/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
index ec93207..9161e25 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
@@ -494,7 +494,7 @@ public class TestMetaTableAccessor {
       List<RegionInfo> regionInfos = Lists.newArrayList(parent);
       MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
 
-      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
+      MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB, serverName0, 3);
 
       assertEmptyMetaLocation(meta, splitA.getRegionName(), 1);
       assertEmptyMetaLocation(meta, splitA.getRegionName(), 2);
@@ -535,7 +535,8 @@ public class TestMetaTableAccessor {
       List<RegionInfo> regionInfos = Lists.newArrayList(parentA, parentB);
       MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
 
-      MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3);
+      MetaTableAccessor.mergeRegions(connection, merged, parentA, -1L, parentB, -1L, serverName0,
+        3);
 
       assertEmptyMetaLocation(meta, merged.getRegionName(), 1);
       assertEmptyMetaLocation(meta, merged.getRegionName(), 2);
@@ -682,8 +683,8 @@ public class TestMetaTableAccessor {
       EnvironmentEdgeManager.injectEdge(edge);
       try {
         // now merge the regions, effectively deleting the rows for region a and b.
-        MetaTableAccessor.mergeRegions(connection, mergedRegionInfo, regionInfoA, regionInfoB, sn,
-          1);
+        MetaTableAccessor.mergeRegions(connection, mergedRegionInfo, regionInfoA, -1L, regionInfoB,
+          -1L, sn, 1);
       } finally {
         EnvironmentEdgeManager.reset();
       }
@@ -776,7 +777,8 @@ public class TestMetaTableAccessor {
       }
       SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler();
       long prevCalls = scheduler.numPriorityCalls;
-      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1);
+      MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB, loc.getServerName(),
+        1);
 
       assertTrue(prevCalls < scheduler.numPriorityCalls);
     }
@@ -813,7 +815,7 @@ public class TestMetaTableAccessor {
       List<RegionInfo> regionInfos = Lists.newArrayList(parent);
       MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
 
-      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
+      MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB, serverName0, 3);
       Get get1 = new Get(splitA.getRegionName());
       Result resultA = meta.get(get1);
       Cell serverCellA = resultA.getColumnLatestCell(HConstants.CATALOG_FAMILY,
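
splitRegion and mergeRegions now take an extra long per parent region, which
appears to be the parent's open sequence number used when writing serial
replication barriers; these tests pass -1L where no meaningful sequence number
exists. The new call shapes, as exercised above:

    // -1L stands in for an unknown open sequence number.
    MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB,
      serverName0, 3);
    MetaTableAccessor.mergeRegions(connection, merged, parentA, -1L, parentB, -1L,
      serverName0, 3);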

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7b86839/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
index 0108bb5..be29f1a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
@@ -194,7 +194,7 @@ public class TestHRegionFileSystem {
 
   @Test
   public void testOnDiskRegionCreation() throws IOException {
-    Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("testOnDiskRegionCreation");
+    Path rootDir = TEST_UTIL.getDataTestDirOnTestFS(name.getMethodName());
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     Configuration conf = TEST_UTIL.getConfiguration();
 
@@ -226,7 +226,7 @@ public class TestHRegionFileSystem {
 
   @Test
   public void testNonIdempotentOpsWithRetries() throws IOException {
-    Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("testOnDiskRegionCreation");
+    Path rootDir = TEST_UTIL.getDataTestDirOnTestFS(name.getMethodName());
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     Configuration conf = TEST_UTIL.getConfiguration();
 
@@ -235,19 +235,15 @@ public class TestHRegionFileSystem {
     HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, rootDir, hri);
     assertTrue(fs.exists(regionFs.getRegionDir()));
 
-    regionFs = new HRegionFileSystem(conf, new MockFileSystemForCreate(),
-        null, null);
-    // HRegionFileSystem.createRegionOnFileSystem(conf, new MockFileSystemForCreate(), rootDir,
-    // hri);
+    regionFs = new HRegionFileSystem(conf, new MockFileSystemForCreate(), rootDir, hri);
     boolean result = regionFs.createDir(new Path("/foo/bar"));
     assertTrue("Couldn't create the directory", result);
 
-
-    regionFs = new HRegionFileSystem(conf, new MockFileSystem(), null, null);
+    regionFs = new HRegionFileSystem(conf, new MockFileSystem(), rootDir, hri);
     result = regionFs.rename(new Path("/foo/bar"), new Path("/foo/bar2"));
     assertTrue("Couldn't rename the directory", result);
 
-    regionFs = new HRegionFileSystem(conf, new MockFileSystem(), null, null);
+    regionFs = new HRegionFileSystem(conf, new MockFileSystem(), rootDir, hri);
     result = regionFs.deleteDir(new Path("/foo/bar"));
     assertTrue("Couldn't delete the directory", result);
     fs.delete(rootDir, true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7b86839/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
index 6af72ca..e2d9159 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
@@ -343,12 +343,12 @@ public class TestRegionServerMetrics {
 
   @Test
   public void testStoreCount() throws Exception {
-    //Force a hfile.
+    // Force a hfile.
     doNPuts(1, false);
     TEST_UTIL.getAdmin().flush(tableName);
 
     metricsRegionServer.getRegionServerWrapper().forceRecompute();
-    assertGauge("storeCount", TABLES_ON_MASTER? 1: 4);
+    assertGauge("storeCount", TABLES_ON_MASTER ? 1 : 5);
     assertGauge("storeFileCount", 1);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7b86839/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
index ffa03a2..e9e92b8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.replication;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.fail;
 
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -38,6 +36,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.junit.Before;
@@ -47,7 +46,7 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Category(LargeTests.class)
+@Category({ ReplicationTests.class, LargeTests.class })
 public class TestReplicationDroppedTables extends TestReplicationBase {
 
   @ClassRule
@@ -56,9 +55,6 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDroppedTables.class);
 
-  /**
-   * @throws java.lang.Exception
-   */
   @Before
   public void setUp() throws Exception {
     // Starting and stopping replication can make us miss new logs,

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7b86839/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
new file mode 100644
index 0000000..dfa78e7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.UUID;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSerialReplication {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSerialReplication.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static String PEER_ID = "1";
+
+  private static byte[] CF = Bytes.toBytes("CF");
+
+  private static byte[] CQ = Bytes.toBytes("CQ");
+
+  private static FileSystem FS;
+
+  private static Path LOG_DIR;
+
+  private static WALProvider.Writer WRITER;
+
+  public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint {
+
+    private static final UUID PEER_UUID = UUID.randomUUID();
+
+    @Override
+    public UUID getPeerUUID() {
+      return PEER_UUID;
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      synchronized (WRITER) {
+        try {
+          for (Entry entry : replicateContext.getEntries()) {
+            WRITER.append(entry);
+          }
+          WRITER.sync();
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public void start() {
+      startAsync();
+    }
+
+    @Override
+    public void stop() {
+      stopAsync();
+    }
+
+    @Override
+    protected void doStart() {
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      notifyStopped();
+    }
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10);
+    UTIL.startMiniCluster(3);
+    LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated");
+    FS = UTIL.getTestFileSystem();
+    FS.mkdirs(LOG_DIR);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Rule
+  public final TestName name = new TestName();
+
+  private Path logPath;
+
+  @Before
+  public void setUp() throws IOException, StreamLacksCapabilityException {
+    UTIL.ensureSomeRegionServersAvailable(3);
+    logPath = new Path(LOG_DIR, name.getMethodName());
+    WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration());
+    // add the peer in disabled state, so that when enabled all sources start pushing together.
+    UTIL.getAdmin().addReplicationPeer(PEER_ID,
+      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
+        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(),
+      false);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    UTIL.getAdmin().removeReplicationPeer(PEER_ID);
+    if (WRITER != null) {
+      WRITER.close();
+      WRITER = null;
+    }
+  }
+
+  @Test
+  public void testRegionMove() throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    UTIL.getAdmin().createTable(
+      TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
+        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()).build());
+    UTIL.waitTableAvailable(tableName);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
+    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
+    UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
+      Bytes.toBytes(rs.getServerName().getServerName()));
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return !rs.getRegions(tableName).isEmpty();
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return region + " is still not on " + rs;
+      }
+    });
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 100; i < 200; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    UTIL.getAdmin().enableReplicationPeer(PEER_ID);
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
+          int count = 0;
+          while (reader.next() != null) {
+            count++;
+          }
+          return count >= 200;
+        } catch (IOException e) {
+          return false;
+        }
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Not enough entries replicated";
+      }
+    });
+    try (WAL.Reader reader =
+      WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
+      long seqId = -1L;
+      int count = 0;
+      for (Entry entry;;) {
+        entry = reader.next();
+        if (entry == null) {
+          break;
+        }
+        assertTrue(
+          "Sequence id goes backwards from " + seqId + " to " + entry.getKey().getSequenceId(),
+          entry.getKey().getSequenceId() >= seqId);
+        seqId = entry.getKey().getSequenceId(); // track the last id seen so the check can fail
+        count++;
+      }
+      assertEquals(200, count);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7b86839/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index a53cba3..6d75fec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -321,7 +321,7 @@ public abstract class TestReplicationSourceManager {
     wal.rollWriter();
 
     manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
-        "1", 0, false);
+        "1", 0, null, false);
 
     wal.append(hri,
         new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7b86839/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
new file mode 100644
index 0000000..c8387c5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Cell.Type;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestSerialReplicationChecker {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSerialReplicationChecker.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static String PEER_ID = "1";
+
+  private static ReplicationQueueStorage QUEUE_STORAGE;
+
+  private static String WAL_FILE_NAME = "test.wal";
+
+  private SerialReplicationChecker checker;
+
+  @Rule
+  public final TestName name = new TestName();
+
+  private TableName tableName;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL.startMiniCluster(1);
+    QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getZooKeeperWatcher(),
+      UTIL.getConfiguration());
+    QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_ID,
+      WAL_FILE_NAME);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    ReplicationSource source = mock(ReplicationSource.class);
+    when(source.getPeerId()).thenReturn(PEER_ID);
+    when(source.getQueueStorage()).thenReturn(QUEUE_STORAGE);
+    Server server = mock(Server.class);
+    when(server.getConnection()).thenReturn(UTIL.getConnection());
+    when(source.getServer()).thenReturn(server);
+    checker = new SerialReplicationChecker(UTIL.getConfiguration(), source);
+    tableName = TableName.valueOf(name.getMethodName());
+  }
+
+  private Entry createEntry(RegionInfo region, long seqId) {
+    WALKeyImpl key = mock(WALKeyImpl.class);
+    when(key.getTableName()).thenReturn(tableName);
+    when(key.getEncodedRegionName()).thenReturn(region.getEncodedNameAsBytes());
+    when(key.getSequenceId()).thenReturn(seqId);
+    Entry entry = mock(Entry.class);
+    when(entry.getKey()).thenReturn(key);
+    return entry;
+  }
+
+  private Cell createCell(RegionInfo region) {
+    return CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(region.getStartKey())
+      .setType(Type.Put).build();
+  }
+
+  @Test
+  public void testNoBarrierCanPush() throws IOException {
+    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
+    assertTrue(checker.canPush(createEntry(region, 100), createCell(region)));
+  }
+
+  private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers)
+      throws IOException {
+    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
+    put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
+      Bytes.toBytes(state.name()));
+    for (int i = 0; i < barriers.length; i++) {
+      put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
+        put.getTimeStamp() - i, Bytes.toBytes(barriers[i]));
+    }
+    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+      table.put(put);
+    }
+  }
+
+  private void setState(RegionInfo region, RegionState.State state) throws IOException {
+    Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
+    put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
+      Bytes.toBytes(state.name()));
+    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+      table.put(put);
+    }
+  }
+
+  private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException {
+    QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(),
+      PEER_ID, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId));
+  }
+
+  @Test
+  public void testLastRegionAndOpeningCanNotPush() throws IOException, ReplicationException {
+    RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
+    addStateAndBarrier(region, RegionState.State.OPEN, 10);
+    Cell cell = createCell(region);
+    // can push since we are in the first range
+    assertTrue(checker.canPush(createEntry(region, 100), cell));
+    setState(region, RegionState.State.OPENING);
+    // can not push since we are in the last range and the state is OPENING
+    assertFalse(checker.canPush(createEntry(region, 102), cell));
+    addStateAndBarrier(region, RegionState.State.OPEN, 50);
+    // can not push since the previous range has not been finished yet
+    assertFalse(checker.canPush(createEntry(region, 102), cell));
+    updatePushedSeqId(region, 49);
+    // can push since the previous range has been finished
+    assertTrue(checker.canPush(createEntry(region, 102), cell));
+    setState(region, RegionState.State.OPENING);
+    assertFalse(checker.canPush(createEntry(region, 104), cell));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7b86839/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 2146e47..eb7d5a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -21,13 +21,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.NavigableMap;
-import java.util.NoSuchElementException;
 import java.util.OptionalLong;
 import java.util.TreeMap;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -40,13 +40,13 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -180,15 +180,12 @@ public class TestWALEntryStream {
         new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
      // There's one edit in the log; read it. Reading past it now returns null instead of throwing
       assertTrue(entryStream.hasNext());
-      WAL.Entry entry = entryStream.next();
+      WAL.Entry entry = entryStream.peek();
+      assertSame(entry, entryStream.next());
       assertNotNull(entry);
       assertFalse(entryStream.hasNext());
-      try {
-        entry = entryStream.next();
-        fail();
-      } catch (NoSuchElementException e) {
-        // expected
-      }
+      assertNull(entryStream.peek());
+      assertNull(entryStream.next());
       oldPos = entryStream.getPosition();
     }
 
@@ -346,10 +343,12 @@ public class TestWALEntryStream {
     // start up a batcher
     ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    Server mockServer = Mockito.mock(Server.class);
     ReplicationSource source = Mockito.mock(ReplicationSource.class);
     when(source.getSourceManager()).thenReturn(mockSourceManager);
     when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
     when(source.getWALFileLengthProvider()).thenReturn(log);
+    when(source.getServer()).thenReturn(mockServer);
     ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf,
         walQueue, 0, getDummyFilter(), source);
     Path walPath = walQueue.peek();