You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/08/09 07:32:47 UTC

[1/2] hbase git commit: HBASE-9465 Push entries to peer clusters serially

Repository: hbase
Updated Branches:
  refs/heads/master 1ecb0fce3 -> 5cadcd59a


http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
new file mode 100644
index 0000000..d4af26d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
@@ -0,0 +1,399 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HTestConst;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSerialReplication {
+  private static final Log LOG = LogFactory.getLog(TestSerialReplication.class);
+
+  // Configurations of the source cluster (1) and of the peer cluster (2).
+  private static Configuration conf1;
+  private static Configuration conf2;
+
+  // Testing utilities of the source cluster (1) and of the peer cluster (2).
+  private static HBaseTestingUtility utility1;
+  private static HBaseTestingUtility utility2;
+
+  // The only column family of every test table; each test sets its scope to serial replication.
+  private static final byte[] famName = Bytes.toBytes("f");
+  // Used as both the column qualifier and the cell value of every put.
+  private static final byte[] VALUE = Bytes.toBytes("v");
+  // Common prefix of all row keys; getRowNumbers() strips it to recover the row number.
+  private static final byte[] ROW = Bytes.toBytes("r");
+  // presumably ROWS[i] is ROW followed by the ASCII digits of i (the tests parse row keys that
+  // way) -- TODO confirm against HTestConst.makeNAscii
+  private static final byte[][] ROWS = HTestConst.makeNAscii(ROW, 100);
+
+  /**
+   * Starts two mini HBase clusters that share one mini ZooKeeper cluster (znode parents "/1" and
+   * "/2") and registers the second cluster as replication peer "1" of the first one.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1 = HBaseConfiguration.create();
+    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+    // smaller block size and capacity to trigger more operations
+    // and test them
+    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
+    // NOTE: this key is set again to 1 further down, so the value 1024 never takes effect.
+    conf1.setInt("replication.source.size.capacity", 1024);
+    conf1.setLong("replication.source.sleepforretries", 100);
+    conf1.setInt("hbase.regionserver.maxlogs", 10);
+    conf1.setLong("hbase.master.logcleaner.ttl", 10);
+    conf1.setBoolean("dfs.support.append", true);
+    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+        "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
+    conf1.setLong("replication.source.per.peer.node.bandwidth", 100L);// Each WAL is 120 bytes
+    conf1.setLong("replication.source.size.capacity", 1L);
+    conf1.setLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY, 1000L);
+
+    utility1 = new HBaseTestingUtility(conf1);
+    utility1.startMiniZKCluster();
+    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+    new ZooKeeperWatcher(conf1, "cluster1", null, true);
+
+    // The peer cluster starts from a copy of conf1; only the znode parent is changed.
+    conf2 = new Configuration(conf1);
+    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+
+    utility2 = new HBaseTestingUtility(conf2);
+    // Reuse the ZooKeeper cluster started by utility1 instead of starting a second one.
+    utility2.setZkCluster(miniZK);
+    new ZooKeeperWatcher(conf2, "cluster2", null, true);
+
+    // Register cluster 2 as replication peer "1" of cluster 1.
+    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(utility2.getClusterKey());
+    admin1.addPeer("1", rpc, null);
+
+    // presumably (masters, region servers); the tests address region servers 0..2 of cluster 1
+    // -- TODO confirm
+    utility1.startMiniCluster(1, 3);
+    utility2.startMiniCluster(1, 1);
+
+    // NOTE(review): presumably switches the balancer off so that it cannot undo the explicit
+    // region moves done by the tests -- confirm
+    utility1.getHBaseAdmin().setBalancerRunning(false, true);
+  }
+
+  /**
+   * Checks that edits reach the peer in order while the single region of the table is moved
+   * between region servers and the last hosting server is aborted.
+   * <p>
+   * Rows 10-19, 20-29 and 30-39 are written around the moves and the abort. The peer table is
+   * then polled for up to 180 seconds; on every poll the rows seen so far must be exactly
+   * 10, 11, 12, ... without a gap, and the test succeeds once all 30 rows are present.
+   */
+  @Test
+  public void testRegionMoveAndFailover() throws Exception {
+    TableName name = TableName.valueOf("testRSFailover");
+    HColumnDescriptor family = new HColumnDescriptor(famName);
+    family.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
+    HTableDescriptor descriptor = new HTableDescriptor(name);
+    descriptor.addFamily(family);
+    utility1.getHBaseAdmin().createTable(descriptor);
+    utility2.getHBaseAdmin().createTable(descriptor);
+    try (Table source = utility1.getConnection().getTable(name);
+        Table peer = utility2.getConnection().getTable(name)) {
+      LOG.info("move to 1");
+      moveRegion(source, 1);
+      LOG.info("move to 0");
+      moveRegion(source, 0);
+      for (int row = 10; row < 20; row++) {
+        source.put(new Put(ROWS[row]).addColumn(famName, VALUE, VALUE));
+      }
+      LOG.info("move to 2");
+      moveRegion(source, 2);
+      for (int row = 20; row < 30; row++) {
+        source.put(new Put(ROWS[row]).addColumn(famName, VALUE, VALUE));
+      }
+      // Kill the server the region was just moved to, then keep writing.
+      utility1.getHBaseCluster().abortRegionServer(2);
+      for (int row = 30; row < 40; row++) {
+        source.put(new Put(ROWS[row]).addColumn(famName, VALUE, VALUE));
+      }
+
+      long begin = EnvironmentEdgeManager.currentTime();
+      while (EnvironmentEdgeManager.currentTime() - begin < 180000) {
+        Scan scanAll = new Scan();
+        scanAll.setCaching(100);
+        List<Cell> replicated = new ArrayList<>();
+        try (ResultScanner scanner = peer.getScanner(scanAll)) {
+          for (Result row : scanner) {
+            Cell[] cells = row.rawCells();
+            assertEquals(1, cells.length);
+            replicated.add(cells[0]);
+          }
+        }
+        List<Integer> rowNumbers = getRowNumbers(replicated);
+        LOG.info(Arrays.toString(rowNumbers.toArray()));
+        // Whatever has arrived so far must be a gap-free prefix starting at row 10.
+        assertIntegerList(rowNumbers, 10, 1);
+        if (rowNumbers.size() == 30) {
+          return;
+        }
+        LOG.info("Waiting all logs pushed to slave. Expected 30 , actual " + replicated.size());
+        Thread.sleep(200);
+      }
+      throw new Exception("Not all logs have been pushed");
+    } finally {
+      utility1.getHBaseAdmin().disableTable(name);
+      utility2.getHBaseAdmin().disableTable(name);
+      utility1.getHBaseAdmin().deleteTable(name);
+      utility2.getHBaseAdmin().deleteTable(name);
+    }
+  }
+
+  /**
+   * Checks the replication order around a region split.
+   * <p>
+   * Three batches of nine rows are written: 10, 20, ..., 90 before the split at ROWS[50];
+   * 11, 21, ..., 91 after the split; and 12, 22, ..., 92 after the two regions have been moved by
+   * balanceTwoRegions(). The peer table is polled for up to 180 seconds. On every poll each batch
+   * must be a gap-free prefix of its sequence, and rows of a later batch may only be visible once
+   * the previous batch is complete. The test succeeds when all 27 rows have arrived.
+   */
+  @Test
+  public void testRegionSplit() throws Exception {
+    TableName tableName = TableName.valueOf("testRegionSplit");
+    HTableDescriptor table = new HTableDescriptor(tableName);
+    HColumnDescriptor fam = new HColumnDescriptor(famName);
+    fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
+    table.addFamily(fam);
+    utility1.getHBaseAdmin().createTable(table);
+    utility2.getHBaseAdmin().createTable(table);
+    try(Table t1 = utility1.getConnection().getTable(tableName);
+        Table t2 = utility2.getConnection().getTable(tableName)) {
+
+      // First batch: written to the not yet split region.
+      for (int i = 10; i < 100; i += 10) {
+        Put put = new Put(ROWS[i]);
+        put.addColumn(famName, VALUE, VALUE);
+        t1.put(put);
+      }
+      utility1.getHBaseAdmin().split(tableName, ROWS[50]);
+      // presumably gives the split time to finish; there is no explicit wait -- TODO confirm
+      Thread.sleep(5000L);
+      // Second batch: written after the split was requested.
+      for (int i = 11; i < 100; i += 10) {
+        Put put = new Put(ROWS[i]);
+        put.addColumn(famName, VALUE, VALUE);
+        t1.put(put);
+      }
+      balanceTwoRegions(t1);
+      // Third batch: written after the two regions were moved.
+      for (int i = 12; i < 100; i += 10) {
+        Put put = new Put(ROWS[i]);
+        put.addColumn(famName, VALUE, VALUE);
+        t1.put(put);
+      }
+
+      long start = EnvironmentEdgeManager.currentTime();
+      while (EnvironmentEdgeManager.currentTime() - start < 180000) {
+        Scan scan = new Scan();
+        scan.setCaching(100);
+        List<Cell> list = new ArrayList<>();
+        try (ResultScanner results = t2.getScanner(scan)) {
+          for (Result result : results) {
+            assertEquals(1, result.rawCells().length);
+            list.add(result.rawCells()[0]);
+          }
+        }
+        List<Integer> listOfNumbers = getRowNumbers(list);
+        // Bucket the replicated rows by batch; the third batch is further divided at row 50,
+        // the split point, and each half is checked on its own.
+        List<Integer> list0 = new ArrayList<>();
+        List<Integer> list1 = new ArrayList<>();
+        List<Integer> list21 = new ArrayList<>();
+        List<Integer> list22 = new ArrayList<>();
+        for (int num : listOfNumbers) {
+          if (num % 10 == 0) {
+            list0.add(num);
+          } else if (num % 10 == 1) {
+            list1.add(num);
+          } else if (num < 50) { // num % 10 == 2 && num < 50
+            list21.add(num);
+          } else { // num % 10 == 2 && num >= 50
+            list22.add(num);
+          }
+        }
+
+        LOG.info(Arrays.toString(list0.toArray()));
+        LOG.info(Arrays.toString(list1.toArray()));
+        LOG.info(Arrays.toString(list21.toArray()));
+        LOG.info(Arrays.toString(list22.toArray()));
+        // Each bucket must be a gap-free prefix of its expected sequence.
+        assertIntegerList(list0, 10, 10);
+        assertIntegerList(list1, 11, 10);
+        assertIntegerList(list21, 12, 10);
+        assertIntegerList(list22, 52, 10);
+        // A row of the second batch may only be visible once the whole first batch is.
+        if (!list1.isEmpty()) {
+          assertEquals(9, list0.size());
+        }
+        // A row of the third batch may only be visible once the whole second batch is.
+        if (!list21.isEmpty() || !list22.isEmpty()) {
+          assertEquals(9, list1.size());
+        }
+
+        if (list.size() == 27) {
+          return;
+        }
+        LOG.info("Waiting all logs pushed to slave. Expected 27 , actual " + list.size());
+        Thread.sleep(200);
+      }
+      throw new Exception("Not all logs have been pushed");
+    } finally {
+      utility1.getHBaseAdmin().disableTable(tableName);
+      utility2.getHBaseAdmin().disableTable(tableName);
+      utility1.getHBaseAdmin().deleteTable(tableName);
+      utility2.getHBaseAdmin().deleteTable(tableName);
+    }
+  }
+
+  /**
+   * Checks the replication order around a region merge.
+   * <p>
+   * The table is split at ROWS[50] first. Rows 10, 20, ..., 90 are written to the two regions,
+   * the regions are merged, and rows 11, 21, ..., 91 are written afterwards. The peer table is
+   * polled for up to 180 seconds. On every poll both batches must be gap-free prefixes of their
+   * sequences, and a row of the second batch may only be visible once the first batch is
+   * complete. The test succeeds when all 18 rows have arrived and fails if they do not arrive
+   * within the timeout.
+   */
+  @Test
+  public void testRegionMerge() throws Exception {
+    TableName tableName = TableName.valueOf("testRegionMerge");
+    HTableDescriptor table = new HTableDescriptor(tableName);
+    HColumnDescriptor fam = new HColumnDescriptor(famName);
+    fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
+    table.addFamily(fam);
+    utility1.getHBaseAdmin().createTable(table);
+    utility2.getHBaseAdmin().createTable(table);
+    utility1.getHBaseAdmin().split(tableName, ROWS[50]);
+    // presumably gives the split time to finish; there is no explicit wait -- TODO confirm
+    Thread.sleep(5000L);
+
+    try(Table t1 = utility1.getConnection().getTable(tableName);
+        Table t2 = utility2.getConnection().getTable(tableName)) {
+      // First batch: written while the table still consists of two regions.
+      for (int i = 10; i < 100; i += 10) {
+        Put put = new Put(ROWS[i]);
+        put.addColumn(famName, VALUE, VALUE);
+        t1.put(put);
+      }
+      List<Pair<HRegionInfo, ServerName>> regions =
+          MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), tableName);
+      assertEquals(2, regions.size());
+      utility1.getHBaseAdmin().mergeRegions(regions.get(0).getFirst().getRegionName(),
+          regions.get(1).getFirst().getRegionName(), true);
+      // Second batch: written after the merge was requested.
+      for (int i = 11; i < 100; i += 10) {
+        Put put = new Put(ROWS[i]);
+        put.addColumn(famName, VALUE, VALUE);
+        t1.put(put);
+      }
+
+      long start = EnvironmentEdgeManager.currentTime();
+      while (EnvironmentEdgeManager.currentTime() - start < 180000) {
+        Scan scan = new Scan();
+        scan.setCaching(100);
+        List<Cell> list = new ArrayList<>();
+        try (ResultScanner results = t2.getScanner(scan)) {
+          for (Result result : results) {
+            assertEquals(1, result.rawCells().length);
+            list.add(result.rawCells()[0]);
+          }
+        }
+        List<Integer> listOfNumbers = getRowNumbers(list);
+        // Bucket the replicated rows by batch.
+        List<Integer> list0 = new ArrayList<>();
+        List<Integer> list1 = new ArrayList<>();
+        for (int num : listOfNumbers) {
+          if (num % 10 == 0) {
+            list0.add(num);
+          } else {
+            list1.add(num);
+          }
+        }
+        LOG.info(Arrays.toString(list0.toArray()));
+        LOG.info(Arrays.toString(list1.toArray()));
+        assertIntegerList(list0, 10, 10);
+        assertIntegerList(list1, 11, 10);
+        // A row of the second batch may only be visible once the whole first batch is.
+        if (!list1.isEmpty()) {
+          assertEquals(9, list0.size());
+        }
+        if (list.size() == 18) {
+          return;
+        }
+        LOG.info("Waiting all logs pushed to slave. Expected 18 , actual " + list.size());
+        Thread.sleep(200);
+      }
+      // Without this the test would pass silently when replication never completes.
+      throw new Exception("Not all logs have been pushed");
+    } finally {
+      utility1.getHBaseAdmin().disableTable(tableName);
+      utility2.getHBaseAdmin().disableTable(tableName);
+      utility1.getHBaseAdmin().deleteTable(tableName);
+      utility2.getHBaseAdmin().deleteTable(tableName);
+    }
+  }
+
+  private List<Integer> getRowNumbers(List<Cell> cells) {
+    List<Integer> listOfRowNumbers = new ArrayList<>();
+    for (Cell c : cells) {
+      listOfRowNumbers.add(Integer.parseInt(Bytes
+          .toString(c.getRowArray(), c.getRowOffset() + ROW.length,
+              c.getRowLength() - ROW.length)));
+    }
+    return listOfRowNumbers;
+  }
+
+  /**
+   * Shuts down both mini clusters. Cluster 1 is shut down even when shutting down cluster 2
+   * fails, so that a failure does not leave cluster 1 running.
+   */
+  @AfterClass
+  public static void setUpAfterClass() throws Exception {
+    try {
+      utility2.shutdownMiniCluster();
+    } finally {
+      utility1.shutdownMiniCluster();
+    }
+  }
+
+  /**
+   * Moves the only region of <code>table</code> to a region server of cluster 1 and then sleeps
+   * five seconds to give the move time to complete.
+   *
+   * @param table table that must consist of exactly one region
+   * @param index index of the destination region server within the mini cluster
+   * @throws IOException if reading the region locations or requesting the move fails
+   */
+  private void moveRegion(Table table, int index) throws IOException {
+    List<Pair<HRegionInfo, ServerName>> locations =
+        MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), table.getName());
+    assertEquals(1, locations.size());
+    HRegionInfo onlyRegion = locations.get(0).getFirst();
+    ServerName destination = utility1.getHBaseCluster().getRegionServer(index).getServerName();
+    utility1.getAdmin().move(onlyRegion.getEncodedNameAsBytes(),
+        Bytes.toBytes(destination.getServerName()));
+    try {
+      Thread.sleep(5000L); // wait to complete
+    } catch (InterruptedException interrupted) {
+      // Keep the interrupt status visible to the caller instead of swallowing it.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Places the two regions of <code>table</code> on region servers 0 and 1 of cluster 1: the
+   * first listed region goes to server 0 and the second to server 1. Sleeps five seconds after
+   * each move request.
+   *
+   * @param table table that must consist of exactly two regions
+   */
+  private void balanceTwoRegions(Table table) throws Exception {
+    List<Pair<HRegionInfo, ServerName>> locations =
+        MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), table.getName());
+    assertEquals(2, locations.size());
+    // Resolve both regions and both destinations before the first move is requested.
+    HRegionInfo[] regionInfos = new HRegionInfo[2];
+    ServerName[] destinations = new ServerName[2];
+    for (int i = 0; i < 2; i++) {
+      regionInfos[i] = locations.get(i).getFirst();
+      destinations[i] = utility1.getHBaseCluster().getRegionServer(i).getServerName();
+    }
+    for (int i = 0; i < 2; i++) {
+      utility1.getAdmin().move(regionInfos[i].getEncodedNameAsBytes(),
+          Bytes.toBytes(destinations[i].getServerName()));
+      Thread.sleep(5000L);
+    }
+  }
+
+  /**
+   * Asserts that <code>list</code> is a prefix of the arithmetic sequence
+   * <code>start, start + step, start + 2 * step, ...</code>. An empty list always passes.
+   *
+   * @param list the numbers to check
+   * @param start expected first element
+   * @param step expected difference between consecutive elements
+   */
+  private void assertIntegerList(List<Integer> list, int start, int step) {
+    int size = list.size();
+    for (int i = 0; i < size; i++) {
+      // assertEquals with a message reports the offending index and values on failure.
+      assertEquals("Unexpected value at index " + i + " of " + list, start + step * i,
+          list.get(i).intValue());
+    }
+  }
+}


[2/2] hbase git commit: HBASE-9465 Push entries to peer clusters serially

Posted by zh...@apache.org.
HBASE-9465 Push entries to peer clusters serially

Signed-off-by: zhangduo <zh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5cadcd59
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5cadcd59
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5cadcd59

Branch: refs/heads/master
Commit: 5cadcd59aa57c9566349dc8551c958dc974e774e
Parents: 1ecb0fc
Author: Phil Yang <ud...@gmail.com>
Authored: Thu Aug 4 10:11:56 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue Aug 9 15:25:50 2016 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/HTableDescriptor.java   |  12 +
 .../apache/hadoop/hbase/MetaTableAccessor.java  | 234 ++++++++++-
 .../hbase/client/ConnectionImplementation.java  |   1 +
 .../client/replication/ReplicationAdmin.java    |  14 +-
 .../org/apache/hadoop/hbase/HConstants.java     |  26 ++
 .../src/main/resources/hbase-default.xml        |  14 +
 .../hbase/protobuf/generated/WALProtos.java     |  18 +-
 hbase-protocol/src/main/protobuf/WAL.proto      |   1 +
 .../org/apache/hadoop/hbase/master/HMaster.java |   5 +
 .../hadoop/hbase/master/RegionStateStore.java   |  47 ++-
 .../master/cleaner/ReplicationMetaCleaner.java  | 186 +++++++++
 .../hbase/regionserver/wal/FSWALEntry.java      |   1 -
 .../replication/regionserver/Replication.java   |  12 +
 .../regionserver/ReplicationSource.java         | 127 +++++-
 .../regionserver/ReplicationSourceManager.java  |  87 +++-
 .../hadoop/hbase/util/FSTableDescriptors.java   |  24 ++
 .../java/org/apache/hadoop/hbase/wal/WAL.java   |  16 +
 .../hadoop/hbase/TestMetaTableAccessor.java     |   8 +-
 .../master/TestAssignmentManagerOnCluster.java  |   2 +-
 .../replication/TestSerialReplication.java      | 399 +++++++++++++++++++
 20 files changed, 1176 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index ccad414..9abdf42 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -1115,6 +1115,18 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
   }
 
   /**
+   * Tells whether at least one column family of this table uses the serial replication scope.
+   *
+   * @return true if the scope of any family equals
+   *         {@link HConstants#REPLICATION_SCOPE_SERIAL}, false otherwise
+   */
+  public boolean hasSerialReplicationScope() {
+    for (HColumnDescriptor family : getFamilies()) {
+      if (family.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
+        continue;
+      }
+      return true;
+    }
+    return false;
+  }
+
+  /**
    * Returns the configured replicas per region
    */
   public int getRegionReplication() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index a5dbc94..1eaa753 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -17,11 +17,15 @@
  */
 package org.apache.hadoop.hbase;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -34,8 +38,6 @@ import java.util.regex.Pattern;
 
 import edu.umd.cs.findbugs.annotations.NonNull;
 import edu.umd.cs.findbugs.annotations.Nullable;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -113,14 +115,31 @@ public class MetaTableAccessor {
    *                             region is the result of a merge
    * info:mergeB              => contains a serialized HRI for the second parent region if the
    *                             region is the result of a merge
-   *
    * The actual layout of meta should be encapsulated inside MetaTableAccessor methods,
    * and should not leak out of it (through Result objects, etc)
+   *
+   * For serial replication, there are two column families, "rep_barrier" and "rep_position",
+   * whose row key is encodedRegionName.
+   * rep_barrier:{seqid}      => each time a RS opens a region, it saves the open sequence
+   *                             id of this region
+   * rep_position:{peerid}    => saves the max sequence id we have pushed for each peer
+   * rep_position:_TABLENAME_ => a special cell that saves this region's table name; it will be
+   *                             used when we clean old data
+   * rep_position:_DAUGHTER_  => a special cell indicating that this region has been split or
+   *                             merged; the value is the merged region's encoded name or the two
+   *                             split daughters' encoded names separated by ","
    */
 
   private static final Log LOG = LogFactory.getLog(MetaTableAccessor.class);
   private static final Log METALOG = LogFactory.getLog("org.apache.hadoop.hbase.META");
 
+  // Save its daughter region(s) when split/merge
+  private static final byte[] daughterNamePosCq = Bytes.toBytes("_DAUGHTER_");
+
+  // Save its table name because we only know region's encoded name
+  private static final String tableNamePeer = "_TABLENAME_";
+  private static final byte[] tableNamePosCq = Bytes.toBytes(tableNamePeer);
+
   static final byte [] META_REGION_PREFIX;
   static {
     // Copy the prefix from FIRST_META_REGIONINFO into META_REGION_PREFIX.
@@ -1318,6 +1337,19 @@ public class MetaTableAccessor {
     return delete;
   }
 
+  public static Put makeBarrierPut(byte[] encodedRegionName, long seq, byte[] tableName) {
+    byte[] seqBytes = Bytes.toBytes(seq);
+    return new Put(encodedRegionName)
+        .addImmutable(HConstants.REPLICATION_BARRIER_FAMILY, seqBytes, seqBytes)
+        .addImmutable(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq, tableName);
+  }
+
+
+  public static Put makeSerialDaughterPut(byte[] encodedRegionName, byte[] value) {
+    return new Put(encodedRegionName).addImmutable(HConstants.REPLICATION_POSITION_FAMILY,
+        daughterNamePosCq, value);
+  }
+
   /**
    * Adds split daughters to the Put
    */
@@ -1334,27 +1366,28 @@ public class MetaTableAccessor {
   }
 
   /**
-   * Put the passed <code>p</code> to the <code>hbase:meta</code> table.
+   * Put the passed <code>puts</code> to the <code>hbase:meta</code> table.
+   * Non-atomic for multi puts.
    * @param connection connection we're using
-   * @param p Put to add to hbase:meta
+   * @param puts Put to add to hbase:meta
    * @throws IOException
    */
-  static void putToMetaTable(final Connection connection, final Put p)
+  static void putToMetaTable(final Connection connection, final Put... puts)
     throws IOException {
-    put(getMetaHTable(connection), p);
+    put(getMetaHTable(connection), Arrays.asList(puts));
   }
 
   /**
    * @param t Table to use (will be closed when done).
-   * @param p put to make
+   * @param puts puts to make
    * @throws IOException
    */
-  private static void put(final Table t, final Put p) throws IOException {
+  private static void put(final Table t, final List<Put> puts) throws IOException {
     try {
       if (METALOG.isDebugEnabled()) {
-        METALOG.debug(mutationToString(p));
+        METALOG.debug(mutationsToString(puts));
       }
-      t.put(p);
+      t.put(puts);
     } finally {
       t.close();
     }
@@ -1490,7 +1523,7 @@ public class MetaTableAccessor {
    * Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
    * does not add its daughter's as different rows, but adds information about the daughters
    * in the same row as the parent. Use
-   * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName, int)}
+   * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName,int,boolean)}
    * if you want to do that.
    * @param meta the Table for META
    * @param regionInfo region information
@@ -1515,7 +1548,7 @@ public class MetaTableAccessor {
    * Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
    * does not add its daughter's as different rows, but adds information about the daughters
    * in the same row as the parent. Use
-   * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName, int)}
+   * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName,int,boolean)}
    * if you want to do that.
    * @param connection connection we're using
    * @param regionInfo region information
@@ -1601,11 +1634,12 @@ public class MetaTableAccessor {
    * @param regionB
    * @param sn the location of the region
    * @param masterSystemTime
+   * @param saveBarrier true if the replication barrier must be saved in meta (used for serial
+   *                    replication)
    * @throws IOException
    */
   public static void mergeRegions(final Connection connection, HRegionInfo mergedRegion,
       HRegionInfo regionA, HRegionInfo regionB, ServerName sn, int regionReplication,
-      long masterSystemTime)
+      long masterSystemTime, boolean saveBarrier)
           throws IOException {
     Table meta = getMetaHTable(connection);
     try {
@@ -1636,7 +1670,17 @@ public class MetaTableAccessor {
 
       byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString()
         + HConstants.DELIMITER);
-      multiMutate(meta, tableRow, putOfMerged, deleteA, deleteB);
+      Mutation[] mutations;
+      if (saveBarrier) {
+        Put putBarrierA = makeSerialDaughterPut(regionA.getEncodedNameAsBytes(),
+            Bytes.toBytes(mergedRegion.getEncodedName()));
+        Put putBarrierB = makeSerialDaughterPut(regionB.getEncodedNameAsBytes(),
+            Bytes.toBytes(mergedRegion.getEncodedName()));
+        mutations = new Mutation[] { putOfMerged, deleteA, deleteB, putBarrierA, putBarrierB };
+      } else {
+        mutations = new Mutation[] { putOfMerged, deleteA, deleteB };
+      }
+      multiMutate(meta, tableRow, mutations);
     } finally {
       meta.close();
     }
@@ -1652,10 +1696,11 @@ public class MetaTableAccessor {
    * @param splitA Split daughter region A
    * @param splitB Split daughter region A
    * @param sn the location of the region
+   * @param saveBarrier true if the replication barrier must be saved in meta (used for serial
+   *                    replication)
    */
-  public static void splitRegion(final Connection connection,
-                                 HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
-                                 ServerName sn, int regionReplication) throws IOException {
+  public static void splitRegion(final Connection connection, HRegionInfo parent,
+      HRegionInfo splitA, HRegionInfo splitB, ServerName sn, int regionReplication,
+      boolean saveBarrier) throws IOException {
     Table meta = getMetaHTable(connection);
     try {
       HRegionInfo copyOfParent = new HRegionInfo(parent);
@@ -1680,8 +1725,17 @@ public class MetaTableAccessor {
         addEmptyLocation(putB, i);
       }
 
+      Mutation[] mutations;
+      if (saveBarrier) {
+        Put putBarrier = makeSerialDaughterPut(parent.getEncodedNameAsBytes(),
+            Bytes
+                .toBytes(splitA.getEncodedName() + HConstants.DELIMITER + splitB.getEncodedName()));
+        mutations = new Mutation[]{putParent, putA, putB, putBarrier};
+      } else {
+        mutations = new Mutation[]{putParent, putA, putB};
+      }
       byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER);
-      multiMutate(meta, tableRow, putParent, putA, putB);
+      multiMutate(meta, tableRow, mutations);
     } finally {
       meta.close();
     }
@@ -1781,6 +1835,27 @@ public class MetaTableAccessor {
   }
 
   /**
+   * Updates the progress of pushing entries to a peer cluster. For every entry of
+   * <code>positions</code> the absolute value of the position is written to the cell
+   * {@link HConstants#REPLICATION_POSITION_FAMILY}:<code>peerId</code> of the row named by the
+   * entry's key.
+   * <p>
+   * NOTE(review): no entry is skipped; a position of -1 is stored as 1 because of
+   * <code>Math.abs()</code>. Confirm whether -1 was meant to be skipped instead.
+   * @param connection connection we're using
+   * @param peerId the peerId to push
+   * @param positions map from row key (presumably the encoded region name -- confirm against the
+   *                  callers) to the position to save
+   * @throws IOException if writing to hbase:meta fails
+   */
+  public static void updateReplicationPositions(Connection connection, String peerId,
+      Map<String, Long> positions) throws IOException {
+    List<Put> puts = new ArrayList<>(positions.size());
+    for (Map.Entry<String, Long> entry : positions.entrySet()) {
+      long value = Math.abs(entry.getValue());
+      Put put = new Put(Bytes.toBytes(entry.getKey()));
+      put.addImmutable(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId),
+          Bytes.toBytes(value));
+      puts.add(put);
+    }
+    // Close the meta table handle after the write so that it is not leaked.
+    try (Table meta = getMetaHTable(connection)) {
+      meta.put(puts);
+    }
+  }
+
+
+  /**
    * Updates the location of the specified region to be the specified server.
    * <p>
    * Connects to the specified server which should be hosting the specified
@@ -1977,4 +2052,125 @@ public class MetaTableAccessor {
   private static String mutationToString(Mutation p) throws IOException {
     return p.getClass().getSimpleName() + p.toJSON();
   }
+
+  /**
+   * Reads the replication position that one peer has recorded for a region.
+   * @param connection connection we're using
+   * @param encodedRegionName row to read in hbase:meta
+   * @param peerId qualifier to read in the replication position family
+   * @return the stored position, or -1 when hbase:meta holds no such cell
+   * @throws IOException
+   */
+  public static long getReplicationPositionForOnePeer(Connection connection,
+      byte[] encodedRegionName, String peerId) throws IOException {
+    Get positionGet = new Get(encodedRegionName);
+    positionGet.addColumn(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId));
+    Result found = get(getMetaHTable(connection), positionGet);
+    if (!found.isEmpty()) {
+      // Only one column was requested, so the first cell is the position.
+      Cell first = found.rawCells()[0];
+      return Bytes.toLong(first.getValueArray(), first.getValueOffset(), first.getValueLength());
+    }
+    return -1;
+  }
+
+  /**
+   * Get replication positions for all peers in a region.
+   * <p>
+   * Every cell of the replication position family is returned except the two bookkeeping
+   * qualifiers (table name and daughter names) that share the family.
+   * @param connection connection we're using
+   * @param encodedRegionName region's encoded name
+   * @return the map of positions for each peer, keyed by qualifier; empty (never null) when the
+   *         row has no cells in the position family
+   * @throws IOException if reading hbase:meta fails
+   */
+  public static Map<String, Long> getReplicationPositionForAllPeer(Connection connection,
+      byte[] encodedRegionName) throws IOException {
+    Get get = new Get(encodedRegionName);
+    get.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
+    Result r = get(getMetaHTable(connection), get);
+    Map<String, Long> map = new HashMap<>((int) (r.size() / 0.75 + 1));
+    if (r.isEmpty()) {
+      // Result.listCells() yields null for an empty result, so iterating it would throw a
+      // NullPointerException; return the empty map instead.
+      return map;
+    }
+    for (Cell c : r.rawCells()) {
+      boolean isTableNameCell = Bytes.equals(tableNamePosCq, 0, tableNamePosCq.length,
+          c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength());
+      boolean isDaughterNameCell = Bytes.equals(daughterNamePosCq, 0, daughterNamePosCq.length,
+          c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength());
+      if (!isTableNameCell && !isDaughterNameCell) {
+        map.put(
+            Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()),
+            Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
+      }
+    }
+    return map;
+  }
+
+  /**
+   * Get replication barriers for all peers in a region.
+   * @param encodedRegionName region's encoded name
+   * @return a list of barrier sequence numbers.
+   * @throws IOException
+   */
+  public static List<Long> getReplicationBarriers(Connection connection, byte[] encodedRegionName)
+      throws IOException {
+    Get get = new Get(encodedRegionName);
+    get.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
+    Result r = get(getMetaHTable(connection), get);
+    List<Long> list = new ArrayList<>();
+    if (!r.isEmpty()) {
+      for (Cell cell : r.rawCells()) {
+        list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(),
+            cell.getQualifierLength()));
+      }
+    }
+    return list;
+  }
+
+  /**
+   * Get all barriers in all regions.
+   * @return a map of barrier lists in all regions
+   * @throws IOException
+   */
+  public static Map<String, List<Long>> getAllBarriers(Connection connection) throws IOException {
+    Map<String, List<Long>> map = new HashMap<>();
+    Scan scan = new Scan();
+    scan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
+    try (Table t = getMetaHTable(connection);
+        ResultScanner scanner = t.getScanner(scan)) {
+      Result result;
+      while ((result = scanner.next()) != null) {
+        String key = Bytes.toString(result.getRow());
+        List<Long> list = new ArrayList<>();
+        for (Cell cell : result.rawCells()) {
+          list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(),
+              cell.getQualifierLength()));
+        }
+        map.put(key, list);
+      }
+    }
+    return map;
+  }
+
+  /**
+   * Reads the daughter region(s) recorded for a region; only used in serial replication.
+   * @param connection connection we're using
+   * @param encodedName row to read in hbase:meta
+   * @return the stored daughter value as a string, or null when the row has no such cell
+   * @throws IOException
+   */
+  public static String getSerialReplicationDaughterRegion(Connection connection, byte[] encodedName)
+      throws IOException {
+    Get daughterGet = new Get(encodedName);
+    daughterGet.addColumn(HConstants.REPLICATION_POSITION_FAMILY, daughterNamePosCq);
+    Result found = get(getMetaHTable(connection), daughterGet);
+    if (found.isEmpty()) {
+      return null;
+    }
+    Cell daughterCell = found.rawCells()[0];
+    return Bytes.toString(daughterCell.getValueArray(), daughterCell.getValueOffset(),
+        daughterCell.getValueLength());
+  }
+
+  /**
+   * Reads the table name recorded for a region; only used in serial replication.
+   * @param connection connection we're using
+   * @param encodedName row to read in hbase:meta
+   * @return the stored table name as a string, or null when the row has no such cell
+   * @throws IOException
+   */
+  public static String getSerialReplicationTableName(Connection connection, byte[] encodedName)
+      throws IOException {
+    Get tableNameGet = new Get(encodedName);
+    tableNameGet.addColumn(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq);
+    Result found = get(getMetaHTable(connection), tableNameGet);
+    if (found.isEmpty()) {
+      return null;
+    }
+    Cell nameCell = found.rawCells()[0];
+    return Bytes.toString(nameCell.getValueArray(), nameCell.getValueOffset(),
+        nameCell.getValueLength());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 04edd25..37c62c5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -721,6 +721,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     Scan s = new Scan();
     s.setReversed(true);
     s.setStartRow(metaKey);
+    s.addFamily(HConstants.CATALOG_FAMILY);
     s.setSmall(true);
     s.setCaching(1);
     if (this.useMetaReplicas) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index dca1821..ee26e38 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -92,8 +92,10 @@ public class ReplicationAdmin implements Closeable {
   // only Global for now, can add other type
   // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
   public static final String REPLICATIONTYPE = "replicationType";
-  public static final String REPLICATIONGLOBAL = Integer
-      .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
+  public static final String REPLICATIONGLOBAL =
+      Integer.toString(HConstants.REPLICATION_SCOPE_GLOBAL);
+  public static final String REPLICATIONSERIAL =
+      Integer.toString(HConstants.REPLICATION_SCOPE_SERIAL);
 
   private final Connection connection;
   // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
@@ -430,7 +432,10 @@ public class ReplicationAdmin implements Closeable {
           HashMap<String, String> replicationEntry = new HashMap<String, String>();
           replicationEntry.put(TNAME, tableName);
           replicationEntry.put(CFNAME, column.getNameAsString());
-          replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
+          replicationEntry.put(REPLICATIONTYPE,
+              column.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL ?
+                  REPLICATIONGLOBAL :
+                  REPLICATIONSERIAL);
           replicationColFams.add(replicationEntry);
         }
       }
@@ -616,7 +621,8 @@ public class ReplicationAdmin implements Closeable {
    */
   private boolean isTableRepEnabled(HTableDescriptor htd) {
     for (HColumnDescriptor hcd : htd.getFamilies()) {
-      if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
+      if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
+          && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
         return false;
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index ce18ef5..4c499a2 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -429,6 +429,20 @@ public final class HConstants {
   /** The catalog family */
   public static final byte [] CATALOG_FAMILY = Bytes.toBytes(CATALOG_FAMILY_STR);
 
+  /** The replication barrier family as a string*/
+  public static final String REPLICATION_BARRIER_FAMILY_STR = "rep_barrier";
+
+  /** The replication barrier family */
+  public static final byte [] REPLICATION_BARRIER_FAMILY =
+      Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR);
+
+  /** The replication position family as a string */
+  public static final String REPLICATION_POSITION_FAMILY_STR = "rep_position";
+
+  /** The replication position family */
+  public static final byte [] REPLICATION_POSITION_FAMILY =
+      Bytes.toBytes(REPLICATION_POSITION_FAMILY_STR);
+
   /** The RegionInfo qualifier as a string */
   public static final String REGIONINFO_QUALIFIER_STR = "regioninfo";
 
@@ -636,6 +650,12 @@ public final class HConstants {
   public static final int REPLICATION_SCOPE_GLOBAL = 1;
 
   /**
+   * Scope tag for serially scoped data.
+   * This data will be replicated to all peers in order of sequence id.
+   */
+  public static final int REPLICATION_SCOPE_SERIAL = 2;
+
+  /**
    * Default cluster ID, cannot be used to identify a cluster so a key with
    * this value means it wasn't meant for replication.
    */
@@ -866,6 +886,12 @@ public final class HConstants {
   public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false;
   /** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */
   public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id";
+
+  public static final String
+      REPLICATION_SERIALLY_WAITING_KEY = "hbase.serial.replication.waitingMs";
+  public static final long
+      REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
+
   /**
    * Directory where the source cluster file system client configuration are placed which is used by
    * sink cluster to copy HFiles from source cluster file system

http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 116c7d9..a791717 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1571,6 +1571,20 @@ possible configurations would overwhelm and obscure the important.
         slave clusters. The default of 10 will rarely need to be changed.
     </description>
   </property>
+  <property>
+    <name>hbase.serial.replication.waitingMs</name>
+    <value>10000</value>
+    <description>
+      By default, replication does not guarantee that operations reach the slave cluster in the
+      same order as they were applied on the master. If REPLICATION_SCOPE is set to 2, edits are
+      pushed in the order in which they were written. This setting controls how long (in ms) to
+      wait before checking again when a log cannot be pushed yet because logs written before it
+      have not been pushed. A larger value decreases the number of queries on hbase:meta but
+      increases the replication delay. This feature relies on zk-less assignment and conflicts
+      with distributed log replay, so users must set hbase.assignment.usezk and
+      hbase.master.distributed.log.replay to false to use it.
+    </description>
+  </property>
   <!-- Static Web User Filter properties. -->
   <property>
     <description>

http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index 28f4d4b..a675b12 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -21,6 +21,10 @@ public final class WALProtos {
      * <code>REPLICATION_SCOPE_GLOBAL = 1;</code>
      */
     REPLICATION_SCOPE_GLOBAL(1, 1),
+    /**
+     * <code>REPLICATION_SCOPE_SERIAL = 2;</code>
+     */
+    REPLICATION_SCOPE_SERIAL(2, 2),
     ;
 
     /**
@@ -31,6 +35,10 @@ public final class WALProtos {
      * <code>REPLICATION_SCOPE_GLOBAL = 1;</code>
      */
     public static final int REPLICATION_SCOPE_GLOBAL_VALUE = 1;
+    /**
+     * <code>REPLICATION_SCOPE_SERIAL = 2;</code>
+     */
+    public static final int REPLICATION_SCOPE_SERIAL_VALUE = 2;
 
 
     public final int getNumber() { return value; }
@@ -39,6 +47,7 @@ public final class WALProtos {
       switch (value) {
         case 0: return REPLICATION_SCOPE_LOCAL;
         case 1: return REPLICATION_SCOPE_GLOBAL;
+        case 2: return REPLICATION_SCOPE_SERIAL;
         default: return null;
       }
     }
@@ -12013,11 +12022,12 @@ public final class WALProtos {
       "\030\005 \003(\0132\031.hbase.pb.StoreDescriptor\022$\n\006ser" +
       "ver\030\006 \001(\0132\024.hbase.pb.ServerName\022\023\n\013regio" +
       "n_name\030\007 \001(\014\".\n\tEventType\022\017\n\013REGION_OPEN" +
-      "\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWALTrailer*F\n\tSc" +
+      "\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWALTrailer*d\n\tSc" +
       "opeType\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030" +
-      "REPLICATION_SCOPE_GLOBAL\020\001B?\n*org.apache" +
-      ".hadoop.hbase.protobuf.generatedB\tWALPro" +
-      "tosH\001\210\001\000\240\001\001"
+      "REPLICATION_SCOPE_GLOBAL\020\001\022\034\n\030REPLICATIO" +
+      "N_SCOPE_SERIAL\020\002B?\n*org.apache.hadoop.hb" +
+      "ase.protobuf.generatedB\tWALProtosH\001\210\001\000\240\001" +
+      "\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-protocol/src/main/protobuf/WAL.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto
index c1d465a..2494977 100644
--- a/hbase-protocol/src/main/protobuf/WAL.proto
+++ b/hbase-protocol/src/main/protobuf/WAL.proto
@@ -75,6 +75,7 @@ message WALKey {
 enum ScopeType {
   REPLICATION_SCOPE_LOCAL = 0;
   REPLICATION_SCOPE_GLOBAL = 1;
+  REPLICATION_SCOPE_SERIAL = 2;
 }
 
 message FamilyScope {

http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 5ce056d..2022c5e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -98,6 +98,7 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
+import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
@@ -311,6 +312,7 @@ public class HMaster extends HRegionServer implements MasterServices {
 
   CatalogJanitor catalogJanitorChore;
   private ReplicationZKLockCleanerChore replicationZKLockCleanerChore;
+  private ReplicationMetaCleaner replicationMetaCleaner;
   private LogCleaner logCleaner;
   private HFileCleaner hfileCleaner;
   private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
@@ -988,6 +990,8 @@ public class HMaster extends HRegionServer implements MasterServices {
         LOG.error("start replicationZKLockCleanerChore failed", e);
       }
     }
+    replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
+    getChoreService().scheduleChore(replicationMetaCleaner);
   }
 
   @Override
@@ -1022,6 +1026,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     if (this.logCleaner != null) this.logCleaner.cancel(true);
     if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
     if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true);
+    if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
     if (this.quotaManager != null) this.quotaManager.stop();
     if (this.activeMasterManager != null) this.activeMasterManager.stop();
     if (this.serverManager != null) this.serverManager.stop();

http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
index 82e28df..2dbc087 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
@@ -17,22 +17,25 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import com.google.common.base.Preconditions;
+
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.master.RegionState.State;
@@ -44,8 +47,6 @@ import org.apache.hadoop.hbase.util.MultiHConnection;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.zookeeper.KeeperException;
 
-import com.google.common.base.Preconditions;
-
 /**
  * A helper to persist region state in meta. We may change this class
  * to StateStore later if we also use it to store other states in meta
@@ -60,7 +61,7 @@ public class RegionStateStore {
   private volatile Region metaRegion;
   private volatile boolean initialized;
   private MultiHConnection multiHConnection;
-  private final Server server;
+  private final MasterServices server;
 
   /**
    * Returns the {@link ServerName} from catalog table {@link Result}
@@ -130,7 +131,7 @@ public class RegionStateStore {
           State.SPLITTING_NEW, State.MERGED));
   }
 
-  RegionStateStore(final Server server) {
+  RegionStateStore(final MasterServices server) {
     this.server = server;
     initialized = false;
   }
@@ -187,31 +188,41 @@ public class RegionStateStore {
       State state = newState.getState();
 
       int replicaId = hri.getReplicaId();
-      Put put = new Put(MetaTableAccessor.getMetaKeyForRegion(hri));
+      Put metaPut = new Put(MetaTableAccessor.getMetaKeyForRegion(hri));
       StringBuilder info = new StringBuilder("Updating hbase:meta row ");
       info.append(hri.getRegionNameAsString()).append(" with state=").append(state);
       if (serverName != null && !serverName.equals(oldServer)) {
-        put.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId),
+        metaPut.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId),
           Bytes.toBytes(serverName.getServerName()));
         info.append(", sn=").append(serverName);
       }
       if (openSeqNum >= 0) {
         Preconditions.checkArgument(state == State.OPEN
           && serverName != null, "Open region should be on a server");
-        MetaTableAccessor.addLocation(put, serverName, openSeqNum, -1, replicaId);
+        MetaTableAccessor.addLocation(metaPut, serverName, openSeqNum, -1, replicaId);
         info.append(", openSeqNum=").append(openSeqNum);
         info.append(", server=").append(serverName);
       }
-      put.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
+      metaPut.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
         Bytes.toBytes(state.name()));
       LOG.info(info);
-
+      HTableDescriptor descriptor = server.getTableDescriptors().get(hri.getTable());
+      boolean serial = false;
+      if (descriptor != null) {
+        serial = server.getTableDescriptors().get(hri.getTable()).hasSerialReplicationScope();
+      }
+      boolean shouldPutBarrier = serial && state == State.OPEN;
       // Persist the state change to meta
       if (metaRegion != null) {
         try {
           // Assume meta is pinned to master.
           // At least, that's what we want.
-          metaRegion.put(put);
+          metaRegion.put(metaPut);
+          if (shouldPutBarrier) {
+            Put barrierPut = MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(),
+                openSeqNum, hri.getTable().getName());
+            metaRegion.put(barrierPut);
+          }
           return; // Done here
         } catch (Throwable t) {
           // In unit tests, meta could be moved away by intention
@@ -230,8 +241,10 @@ public class RegionStateStore {
         }
       }
       // Called when meta is not on master
-      multiHConnection.processBatchCallback(Arrays.asList(put),
-          TableName.META_TABLE_NAME, null, null);
+      List<Put> list = shouldPutBarrier ?
+          Arrays.asList(metaPut, MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(),
+              openSeqNum, hri.getTable().getName())) : Arrays.asList(metaPut);
+      multiHConnection.processBatchCallback(list, TableName.META_TABLE_NAME, null, null);
 
     } catch (IOException ioe) {
       LOG.error("Failed to persist region state " + newState, ioe);
@@ -241,12 +254,14 @@ public class RegionStateStore {
 
   void splitRegion(HRegionInfo p,
       HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
-    MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication);
+    MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication,
+        server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope());
   }
 
   void mergeRegions(HRegionInfo p,
       HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
     MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, regionReplication,
-    		EnvironmentEdgeManager.currentTime());
+        EnvironmentEdgeManager.currentTime(),
+        server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope());
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
new file mode 100644
index 0000000..e9647e8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This chore cleans up the data in hbase:meta that serial replication no longer needs.
+ * <p>
+ * For every row that has replication barriers, it either removes both replication families
+ * (when the row's table is not serially replicated any more, or when the region was split/merged
+ * and no daughter that already has barriers is still missing a position for a configured peer),
+ * or it trims the barriers that lie behind the smallest recorded peer position.
+ */
+@InterfaceAudience.Private
+public class ReplicationMetaCleaner extends ScheduledChore {
+
+  private static final Log LOG = LogFactory.getLog(ReplicationMetaCleaner.class);
+
+  private ReplicationAdmin replicationAdmin;
+  private MasterServices master;
+
+  /**
+   * @param master master services used to read table descriptors and to reach hbase:meta
+   * @param stoppable stopper handed to {@link ScheduledChore}
+   * @param period chore period handed to {@link ScheduledChore}
+   * @throws IOException if the {@link ReplicationAdmin} can not be created
+   */
+  public ReplicationMetaCleaner(MasterServices master, Stoppable stoppable, int period)
+      throws IOException {
+    super("ReplicationMetaCleaner", stoppable, period);
+    this.master = master;
+    replicationAdmin = new ReplicationAdmin(master.getConfiguration());
+  }
+
+  @Override
+  protected void chore() {
+    try {
+      // Table name -> ids of the peers whose table-CFs configuration names that table. Only
+      // tables with at least one serially scoped column family get an entry.
+      Map<String, HTableDescriptor> tables = master.getTableDescriptors().getAllDescriptors();
+      Map<String, Set<String>> serialTables = new HashMap<>();
+      for (Map.Entry<String, HTableDescriptor> entry : tables.entrySet()) {
+        boolean hasSerialScope = false;
+        for (HColumnDescriptor column : entry.getValue().getFamilies()) {
+          if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL) {
+            hasSerialScope = true;
+            break;
+          }
+        }
+        if (hasSerialScope) {
+          serialTables.put(entry.getValue().getTableName().getNameAsString(), new HashSet<String>());
+        }
+      }
+      if (serialTables.isEmpty()){
+        return;
+      }
+
+      Map<String, ReplicationPeerConfig> peers = replicationAdmin.listPeerConfigs();
+      for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
+        Map<TableName, List<String>> tableCFs = entry.getValue().getTableCFsMap();
+        if (tableCFs == null) {
+          // This peer has no table-CFs map to iterate over; dereferencing it would throw.
+          continue;
+        }
+        // Record the peer for every serial table it names. Stopping at the first match would
+        // leave the peer out of its other serial tables and let their rows be cleaned too early.
+        for (TableName table : tableCFs.keySet()) {
+          Set<String> peerIds = serialTables.get(table.getNameAsString());
+          if (peerIds != null) {
+            peerIds.add(entry.getKey());
+          }
+        }
+      }
+
+      Map<String, List<Long>> barrierMap = MetaTableAccessor.getAllBarriers(master.getConnection());
+      for (Map.Entry<String, List<Long>> entry : barrierMap.entrySet()) {
+        String encodedName = entry.getKey();
+        byte[] encodedBytes = Bytes.toBytes(encodedName);
+        boolean canClearRegion = false;
+        Map<String, Long> posMap = MetaTableAccessor.getReplicationPositionForAllPeer(
+            master.getConnection(), encodedBytes);
+        if (posMap.isEmpty()) {
+          continue;
+        }
+
+        String tableName = MetaTableAccessor.getSerialReplicationTableName(
+            master.getConnection(), encodedBytes);
+        Set<String> confPeers = serialTables.get(tableName);
+        if (confPeers == null) {
+          // This table doesn't exist or all cf's scope is not serial any more, we can clear meta.
+          canClearRegion = true;
+        } else {
+          if (!allPeersHavePosition(confPeers, posMap)) {
+            continue;
+          }
+
+          String daughterValue = MetaTableAccessor
+              .getSerialReplicationDaughterRegion(master.getConnection(), encodedBytes);
+          if (daughterValue != null) {
+            //this region is merged or split
+            boolean allDaughterStart = true;
+            String[] daughterRegions = daughterValue.split(",");
+            for (String daughter : daughterRegions) {
+              byte[] region = Bytes.toBytes(daughter);
+              if (!MetaTableAccessor.getReplicationBarriers(
+                  master.getConnection(), region).isEmpty() &&
+                  !allPeersHavePosition(confPeers,
+                      MetaTableAccessor
+                          .getReplicationPositionForAllPeer(master.getConnection(), region))) {
+                allDaughterStart = false;
+                break;
+              }
+            }
+            if (allDaughterStart) {
+              canClearRegion = true;
+            }
+          }
+        }
+        if (canClearRegion) {
+          Delete delete = new Delete(encodedBytes);
+          delete.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
+          delete.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
+          deleteFromMeta(delete);
+        } else {
+
+          // Barriers whose seq is larger than min pos of all peers, and the last barrier whose seq
+          // is smaller than min pos should be kept. All other barriers can be deleted.
+
+          long minPos = Long.MAX_VALUE;
+          for (Map.Entry<String, Long> pos : posMap.entrySet()) {
+            minPos = Math.min(minPos, pos.getValue());
+          }
+          List<Long> barriers = entry.getValue();
+          int index = Collections.binarySearch(barriers, minPos);
+          if (index < 0) {
+            index = -index - 1;
+          }
+          // Barriers [0, index - 1) are obsolete. When that range is empty nothing must be sent:
+          // a Delete that names no family or column removes the entire row, which would wipe the
+          // barriers and positions this branch is meant to keep.
+          if (index > 1) {
+            Delete delete = new Delete(encodedBytes);
+            for (int i = 0; i < index - 1; i++) {
+              delete.addColumn(HConstants.REPLICATION_BARRIER_FAMILY,
+                  Bytes.toBytes(barriers.get(i)));
+            }
+            deleteFromMeta(delete);
+          }
+        }
+
+      }
+
+    } catch (IOException e) {
+      LOG.error("Exception during cleaning up.", e);
+    }
+
+  }
+
+  /** Applies one Delete to hbase:meta, closing the table handle afterwards. */
+  private void deleteFromMeta(Delete delete) throws IOException {
+    try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
+      metaTable.delete(delete);
+    }
+  }
+
+  /** Returns true when every peer id in {@code peers} is a key of {@code posMap}. */
+  private boolean allPeersHavePosition(Set<String> peers, Map<String, Long> posMap) {
+    return posMap.keySet().containsAll(peers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 3449832..72474a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -125,7 +125,6 @@ class FSWALEntry extends Entry {
         CellUtil.setSequenceId(c, regionSequenceId);
       }
     }
-
     getKey().setWriteEntry(we);
     return regionSequenceId;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 4f518bb..741065a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
@@ -279,6 +280,17 @@ public class Replication extends WALActionsListener.Base implements
     for (Cell cell : logEdit.getCells()) {
       if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
         foundOtherEdits = true;
+        break;
+      }
+    }
+
+    if (!foundOtherEdits && logEdit.getCells().size() > 0) {
+      WALProtos.RegionEventDescriptor maybeEvent =
+          WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0));
+      if (maybeEvent != null && (maybeEvent.getEventType() ==
+          WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
+        // In serial replication, we use scopes when reading the close marker.
+        foundOtherEdits = true;
       }
     }
     if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 2f3b2a8..ce0fb06 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -18,6 +18,10 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Service;
@@ -29,8 +33,10 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -48,9 +54,11 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -102,6 +110,8 @@ public class ReplicationSource extends Thread
   private ReplicationQueueInfo replicationQueueInfo;
   // id of the peer cluster this source replicates to
   private String peerId;
+
+  String actualPeerId;
   // The manager of all sources to which we ping back our progress
   private ReplicationSourceManager manager;
   // Should we stop everything?
@@ -185,6 +195,8 @@ public class ReplicationSource extends Thread
     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this.peerId = this.replicationQueueInfo.getPeerId();
+    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+    this.actualPeerId = replicationQueueInfo.getPeerId();
     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
     this.replicationEndpoint = replicationEndpoint;
   }
@@ -507,6 +519,17 @@ public class ReplicationSource extends Thread
     // Current number of hfiles that we need to replicate
     private long currentNbHFiles = 0;
 
+    // Use guava cache to set ttl for each key
+    private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
+        .expireAfterAccess(1, TimeUnit.DAYS).build(
+        new CacheLoader<String, Boolean>() {
+          @Override
+          public Boolean load(String key) throws Exception {
+            return false;
+          }
+        }
+    );
+
     public ReplicationSourceWorkerThread(String walGroupId,
         PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo,
         ReplicationSource source) {
@@ -588,9 +611,24 @@ public class ReplicationSource extends Thread
         currentNbOperations = 0;
         currentNbHFiles = 0;
         List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
+
+        Map<String, Long> lastPositionsForSerialScope = new HashMap<>();
         currentSize = 0;
         try {
-          if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
+          if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries,
+              lastPositionsForSerialScope)) {
+            for (Map.Entry<String, Long> entry : lastPositionsForSerialScope.entrySet()) {
+              waitingUntilCanPush(entry);
+            }
+            try {
+              MetaTableAccessor
+                  .updateReplicationPositions(manager.getConnection(), actualPeerId,
+                      lastPositionsForSerialScope);
+            } catch (IOException e) {
+              LOG.error("updateReplicationPositions fail", e);
+              stopper.stop("updateReplicationPositions fail");
+            }
+
             continue;
           }
         } catch (IOException ioe) {
@@ -626,15 +664,30 @@ public class ReplicationSource extends Thread
             LOG.warn("Unable to finalize the tailing of a file", e);
           }
         }
-
+        for(Map.Entry<String, Long> entry: lastPositionsForSerialScope.entrySet()) {
+          waitingUntilCanPush(entry);
+        }
         // If we didn't get anything to replicate, or if we hit a IOE,
         // wait a bit and retry.
         // But if we need to stop, don't bother sleeping
         if (isWorkerActive() && (gotIOE || entries.isEmpty())) {
           if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
-            manager.logPositionAndCleanOldLogs(this.currentPath,
-                peerClusterZnode, this.repLogReader.getPosition(),
+
+            // Save positions to meta table before zk.
+            if (!gotIOE) {
+              try {
+                MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
+                    lastPositionsForSerialScope);
+              } catch (IOException e) {
+                LOG.error("updateReplicationPositions fail", e);
+                stopper.stop("updateReplicationPositions fail");
+              }
+            }
+
+            manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
+                this.repLogReader.getPosition(),
                 this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
+
             this.lastLoggedPosition = this.repLogReader.getPosition();
           }
           // Reset the sleep multiplier if nothing has actually gone wrong
@@ -649,8 +702,7 @@ public class ReplicationSource extends Thread
           }
           continue;
         }
-        sleepMultiplier = 1;
-        shipEdits(currentWALisBeingWrittenTo, entries);
+        shipEdits(currentWALisBeingWrittenTo, entries, lastPositionsForSerialScope);
       }
       if (replicationQueueInfo.isQueueRecovered()) {
         // use synchronize to make sure one last thread will clean the queue
@@ -672,16 +724,42 @@ public class ReplicationSource extends Thread
       }
     }
 
+    /** Waits until {@code entry}'s region may be pushed; a value <= 0 marks a REGION_CLOSE. */
+    private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
+      String key = entry.getKey();
+      // After a REGION_CLOSE marker we can not continue skipping, so the key is dropped below.
+      boolean deleteKey = entry.getValue() <= 0;
+      long seq = deleteKey ? -entry.getValue() : entry.getValue();
+      if (!canSkipWaitingSet.getUnchecked(key)) {
+        try {
+          manager.waitUntilCanBePushed(Bytes.toBytes(key), seq, actualPeerId);
+        } catch (Exception e) {
+          if (e instanceof InterruptedException) {
+            Thread.currentThread().interrupt(); // restore the flag this broad catch would swallow
+          }
+          LOG.error("waitUntilCanBePushed fail", e);
+          stopper.stop("waitUntilCanBePushed fail");
+        }
+        // Once the first entry of a region was waited for, later ones need no more waiting.
+        canSkipWaitingSet.put(key, true);
+      }
+      if (deleteKey) {
+        canSkipWaitingSet.invalidate(key);
+      }
+    }
+
     /**
      * Read all the entries from the current log files and retain those that need to be replicated.
      * Else, process the end of the current file.
      * @param currentWALisBeingWrittenTo is the current WAL being written to
      * @param entries resulting entries to be replicated
+     * @param lastPosition save the last sequenceid for each region if the table has
+     *                     serial-replication scope
      * @return true if we got nothing and went to the next file, false if we got entries
      * @throws IOException
      */
     protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
-        List<WAL.Entry> entries) throws IOException {
+        List<WAL.Entry> entries, Map<String, Long> lastPosition) throws IOException {
       long seenEntries = 0;
       if (LOG.isTraceEnabled()) {
         LOG.trace("Seeking in " + this.currentPath + " at position "
@@ -694,6 +772,27 @@ public class ReplicationSource extends Thread
         metrics.incrLogEditsRead();
         seenEntries++;
 
+        if (entry.hasSerialReplicationScope()) {
+          String key = Bytes.toString(entry.getKey().getEncodedRegionName());
+          lastPosition.put(key, entry.getKey().getSequenceId());
+          if (entry.getEdit().getCells().size() > 0) {
+            WALProtos.RegionEventDescriptor maybeEvent =
+                WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
+            if (maybeEvent != null && maybeEvent.getEventType()
+                == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
+              // In serial replication, if we move a region to another RS and move it back, we may
+              // read logs crossing two sections. We should break at REGION_CLOSE and push the first
+              // section first in case of missing the middle section belonging to the other RS.
+              // In a worker thread, if we can push the first log of a region, we can push all logs
+              // in the same region without waiting until we read a close marker because next time
+              // we read logs in this region, it must be a new section and not adjacent with this
+              // region. Mark it negative.
+              lastPosition.put(key, -entry.getKey().getSequenceId());
+              break;
+            }
+          }
+        }
+
         // don't replicate if the log entries have already been consumed by the cluster
         if (replicationEndpoint.canReplicateToSameCluster()
             || !entry.getKey().getClusterIds().contains(peerClusterId)) {
@@ -723,6 +822,7 @@ public class ReplicationSource extends Thread
             || entries.size() >= replicationQueueNbCapacity) {
           break;
         }
+
         try {
           entry = this.repLogReader.readNextAndSetPosition();
         } catch (IOException ie) {
@@ -995,7 +1095,8 @@ public class ReplicationSource extends Thread
      * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
      * written to when this method was called
      */
-    protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
+    protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries,
+        Map<String, Long> lastPositionsForSerialScope) {
       int sleepMultiplier = 0;
       if (entries.isEmpty()) {
         LOG.warn("Was given 0 edits to ship");
@@ -1046,6 +1147,16 @@ public class ReplicationSource extends Thread
             for (int i = 0; i < size; i++) {
               cleanUpHFileRefs(entries.get(i).getEdit());
             }
+
+            // Save positions to meta table before zk.
+            try {
+              MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
+                  lastPositionsForSerialScope);
+            } catch (IOException e) {
+              LOG.error("updateReplicationPositions fail", e);
+              stopper.stop("updateReplicationPositions fail");
+            }
+
             //Log and clean up WAL logs
             manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
               this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),

http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 3cb7a84..a6f1891 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -48,10 +49,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@@ -64,6 +68,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 
 /**
@@ -118,6 +123,8 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final Random rand;
   private final boolean replicationForBulkLoadDataEnabled;
 
+  private Connection connection;
+  private long replicationWaitTime;
 
   /**
    * Creates a replication manager and sets the watch on all the other registered region servers
@@ -134,7 +141,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   public ReplicationSourceManager(final ReplicationQueues replicationQueues,
       final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
       final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
-      final Path oldLogDir, final UUID clusterId) {
+      final Path oldLogDir, final UUID clusterId) throws IOException {
     //CopyOnWriteArrayList is thread-safe.
     //Generally, reading is more than modifying.
     this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
@@ -171,6 +178,9 @@ public class ReplicationSourceManager implements ReplicationListener {
     replicationForBulkLoadDataEnabled =
         conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
           HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+    this.replicationWaitTime = conf.getLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY,
+          HConstants.REPLICATION_SERIALLY_WAITING_DEFAULT);
+    connection = ConnectionFactory.createConnection(conf);
   }
 
   /**
@@ -782,6 +792,10 @@ public class ReplicationSourceManager implements ReplicationListener {
     return this.fs;
   }
 
+  public Connection getConnection() {
+    return this.connection; // the connection the constructor creates through ConnectionFactory
+  }
+
   /**
    * Get the ReplicationPeers used by this ReplicationSourceManager
    * @return the ReplicationPeers used by this ReplicationSourceManager
@@ -814,4 +828,75 @@ public class ReplicationSourceManager implements ReplicationListener {
   public void cleanUpHFileRefs(String peerId, List<String> files) {
     this.replicationQueues.removeHFileRefs(peerId, files);
   }
+
+  /**
+   * Blocks until an entry can be pushed to the peer.
+   * If we enable serial replication, we can not push the entry until all entries in its region
+   * whose sequence numbers are smaller than this entry have been pushed.
+   * For each ReplicationSource, we need only check the first entry in each region, as long as it
+   * can be pushed, we can push all in this ReplicationSource.
+   * @param encodedName encoded name of the region the entry belongs to
+   * @param seq sequence id of the entry
+   * @param peerId id of the peer the entry is going to be pushed to
+   * @throws IOException if reading the barriers or the pushed position fails
+   * @throws InterruptedException if interrupted while sleeping between two checks
+   */
+  void waitUntilCanBePushed(byte[] encodedName, long seq, String peerId)
+      throws IOException, InterruptedException {
+    /*
+     * There are barriers for this region and position for this peer. N barriers form N intervals,
+     * (b1,b2) (b2,b3) ... (bn,max). Generally, there are no logs whose seq id is not greater than
+     * the first barrier, and the last interval starts from the last barrier.
+     *
+     * There are several conditions that we can push now, otherwise we should block:
+     * 1) "Serial replication" is not enabled, we can push all logs just like before. This case
+     *    should not call this method.
+     * 2) There are no barriers for this region, or the seq id is smaller than the first barrier.
+     *    It is mainly because we alter REPLICATION_SCOPE = 2. We can not guarantee the
+     *    order of logs that is written before altering.
+     * 3) This entry is in the first interval of barriers. We can push them because it is the
+     *    start of a region. Splitting/merging regions are also ok because the first section of
+     *    daughter region is in same region of parents and the order in one RS is guaranteed.
+     * 4) The entry's seq id and the position are in the same section, or the pos is the last
+     *    number of the previous section, because when opening a region we put a barrier whose
+     *    number is the last log's id + 1.
+     * 5) The log's seq is not greater than the pos in meta, so we are retrying. It may happen
+     *    when a RS crashes after saving the position to meta and before saving the zk offset.
+     */
+    List<Long> barriers = MetaTableAccessor.getReplicationBarriers(connection, encodedName);
+    if (barriers.isEmpty() || seq <= barriers.get(0)) {
+      // Case 2
+      return;
+    }
+    int interval = Collections.binarySearch(barriers, seq);
+    if (interval < 0) {
+      interval = -interval - 1;// get the insert position if negative
+    }
+    if (interval == 1) {
+      // Case 3
+      return;
+    }
+    while (true) {
+      long pos = MetaTableAccessor.getReplicationPositionForOnePeer(connection, encodedName, peerId);
+      if (seq <= pos) {
+        // Case 5: already covered by the saved position, so this is a retry and must not block.
+        return;
+      }
+      if (pos >= 0) {
+        // Case 4
+        int posInterval = Collections.binarySearch(barriers, pos);
+        if (posInterval < 0) {
+          posInterval = -posInterval - 1;// get the insert position if negative
+        }
+        if (posInterval == interval || pos == barriers.get(interval - 1) - 1) {
+          return;
+        }
+      }
+      LOG.info(Bytes.toString(encodedName) + " can not start pushing to peer " + peerId
+          + " because previous log has not been pushed: sequence=" + seq + " pos=" + pos
+          + " barriers=" + Arrays.toString(barriers.toArray()));
+      Thread.sleep(replicationWaitTime);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
index 1c59a44..81dadd9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
@@ -144,6 +144,30 @@ public class FSTableDescriptors implements TableDescriptors {
                     // Enable cache of data blocks in L1 if more than one caching tier deployed:
                     // e.g. if using CombinedBlockCache (BucketCache).
                 .setCacheDataInL1(true),
+            new HColumnDescriptor(HConstants.REPLICATION_BARRIER_FAMILY)
+                .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
+                    HConstants.DEFAULT_HBASE_META_VERSIONS))
+                .setInMemory(true)
+                .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
+                    HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
+                .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
+                // Disable blooms for meta.  Needs work.  Seems to mess w/ getClosestOrBefore.
+                .setBloomFilterType(BloomType.NONE)
+                // Enable cache of data blocks in L1 if more than one caching tier deployed:
+                // e.g. if using CombinedBlockCache (BucketCache).
+                .setCacheDataInL1(true),
+            new HColumnDescriptor(HConstants.REPLICATION_POSITION_FAMILY)
+                .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
+                    HConstants.DEFAULT_HBASE_META_VERSIONS))
+                .setInMemory(true)
+                .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
+                    HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
+                .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
+                // Disable blooms for meta.  Needs work.  Seems to mess w/ getClosestOrBefore.
+                .setBloomFilterType(BloomType.NONE)
+                // Enable cache of data blocks in L1 if more than one caching tier deployed:
+                // e.g. if using CombinedBlockCache (BucketCache).
+                .setCacheDataInL1(true),
             new HColumnDescriptor(HConstants.TABLE_FAMILY)
                 // Ten is arbitrary number.  Keep versions to help debugging.
                 .setMaxVersions(10)

http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index af63b0b..79321b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -22,8 +22,11 @@ package org.apache.hadoop.hbase.wal;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -35,6 +38,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides
@@ -282,6 +286,18 @@ public interface WAL {
       key.setCompressionContext(compressionContext);
     }
 
+    /** Tells whether any replication scope carried by this entry's key is the serial scope. */
+    public boolean hasSerialReplicationScope () {
+      Map<byte[], Integer> scopes = getKey().getReplicationScopes();
+      boolean serial = false;
+      if (scopes != null) {
+        for (Integer scope : scopes.values()) {
+          serial = serial || scope == HConstants.REPLICATION_SCOPE_SERIAL;
+        }
+      }
+      return serial;
+    }
+
     @Override
     public String toString() {
       return this.key + "=" + this.edit;

http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
index 8b84452..d750faf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
@@ -452,7 +452,7 @@ public class TestMetaTableAccessor {
       List<HRegionInfo> regionInfos = Lists.newArrayList(parent);
       MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
 
-      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
+      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3, false);
 
       assertEmptyMetaLocation(meta, splitA.getRegionName(), 1);
       assertEmptyMetaLocation(meta, splitA.getRegionName(), 2);
@@ -481,7 +481,7 @@ public class TestMetaTableAccessor {
       MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
 
       MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3,
-          HConstants.LATEST_TIMESTAMP);
+          HConstants.LATEST_TIMESTAMP, false);
 
       assertEmptyMetaLocation(meta, merged.getRegionName(), 1);
       assertEmptyMetaLocation(meta, merged.getRegionName(), 2);
@@ -609,7 +609,7 @@ public class TestMetaTableAccessor {
 
       // now merge the regions, effectively deleting the rows for region a and b.
       MetaTableAccessor.mergeRegions(connection, mergedRegionInfo,
-        regionInfoA, regionInfoB, sn, 1, masterSystemTime);
+        regionInfoA, regionInfoB, sn, 1, masterSystemTime, false);
 
       result = meta.get(get);
       serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
@@ -692,7 +692,7 @@ public class TestMetaTableAccessor {
       }
       SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler();
       long prevCalls = scheduler.numPriorityCalls;
-      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1);
+      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB,loc.getServerName(),1,false);
 
       assertTrue(prevCalls < scheduler.numPriorityCalls);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
index 7d3d2e9..c15ccf4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
@@ -1211,7 +1211,7 @@ public class TestAssignmentManagerOnCluster {
   public void testUpdatesRemoteMeta() throws Exception {
     conf.setInt("hbase.regionstatestore.meta.connection", 3);
     final RegionStateStore rss =
-        new RegionStateStore(new MyRegionServer(conf, new ZkCoordinatedStateManager()));
+        new RegionStateStore(new MyMaster(conf, new ZkCoordinatedStateManager()));
     rss.start();
     // Create 10 threads and make each do 10 puts related to region state update
     Thread[] th = new Thread[10];