You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/02/23 06:54:29 UTC

[1/3] hbase git commit: Revert "HBASE-17010 Serial replication should handle daughter regions being assigned to another RS (Phil Yang)"

Repository: hbase
Updated Branches:
  refs/heads/branch-1 4d9589b16 -> ba7a936f7


Revert "HBASE-17010 Serial replication should handle daughter regions being assigned to another RS (Phil Yang)"

This reverts commit 97276da9a7cefc0472c487b60e4b6a03dc81610a.

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0a284d2b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0a284d2b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0a284d2b

Branch: refs/heads/branch-1
Commit: 0a284d2b76ceb775ff1f916a45633e74a261c5b8
Parents: 4d9589b
Author: Sean Busbey <bu...@apache.org>
Authored: Tue Nov 7 17:02:13 2017 -0600
Committer: zhangduo <zh...@apache.org>
Committed: Fri Feb 23 14:42:04 2018 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/HTableDescriptor.java   | 12 ----
 .../apache/hadoop/hbase/MetaTableAccessor.java  | 58 ++++++++++----------
 .../org/apache/hadoop/hbase/HConstants.java     | 11 +---
 .../master/cleaner/ReplicationMetaCleaner.java  |  1 -
 4 files changed, 32 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/0a284d2b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index fbb9376..1fd950a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -1797,18 +1797,6 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
               // Enable cache of data blocks in L1 if more than one caching tier deployed:
               // e.g. if using CombinedBlockCache (BucketCache).
               .setCacheDataInL1(true),
-          new HColumnDescriptor(HConstants.REPLICATION_META_FAMILY)
-              .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
-                  HConstants.DEFAULT_HBASE_META_VERSIONS))
-              .setInMemory(true)
-              .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
-                  HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
-              .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
-              // Disable blooms for meta.  Needs work.  Seems to mess w/ getClosestOrBefore.
-              .setBloomFilterType(BloomType.NONE)
-              // Enable cache of data blocks in L1 if more than one caching tier deployed:
-              // e.g. if using CombinedBlockCache (BucketCache).
-              .setCacheDataInL1(true),
       });
     metaDescriptor.addCoprocessor(
       "org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint",

http://git-wip-us.apache.org/repos/asf/hbase/blob/0a284d2b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 2bc98be..c7e3757 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -110,14 +110,14 @@ public class MetaTableAccessor {
    * The actual layout of meta should be encapsulated inside MetaTableAccessor methods,
    * and should not leak out of it (through Result objects, etc)
    *
-   * For replication serially, there are three column families "rep_barrier", "rep_position" and
-   * "rep_meta" whose row key is encodedRegionName.
+   * For replication serially, there are two column families "rep_barrier", "rep_position" whose
+   * row key is encodedRegionName.
    * rep_barrier:{seqid}      => in each time a RS opens a region, it saves the open sequence
    *                                  id in this region
    * rep_position:{peerid}    => to save the max sequence id we have pushed for each peer
-   * rep_meta:_TABLENAME_     => a special cell to save this region's table name, will used when
+   * rep_position:_TABLENAME_ => a special cell to save this region's table name, will used when
    *                             we clean old data
-   * rep_meta:_DAUGHTER_      => a special cell to present this region is split or merged, in this
+   * rep_position:_DAUGHTER_  => a special cell to present this region is split or merged, in this
    *                             cell the value is merged encoded name or two split encoded names
    *                             separated by ","
    */
@@ -125,10 +125,10 @@ public class MetaTableAccessor {
   private static final Log LOG = LogFactory.getLog(MetaTableAccessor.class);
 
   // Save its daughter region(s) when split/merge
-  private static final byte[] daughterNameCq = Bytes.toBytes("_DAUGHTER_");
-
+  private static final byte[] daughterNamePosCq = Bytes.toBytes("_DAUGHTER_");
   // Save its table name because we only know region's encoded name
-  private static final byte[] tableNameCq = Bytes.toBytes("_TABLENAME_");
+  private static final String tableNamePeer = "_TABLENAME_";
+  private static final byte[] tableNamePosCq = Bytes.toBytes(tableNamePeer);
 
   static final byte [] META_REGION_PREFIX;
   static {
@@ -985,13 +985,13 @@ public class MetaTableAccessor {
     byte[] seqBytes = Bytes.toBytes(seq);
     return new Put(encodedRegionName)
         .addImmutable(HConstants.REPLICATION_BARRIER_FAMILY, seqBytes, seqBytes)
-        .addImmutable(HConstants.REPLICATION_META_FAMILY, tableNameCq, tableName);
+        .addImmutable(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq, tableName);
   }
 
 
-  public static Put makeDaughterPut(byte[] encodedRegionName, byte[] value) {
-    return new Put(encodedRegionName).addImmutable(HConstants.REPLICATION_META_FAMILY,
-        daughterNameCq, value);
+  public static Put makeSerialDaughterPut(byte[] encodedRegionName, byte[] value) {
+    return new Put(encodedRegionName).addImmutable(HConstants.REPLICATION_POSITION_FAMILY,
+        daughterNamePosCq, value);
   }
 
   /**
@@ -1016,7 +1016,7 @@ public class MetaTableAccessor {
    * @param puts Put to add to hbase:meta
    * @throws IOException
    */
-  public static void putToMetaTable(final Connection connection, final Put... puts) throws IOException {
+  static void putToMetaTable(final Connection connection, final Put... puts) throws IOException {
     put(getMetaHTable(connection), Arrays.asList(puts));
   }
 
@@ -1297,10 +1297,10 @@ public class MetaTableAccessor {
         + HConstants.DELIMITER);
       Mutation[] mutations;
       if (saveBarrier) {
-        Put putBarrierA = makeDaughterPut(regionA.getEncodedNameAsBytes(),
-            mergedRegion.getEncodedNameAsBytes());
-        Put putBarrierB = makeDaughterPut(regionB.getEncodedNameAsBytes(),
-            mergedRegion.getEncodedNameAsBytes());
+        Put putBarrierA = makeSerialDaughterPut(regionA.getEncodedNameAsBytes(),
+            Bytes.toBytes(mergedRegion.getEncodedName()));
+        Put putBarrierB = makeSerialDaughterPut(regionB.getEncodedNameAsBytes(),
+            Bytes.toBytes(mergedRegion.getEncodedName()));
         mutations = new Mutation[] { putOfMerged, deleteA, deleteB, putBarrierA, putBarrierB };
       } else {
         mutations = new Mutation[] { putOfMerged, deleteA, deleteB };
@@ -1352,9 +1352,10 @@ public class MetaTableAccessor {
 
       Mutation[] mutations;
       if (saveBarrier) {
-        Put parentPut = makeDaughterPut(parent.getEncodedNameAsBytes(),
-            Bytes.toBytes(splitA.getEncodedName() + "," + splitB.getEncodedName()));
-        mutations = new Mutation[]{putParent, putA, putB, parentPut };
+        Put putBarrier = makeSerialDaughterPut(parent.getEncodedNameAsBytes(),
+            Bytes
+                .toBytes(splitA.getEncodedName() + HConstants.DELIMITER + splitB.getEncodedName()));
+        mutations = new Mutation[]{putParent, putA, putB, putBarrier};
       } else {
         mutations = new Mutation[]{putParent, putA, putB};
       }
@@ -1651,9 +1652,14 @@ public class MetaTableAccessor {
     Result r = get(getMetaHTable(connection), get);
     Map<String, Long> map = new HashMap<>((int) (r.size() / 0.75 + 1));
     for (Cell c : r.listCells()) {
-      map.put(
-          Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()),
-          Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
+      if (!Bytes.equals(tableNamePosCq, 0, tableNamePosCq.length, c.getQualifierArray(),
+          c.getQualifierOffset(), c.getQualifierLength()) &&
+          !Bytes.equals(daughterNamePosCq, 0, daughterNamePosCq.length, c.getQualifierArray(),
+              c.getQualifierOffset(), c.getQualifierLength())) {
+        map.put(
+            Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()),
+            Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
+      }
     }
     return map;
   }
@@ -1700,14 +1706,12 @@ public class MetaTableAccessor {
 
   /**
    * Get daughter region(s) for a region, only used in serial replication.
-   * @param connection connection we're using
-   * @param encodedName region's encoded name
    * @throws IOException
    */
   public static String getSerialReplicationDaughterRegion(Connection connection, byte[] encodedName)
       throws IOException {
     Get get = new Get(encodedName);
-    get.addColumn(HConstants.REPLICATION_META_FAMILY, daughterNameCq);
+    get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, daughterNamePosCq);
     Result result = get(getMetaHTable(connection), get);
     if (!result.isEmpty()) {
       Cell c = result.rawCells()[0];
@@ -1718,14 +1722,12 @@ public class MetaTableAccessor {
 
   /**
    * Get the table name for a region, only used in serial replication.
-   * @param connection connection we're using
-   * @param encodedName region's encoded name
    * @throws IOException
    */
   public static String getSerialReplicationTableName(Connection connection, byte[] encodedName)
       throws IOException {
     Get get = new Get(encodedName);
-    get.addColumn(HConstants.REPLICATION_META_FAMILY, tableNameCq);
+    get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq);
     Result result = get(getMetaHTable(connection), get);
     if (!result.isEmpty()) {
       Cell c = result.rawCells()[0];

http://git-wip-us.apache.org/repos/asf/hbase/blob/0a284d2b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 63dade1..dc44c77 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -456,20 +456,13 @@ public final class HConstants {
   public static final byte [] REPLICATION_BARRIER_FAMILY =
       Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR);
 
-  /** The replication position family as a string*/
+  /** The replication barrier family as a string*/
   public static final String REPLICATION_POSITION_FAMILY_STR = "rep_position";
 
-  /** The replication position family */
+  /** The replication barrier family */
   public static final byte [] REPLICATION_POSITION_FAMILY =
       Bytes.toBytes(REPLICATION_POSITION_FAMILY_STR);
 
-  /** The replication meta family as a string*/
-  public static final String REPLICATION_META_FAMILY_STR = "rep_meta";
-
-  /** The replication meta family */
-  public static final byte [] REPLICATION_META_FAMILY =
-      Bytes.toBytes(REPLICATION_META_FAMILY_STR);
-
   /** The RegionInfo qualifier as a string */
   public static final String REGIONINFO_QUALIFIER_STR = "regioninfo";
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/0a284d2b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
index 3e3abf3..41864b9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
@@ -141,7 +141,6 @@ public class ReplicationMetaCleaner extends ScheduledChore {
           Delete delete = new Delete(encodedBytes);
           delete.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
           delete.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
-          delete.addFamily(HConstants.REPLICATION_META_FAMILY);
           try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
             metaTable.delete(delete);
           }


[3/3] hbase git commit: Revert "HBASE-9465 Push entries to peer clusters serially"

Posted by zh...@apache.org.
Revert "HBASE-9465 Push entries to peer clusters serially"

This reverts commit 441bc050b991c14c048617bc443b97f46e21b76f.

 Conflicts:
	hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
	hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
	hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ba7a936f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ba7a936f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ba7a936f

Branch: refs/heads/branch-1
Commit: ba7a936f74985eb9d974fdc87b0d06cb8cd8473d
Parents: 0a284d2
Author: Sean Busbey <bu...@apache.org>
Authored: Tue Nov 7 23:50:35 2017 -0600
Committer: zhangduo <zh...@apache.org>
Committed: Fri Feb 23 14:42:15 2018 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/HTableDescriptor.java   |  46 +--
 .../apache/hadoop/hbase/MetaTableAccessor.java  | 243 ++---------
 .../client/replication/ReplicationAdmin.java    |  14 +-
 .../org/apache/hadoop/hbase/HConstants.java     |  26 --
 .../src/main/resources/hbase-default.xml        |  14 -
 .../hbase/protobuf/generated/WALProtos.java     |  16 +-
 hbase-protocol/src/main/protobuf/WAL.proto      |   1 -
 .../org/apache/hadoop/hbase/master/HMaster.java |   9 -
 .../hadoop/hbase/master/RegionStateStore.java   |  43 +-
 .../master/cleaner/ReplicationMetaCleaner.java  | 187 ---------
 .../RegionMergeTransactionImpl.java             |   3 +-
 .../hbase/regionserver/ReplicationService.java  |   1 -
 .../regionserver/SplitTransactionImpl.java      |   2 +-
 .../replication/regionserver/Replication.java   |  14 +-
 .../regionserver/ReplicationSource.java         |  68 +---
 .../regionserver/ReplicationSourceManager.java  |  87 +---
 .../ReplicationSourceWALReaderThread.java       |  31 --
 .../java/org/apache/hadoop/hbase/wal/WAL.java   |  13 -
 .../hadoop/hbase/TestMetaTableAccessor.java     |  10 +-
 .../hadoop/hbase/client/TestMetaScanner.java    |   2 +-
 .../master/TestAssignmentManagerOnCluster.java  |   2 +-
 .../replication/TestSerialReplication.java      | 401 -------------------
 .../regionserver/TestGlobalThrottler.java       |   2 +-
 23 files changed, 69 insertions(+), 1166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index 1fd950a..7f48976 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -34,12 +34,13 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.regex.Matcher;
 
+import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -51,7 +52,6 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.WritableComparable;
@@ -1217,18 +1217,6 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
   }
 
   /**
-   * Return true if there are at least one cf whose replication scope is serial.
-   */
-  public boolean hasSerialReplicationScope() {
-    for (HColumnDescriptor column: getFamilies()){
-      if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL){
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
    * Returns the configured replicas per region
    */
   public int getRegionReplication() {
@@ -1772,32 +1760,8 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
           .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
           // Disable blooms for meta.  Needs work.  Seems to mess w/ getClosestOrBefore.
           .setBloomFilterType(BloomType.NONE)
-            .setCacheDataInL1(true),
-          new HColumnDescriptor(HConstants.REPLICATION_BARRIER_FAMILY)
-              .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
-                  HConstants.DEFAULT_HBASE_META_VERSIONS))
-              .setInMemory(true)
-              .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
-                  HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
-              .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
-              // Disable blooms for meta.  Needs work.  Seems to mess w/ getClosestOrBefore.
-              .setBloomFilterType(BloomType.NONE)
-              // Enable cache of data blocks in L1 if more than one caching tier deployed:
-              // e.g. if using CombinedBlockCache (BucketCache).
-              .setCacheDataInL1(true),
-          new HColumnDescriptor(HConstants.REPLICATION_POSITION_FAMILY)
-              .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
-                  HConstants.DEFAULT_HBASE_META_VERSIONS))
-              .setInMemory(true)
-              .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
-                  HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
-              .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
-              // Disable blooms for meta.  Needs work.  Seems to mess w/ getClosestOrBefore.
-              .setBloomFilterType(BloomType.NONE)
-              // Enable cache of data blocks in L1 if more than one caching tier deployed:
-              // e.g. if using CombinedBlockCache (BucketCache).
-              .setCacheDataInL1(true),
-      });
+          .setCacheDataInL1(true)
+         });
     metaDescriptor.addCoprocessor(
       "org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint",
       null, Coprocessor.PRIORITY_SYSTEM, null);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index c7e3757..3f11558 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -20,20 +20,6 @@ package org.apache.hadoop.hbase;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ServiceException;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -63,6 +49,18 @@ import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 /**
  * Read/write operations on region and assignment information store in
  * <code>hbase:meta</code>.
@@ -109,27 +107,10 @@ public class MetaTableAccessor {
    *
    * The actual layout of meta should be encapsulated inside MetaTableAccessor methods,
    * and should not leak out of it (through Result objects, etc)
-   *
-   * For replication serially, there are two column families "rep_barrier", "rep_position" whose
-   * row key is encodedRegionName.
-   * rep_barrier:{seqid}      => in each time a RS opens a region, it saves the open sequence
-   *                                  id in this region
-   * rep_position:{peerid}    => to save the max sequence id we have pushed for each peer
-   * rep_position:_TABLENAME_ => a special cell to save this region's table name, will used when
-   *                             we clean old data
-   * rep_position:_DAUGHTER_  => a special cell to present this region is split or merged, in this
-   *                             cell the value is merged encoded name or two split encoded names
-   *                             separated by ","
    */
 
   private static final Log LOG = LogFactory.getLog(MetaTableAccessor.class);
 
-  // Save its daughter region(s) when split/merge
-  private static final byte[] daughterNamePosCq = Bytes.toBytes("_DAUGHTER_");
-  // Save its table name because we only know region's encoded name
-  private static final String tableNamePeer = "_TABLENAME_";
-  private static final byte[] tableNamePosCq = Bytes.toBytes(tableNamePeer);
-
   static final byte [] META_REGION_PREFIX;
   static {
     // Copy the prefix from FIRST_META_REGIONINFO into META_REGION_PREFIX.
@@ -981,19 +962,6 @@ public class MetaTableAccessor {
     return delete;
   }
 
-  public static Put makeBarrierPut(byte[] encodedRegionName, long seq, byte[] tableName) {
-    byte[] seqBytes = Bytes.toBytes(seq);
-    return new Put(encodedRegionName)
-        .addImmutable(HConstants.REPLICATION_BARRIER_FAMILY, seqBytes, seqBytes)
-        .addImmutable(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq, tableName);
-  }
-
-
-  public static Put makeSerialDaughterPut(byte[] encodedRegionName, byte[] value) {
-    return new Put(encodedRegionName).addImmutable(HConstants.REPLICATION_POSITION_FAMILY,
-        daughterNamePosCq, value);
-  }
-
   /**
    * Adds split daughters to the Put
    */
@@ -1010,24 +978,24 @@ public class MetaTableAccessor {
   }
 
   /**
-   * Put the passed <code>puts</code> to the <code>hbase:meta</code> table.
-   * Non-atomic for multi puts.
+   * Put the passed <code>p</code> to the <code>hbase:meta</code> table.
    * @param connection connection we're using
-   * @param puts Put to add to hbase:meta
+   * @param p Put to add to hbase:meta
    * @throws IOException
    */
-  static void putToMetaTable(final Connection connection, final Put... puts) throws IOException {
-    put(getMetaHTable(connection), Arrays.asList(puts));
+  static void putToMetaTable(final Connection connection, final Put p)
+    throws IOException {
+    put(getMetaHTable(connection), p);
   }
 
   /**
    * @param t Table to use (will be closed when done).
-   * @param puts puts to make
+   * @param p put to make
    * @throws IOException
    */
-  private static void put(final Table t, final List<Put> puts) throws IOException {
+  private static void put(final Table t, final Put p) throws IOException {
     try {
-      t.put(puts);
+      t.put(p);
     } finally {
       t.close();
     }
@@ -1153,7 +1121,7 @@ public class MetaTableAccessor {
    * Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
    * does not add its daughter's as different rows, but adds information about the daughters
    * in the same row as the parent. Use
-   * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName,int,boolean)}
+   * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName, int)}
    * if you want to do that.
    * @param meta the Table for META
    * @param regionInfo region information
@@ -1175,7 +1143,7 @@ public class MetaTableAccessor {
    * Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
    * does not add its daughter's as different rows, but adds information about the daughters
    * in the same row as the parent. Use
-   * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName,int,boolean)}
+   * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName, int)}
    * if you want to do that.
    * @param connection connection we're using
    * @param regionInfo region information
@@ -1264,7 +1232,7 @@ public class MetaTableAccessor {
    */
   public static void mergeRegions(final Connection connection, HRegionInfo mergedRegion,
       HRegionInfo regionA, HRegionInfo regionB, ServerName sn, int regionReplication,
-      long masterSystemTime, boolean saveBarrier)
+      long masterSystemTime)
           throws IOException {
     Table meta = getMetaHTable(connection);
     try {
@@ -1295,17 +1263,7 @@ public class MetaTableAccessor {
 
       byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString()
         + HConstants.DELIMITER);
-      Mutation[] mutations;
-      if (saveBarrier) {
-        Put putBarrierA = makeSerialDaughterPut(regionA.getEncodedNameAsBytes(),
-            Bytes.toBytes(mergedRegion.getEncodedName()));
-        Put putBarrierB = makeSerialDaughterPut(regionB.getEncodedNameAsBytes(),
-            Bytes.toBytes(mergedRegion.getEncodedName()));
-        mutations = new Mutation[] { putOfMerged, deleteA, deleteB, putBarrierA, putBarrierB };
-      } else {
-        mutations = new Mutation[] { putOfMerged, deleteA, deleteB };
-      }
-      multiMutate(meta, tableRow, mutations);
+      multiMutate(meta, tableRow, putOfMerged, deleteA, deleteB);
     } finally {
       meta.close();
     }
@@ -1321,11 +1279,10 @@ public class MetaTableAccessor {
    * @param splitA Split daughter region A
    * @param splitB Split daughter region A
    * @param sn the location of the region
-   * @param saveBarrier true if need save replication barrier in meta, used for serial replication
    */
-  public static void splitRegion(final Connection connection, HRegionInfo parent,
-      HRegionInfo splitA, HRegionInfo splitB, ServerName sn, int regionReplication,
-      boolean saveBarrier) throws IOException {
+  public static void splitRegion(final Connection connection,
+                                 HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
+                                 ServerName sn, int regionReplication) throws IOException {
     Table meta = getMetaHTable(connection);
     try {
       HRegionInfo copyOfParent = new HRegionInfo(parent);
@@ -1350,17 +1307,8 @@ public class MetaTableAccessor {
         addEmptyLocation(putB, i);
       }
 
-      Mutation[] mutations;
-      if (saveBarrier) {
-        Put putBarrier = makeSerialDaughterPut(parent.getEncodedNameAsBytes(),
-            Bytes
-                .toBytes(splitA.getEncodedName() + HConstants.DELIMITER + splitB.getEncodedName()));
-        mutations = new Mutation[]{putParent, putA, putB, putBarrier};
-      } else {
-        mutations = new Mutation[]{putParent, putA, putB};
-      }
       byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER);
-      multiMutate(meta, tableRow, mutations);
+      multiMutate(meta, tableRow, putParent, putA, putB);
     } finally {
       meta.close();
     }
@@ -1418,27 +1366,6 @@ public class MetaTableAccessor {
   }
 
   /**
-   * Updates the progress of pushing entries to peer cluster. Skip entry if value is -1.
-   * @param connection connection we're using
-   * @param peerId the peerId to push
-   * @param positions map that saving positions for each region
-   * @throws IOException
-   */
-  public static void updateReplicationPositions(Connection connection, String peerId,
-      Map<String, Long> positions) throws IOException {
-    List<Put> puts = new ArrayList<>();
-    for (Map.Entry<String, Long> entry : positions.entrySet()) {
-      long value = Math.abs(entry.getValue());
-      Put put = new Put(Bytes.toBytes(entry.getKey()));
-      put.addImmutable(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId),
-          Bytes.toBytes(value));
-      puts.add(put);
-    }
-    getMetaHTable(connection).put(puts);
-  }
-
-
-  /**
    * Updates the location of the specified region to be the specified server.
    * <p>
    * Connects to the specified server which should be hosting the specified
@@ -1623,120 +1550,6 @@ public class MetaTableAccessor {
   }
 
   /**
-   * Get replication position for a peer in a region.
-   * @param connection connection we're using
-   * @return the position of this peer, -1 if no position in meta.
-   */
-  public static long getReplicationPositionForOnePeer(Connection connection,
-      byte[] encodedRegionName, String peerId) throws IOException {
-    Get get = new Get(encodedRegionName);
-    get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId));
-    Result r = get(getMetaHTable(connection), get);
-    if (r.isEmpty()) {
-      return -1;
-    }
-    Cell cell = r.rawCells()[0];
-    return Bytes.toLong(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength());
-  }
-
-  /**
-   * Get replication positions for all peers in a region.
-   * @param connection connection we're using
-   * @param encodedRegionName region's encoded name
-   * @return the map of positions for each peer
-   */
-  public static Map<String, Long> getReplicationPositionForAllPeer(Connection connection,
-      byte[] encodedRegionName) throws IOException {
-    Get get = new Get(encodedRegionName);
-    get.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
-    Result r = get(getMetaHTable(connection), get);
-    Map<String, Long> map = new HashMap<>((int) (r.size() / 0.75 + 1));
-    for (Cell c : r.listCells()) {
-      if (!Bytes.equals(tableNamePosCq, 0, tableNamePosCq.length, c.getQualifierArray(),
-          c.getQualifierOffset(), c.getQualifierLength()) &&
-          !Bytes.equals(daughterNamePosCq, 0, daughterNamePosCq.length, c.getQualifierArray(),
-              c.getQualifierOffset(), c.getQualifierLength())) {
-        map.put(
-            Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()),
-            Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
-      }
-    }
-    return map;
-  }
-
-  /**
-   * Get all barriers in all regions.
-   * @return a map of barrier lists in all regions
-   * @throws IOException
-   */
-  public static List<Long> getReplicationBarriers(Connection connection, byte[] encodedRegionName)
-      throws IOException {
-    Get get = new Get(encodedRegionName);
-    get.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
-    Result r = get(getMetaHTable(connection), get);
-    List<Long> list = new ArrayList<>();
-    if (!r.isEmpty()) {
-      for (Cell cell : r.rawCells()) {
-        list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(),
-            cell.getQualifierLength()));
-      }
-    }
-    return list;
-  }
-
-  public static Map<String, List<Long>> getAllBarriers(Connection connection) throws IOException {
-    Map<String, List<Long>> map = new HashMap<>();
-    Scan scan = new Scan();
-    scan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
-    try (Table t = getMetaHTable(connection);
-        ResultScanner scanner = t.getScanner(scan)) {
-      Result result;
-      while ((result = scanner.next()) != null) {
-        String key = Bytes.toString(result.getRow());
-        List<Long> list = new ArrayList<>();
-        for (Cell cell : result.rawCells()) {
-          list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(),
-              cell.getQualifierLength()));
-        }
-        map.put(key, list);
-      }
-    }
-    return map;
-  }
-
-  /**
-   * Get daughter region(s) for a region, only used in serial replication.
-   * @throws IOException
-   */
-  public static String getSerialReplicationDaughterRegion(Connection connection, byte[] encodedName)
-      throws IOException {
-    Get get = new Get(encodedName);
-    get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, daughterNamePosCq);
-    Result result = get(getMetaHTable(connection), get);
-    if (!result.isEmpty()) {
-      Cell c = result.rawCells()[0];
-      return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength());
-    }
-    return null;
-  }
-
-  /**
-   * Get the table name for a region, only used in serial replication.
-   * @throws IOException
-   */
-  public static String getSerialReplicationTableName(Connection connection, byte[] encodedName)
-      throws IOException {
-    Get get = new Get(encodedName);
-    get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq);
-    Result result = get(getMetaHTable(connection), get);
-    if (!result.isEmpty()) {
-      Cell c = result.rawCells()[0];
-      return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength());
-    }
-    return null;
-  }
-
-  /**
    * Checks whether hbase:meta contains any info:server entry.
    * @param connection connection we're using
    * @return true if hbase:meta contains any info:server entry, false if not

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 7bef9ed..55653d5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -91,10 +91,8 @@ public class ReplicationAdmin implements Closeable {
   // only Global for now, can add other type
   // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
   public static final String REPLICATIONTYPE = "replicationType";
-  public static final String REPLICATIONGLOBAL =
-      Integer.toString(HConstants.REPLICATION_SCOPE_GLOBAL);
-  public static final String REPLICATIONSERIAL =
-      Integer.toString(HConstants.REPLICATION_SCOPE_SERIAL);
+  public static final String REPLICATIONGLOBAL = Integer
+      .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
 
   private final Connection connection;
   // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
@@ -488,10 +486,7 @@ public class ReplicationAdmin implements Closeable {
           HashMap<String, String> replicationEntry = new HashMap<String, String>();
           replicationEntry.put(TNAME, tableName);
           replicationEntry.put(CFNAME, column.getNameAsString());
-          replicationEntry.put(REPLICATIONTYPE,
-              column.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL ?
-                  REPLICATIONGLOBAL :
-                  REPLICATIONSERIAL);
+          replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
           replicationColFams.add(replicationEntry);
         }
       }
@@ -703,8 +698,7 @@ public class ReplicationAdmin implements Closeable {
     boolean hasDisabled = false;
 
     for (HColumnDescriptor hcd : htd.getFamilies()) {
-      if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
-          && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
+      if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
         hasDisabled = true;
       } else {
         hasEnabled = true;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index dc44c77..e702236 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -449,20 +449,6 @@ public final class HConstants {
   /** The catalog family */
   public static final byte [] CATALOG_FAMILY = Bytes.toBytes(CATALOG_FAMILY_STR);
 
-  /** The replication barrier family as a string*/
-  public static final String REPLICATION_BARRIER_FAMILY_STR = "rep_barrier";
-
-  /** The replication barrier family */
-  public static final byte [] REPLICATION_BARRIER_FAMILY =
-      Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR);
-
-  /** The replication barrier family as a string*/
-  public static final String REPLICATION_POSITION_FAMILY_STR = "rep_position";
-
-  /** The replication barrier family */
-  public static final byte [] REPLICATION_POSITION_FAMILY =
-      Bytes.toBytes(REPLICATION_POSITION_FAMILY_STR);
-
   /** The RegionInfo qualifier as a string */
   public static final String REGIONINFO_QUALIFIER_STR = "regioninfo";
 
@@ -658,12 +644,6 @@ public final class HConstants {
   public static final int REPLICATION_SCOPE_GLOBAL = 1;
 
   /**
-   * Scope tag for serially scoped data
-   * This data will be replicated to all peers by the order of sequence id.
-   */
-  public static final int REPLICATION_SCOPE_SERIAL = 2;
-
-  /**
    * Default cluster ID, cannot be used to identify a cluster so a key with
    * this value means it wasn't meant for replication.
    */
@@ -914,12 +894,6 @@ public final class HConstants {
   public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false;
   /** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */
   public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id";
-
-  public static final String
-      REPLICATION_SERIALLY_WAITING_KEY = "hbase.serial.replication.waitingMs";
-  public static final long
-      REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
-
   /**
    * Max total size of buffered entries in all replication peers. It will prevent server getting
    * OOM if there are many peers. Default value is 256MB which is four times to default

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 5908359..e1ae0ef 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1542,20 +1542,6 @@ possible configurations would overwhelm and obscure the important.
         slave clusters. The default of 10 will rarely need to be changed.
     </description>
   </property>
-  <property>
-    <name>hbase.serial.replication.waitingMs</name>
-    <value>10000</value>
-    <description>
-      By default, in replication we can not make sure the order of operations in slave cluster is
-      same as the order in master. If set REPLICATION_SCOPE to 2, we will push edits by the order
-      of written. This configure is to set how long (in ms) we will wait before next checking if a
-      log can not push right now because there are some logs written before it have not been pushed.
-      A larger waiting will decrease the number of queries on hbase:meta but will enlarge the delay
-      of replication. This feature relies on zk-less assignment, and conflicts with distributed log
-      replay. So users must set hbase.assignment.usezk and hbase.master.distributed.log.replay to
-      false to support it.
-    </description>
-  </property>
   <!-- Static Web User Filter properties. -->
   <property>
     <description>

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index a466e6c..e0efab4 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -21,10 +21,6 @@ public final class WALProtos {
      * <code>REPLICATION_SCOPE_GLOBAL = 1;</code>
      */
     REPLICATION_SCOPE_GLOBAL(1, 1),
-    /**
-     * <code>REPLICATION_SCOPE_SERIAL = 2;</code>
-     */
-    REPLICATION_SCOPE_SERIAL(2, 2),
     ;
 
     /**
@@ -35,10 +31,6 @@ public final class WALProtos {
      * <code>REPLICATION_SCOPE_GLOBAL = 1;</code>
      */
     public static final int REPLICATION_SCOPE_GLOBAL_VALUE = 1;
-    /**
-     * <code>REPLICATION_SCOPE_SERIAL = 2;</code>
-     */
-    public static final int REPLICATION_SCOPE_SERIAL_VALUE = 2;
 
 
     public final int getNumber() { return value; }
@@ -47,7 +39,6 @@ public final class WALProtos {
       switch (value) {
         case 0: return REPLICATION_SCOPE_LOCAL;
         case 1: return REPLICATION_SCOPE_GLOBAL;
-        case 2: return REPLICATION_SCOPE_SERIAL;
         default: return null;
       }
     }
@@ -12023,11 +12014,10 @@ public final class WALProtos {
       "e.pb.StoreDescriptor\022$\n\006server\030\006 \001(\0132\024.h" +
       "base.pb.ServerName\022\023\n\013region_name\030\007 \001(\014\"" +
       ".\n\tEventType\022\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_" +
-      "CLOSE\020\001\"\014\n\nWALTrailer*d\n\tScopeType\022\033\n\027RE" +
+      "CLOSE\020\001\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027RE" +
       "PLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_S" +
-      "COPE_GLOBAL\020\001\022\034\n\030REPLICATION_SCOPE_SERIA" +
-      "L\020\002B?\n*org.apache.hadoop.hbase.protobuf." +
-      "generatedB\tWALProtosH\001\210\001\000\240\001\001"
+      "COPE_GLOBAL\020\001B?\n*org.apache.hadoop.hbase" +
+      ".protobuf.generatedB\tWALProtosH\001\210\001\000\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-protocol/src/main/protobuf/WAL.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto
index 08925f8..a888686 100644
--- a/hbase-protocol/src/main/protobuf/WAL.proto
+++ b/hbase-protocol/src/main/protobuf/WAL.proto
@@ -77,7 +77,6 @@ message WALKey {
 enum ScopeType {
   REPLICATION_SCOPE_LOCAL = 0;
   REPLICATION_SCOPE_GLOBAL = 1;
-  REPLICATION_SCOPE_SERIAL = 2;
 }
 
 message FamilyScope {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index f8bbc65..6951098 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -99,7 +99,6 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
-import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleanerChore;
@@ -331,7 +330,6 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
   CatalogJanitor catalogJanitorChore;
   private ReplicationZKLockCleanerChore replicationZKLockCleanerChore;
   private ReplicationZKNodeCleanerChore replicationZKNodeCleanerChore;
-  private ReplicationMetaCleaner replicationMetaCleaner;
   private LogCleaner logCleaner;
   private HFileCleaner hfileCleaner;
 
@@ -1250,12 +1248,6 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     } catch (Exception e) {
       LOG.error("start replicationZKNodeCleanerChore failed", e);
     }
-    try {
-      replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
-      getChoreService().scheduleChore(replicationMetaCleaner);
-    } catch (Exception e) {
-      LOG.error("start ReplicationMetaCleaner failed", e);
-    }
   }
 
   @Override
@@ -1291,7 +1283,6 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
     if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true);
     if (this.replicationZKNodeCleanerChore != null) this.replicationZKNodeCleanerChore.cancel(true);
-    if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
     if (this.quotaManager != null) this.quotaManager.stop();
     if (this.activeMasterManager != null) this.activeMasterManager.stop();
     if (this.serverManager != null) this.serverManager.stop();

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
index 2d445e2..476b4d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
@@ -17,15 +17,11 @@
  */
 package org.apache.hadoop.hbase.master;
 
-import com.google.common.base.Preconditions;
-
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -49,6 +45,8 @@ import org.apache.hadoop.hbase.util.MultiHConnection;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.common.base.Preconditions;
+
 /**
  * A helper to persist region state in meta. We may change this class
  * to StateStore later if we also use it to store other states in meta
@@ -65,7 +63,7 @@ public class RegionStateStore {
   private volatile boolean initialized;
 
   private final boolean noPersistence;
-  private final MasterServices server;
+  private final Server server;
 
   /**
    * Returns the {@link ServerName} from catalog table {@link Result}
@@ -135,7 +133,7 @@ public class RegionStateStore {
           State.SPLITTING_NEW, State.MERGED));
   }
 
-  RegionStateStore(final MasterServices server) {
+  RegionStateStore(final Server server) {
     Configuration conf = server.getConfiguration();
     // No need to persist if using ZK but not migrating
     noPersistence = ConfigUtil.useZKForAssignment(conf)
@@ -200,41 +198,31 @@ public class RegionStateStore {
     State state = newState.getState();
 
       int replicaId = hri.getReplicaId();
-      Put metaPut = new Put(MetaTableAccessor.getMetaKeyForRegion(hri));
+      Put put = new Put(MetaTableAccessor.getMetaKeyForRegion(hri));
       StringBuilder info = new StringBuilder("Updating hbase:meta row ");
       info.append(hri.getRegionNameAsString()).append(" with state=").append(state);
       if (serverName != null && !serverName.equals(oldServer)) {
-        metaPut.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId),
+        put.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId),
           Bytes.toBytes(serverName.getServerName()));
         info.append(", sn=").append(serverName);
       }
       if (openSeqNum >= 0) {
         Preconditions.checkArgument(state == State.OPEN
           && serverName != null, "Open region should be on a server");
-        MetaTableAccessor.addLocation(metaPut, serverName, openSeqNum, -1, replicaId);
+        MetaTableAccessor.addLocation(put, serverName, openSeqNum, -1, replicaId);
         info.append(", openSeqNum=").append(openSeqNum);
         info.append(", server=").append(serverName);
       }
-      metaPut.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
+      put.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
         Bytes.toBytes(state.name()));
       LOG.info(info);
-      HTableDescriptor descriptor = server.getTableDescriptors().get(hri.getTable());
-      boolean serial = false;
-      if (descriptor != null) {
-        serial = server.getTableDescriptors().get(hri.getTable()).hasSerialReplicationScope();
-      }
-      boolean shouldPutBarrier = serial && state == State.OPEN;
+
       // Persist the state change to meta
       if (metaRegion != null) {
         try {
           // Assume meta is pinned to master.
           // At least, that's what we want.
-          metaRegion.put(metaPut);
-          if (shouldPutBarrier) {
-            Put barrierPut = MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(),
-                openSeqNum, hri.getTable().getName());
-            metaRegion.put(barrierPut);
-          }
+          metaRegion.put(put);
           return; // Done here
         } catch (Throwable t) {
           // In unit tests, meta could be moved away by intention
@@ -253,10 +241,7 @@ public class RegionStateStore {
         }
       }
       // Called when meta is not on master
-      List<Put> list = shouldPutBarrier ?
-          Arrays.asList(metaPut, MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(),
-              openSeqNum, hri.getTable().getName())) : Arrays.asList(metaPut);
-      multiHConnection.processBatchCallback(list, TableName.META_TABLE_NAME, null, null);
+      multiHConnection.processBatchCallback(Arrays.asList(put), TableName.META_TABLE_NAME, null, null);
 
     } catch (IOException ioe) {
       LOG.error("Failed to persist region state " + newState, ioe);
@@ -266,14 +251,12 @@ public class RegionStateStore {
 
   void splitRegion(HRegionInfo p,
       HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
-    MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication,
-        server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope());
+    MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication);
   }
 
   void mergeRegions(HRegionInfo p,
       HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
     MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, regionReplication,
-        EnvironmentEdgeManager.currentTime(),
-        server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope());
+    		EnvironmentEdgeManager.currentTime());
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
deleted file mode 100644
index 41864b9..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.cleaner;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.ScheduledChore;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * This chore is to clean up the useless data in hbase:meta which is used by serial replication.
- */
-@InterfaceAudience.Private
-public class ReplicationMetaCleaner extends ScheduledChore {
-
-  private static final Log LOG = LogFactory.getLog(ReplicationMetaCleaner.class);
-
-  private ReplicationAdmin replicationAdmin;
-  private MasterServices master;
-
-  public ReplicationMetaCleaner(MasterServices master, Stoppable stoppable, int period)
-      throws IOException {
-    super("ReplicationMetaCleaner", stoppable, period);
-    this.master = master;
-    replicationAdmin = new ReplicationAdmin(master.getConfiguration());
-  }
-
-  @Override
-  protected void chore() {
-    try {
-      Map<String, HTableDescriptor> tables = master.getTableDescriptors().getAll();
-      Map<String, Set<String>> serialTables = new HashMap<>();
-      for (Map.Entry<String, HTableDescriptor> entry : tables.entrySet()) {
-        boolean hasSerialScope = false;
-        for (HColumnDescriptor column : entry.getValue().getFamilies()) {
-          if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL) {
-            hasSerialScope = true;
-            break;
-          }
-        }
-        if (hasSerialScope) {
-          serialTables.put(entry.getValue().getTableName().getNameAsString(), new HashSet<String>());
-        }
-      }
-      if (serialTables.isEmpty()){
-        return;
-      }
-
-      Map<String, ReplicationPeerConfig> peers = replicationAdmin.listPeerConfigs();
-      for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
-        for (Map.Entry<byte[], byte[]> map : entry.getValue().getPeerData()
-            .entrySet()) {
-          String tableName = Bytes.toString(map.getKey());
-          if (serialTables.containsKey(tableName)) {
-            serialTables.get(tableName).add(entry.getKey());
-            break;
-          }
-        }
-      }
-
-      Map<String, List<Long>> barrierMap = MetaTableAccessor.getAllBarriers(master.getConnection());
-      for (Map.Entry<String, List<Long>> entry : barrierMap.entrySet()) {
-        String encodedName = entry.getKey();
-        byte[] encodedBytes = Bytes.toBytes(encodedName);
-        boolean canClearRegion = false;
-        Map<String, Long> posMap = MetaTableAccessor.getReplicationPositionForAllPeer(
-            master.getConnection(), encodedBytes);
-        if (posMap.isEmpty()) {
-          continue;
-        }
-
-        String tableName = MetaTableAccessor.getSerialReplicationTableName(
-            master.getConnection(), encodedBytes);
-        Set<String> confPeers = serialTables.get(tableName);
-        if (confPeers == null) {
-          // This table doesn't exist or all cf's scope is not serial any more, we can clear meta.
-          canClearRegion = true;
-        } else {
-          if (!allPeersHavePosition(confPeers, posMap)) {
-            continue;
-          }
-
-          String daughterValue = MetaTableAccessor
-              .getSerialReplicationDaughterRegion(master.getConnection(), encodedBytes);
-          if (daughterValue != null) {
-            //this region is merged or split
-            boolean allDaughterStart = true;
-            String[] daughterRegions = daughterValue.split(",");
-            for (String daughter : daughterRegions) {
-              byte[] region = Bytes.toBytes(daughter);
-              if (!MetaTableAccessor.getReplicationBarriers(
-                  master.getConnection(), region).isEmpty() &&
-                  !allPeersHavePosition(confPeers,
-                      MetaTableAccessor
-                          .getReplicationPositionForAllPeer(master.getConnection(), region))) {
-                allDaughterStart = false;
-                break;
-              }
-            }
-            if (allDaughterStart) {
-              canClearRegion = true;
-            }
-          }
-        }
-        if (canClearRegion) {
-          Delete delete = new Delete(encodedBytes);
-          delete.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
-          delete.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
-          try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
-            metaTable.delete(delete);
-          }
-        } else {
-
-          // Barriers whose seq is larger than min pos of all peers, and the last barrier whose seq
-          // is smaller than min pos should be kept. All other barriers can be deleted.
-
-          long minPos = Long.MAX_VALUE;
-          for (Map.Entry<String, Long> pos : posMap.entrySet()) {
-            minPos = Math.min(minPos, pos.getValue());
-          }
-          List<Long> barriers = entry.getValue();
-          int index = Collections.binarySearch(barriers, minPos);
-          if (index < 0) {
-            index = -index - 1;
-          }
-          Delete delete = new Delete(encodedBytes);
-          for (int i = 0; i < index - 1; i++) {
-            delete.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, Bytes.toBytes(barriers.get(i)));
-          }
-          try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
-            metaTable.delete(delete);
-          }
-        }
-
-      }
-
-    } catch (IOException e) {
-      LOG.error("Exception during cleaning up.", e);
-    }
-
-  }
-
-  private boolean allPeersHavePosition(Set<String> peers, Map<String, Long> posMap)
-      throws IOException {
-    for(String peer:peers){
-      if (!posMap.containsKey(peer)){
-        return false;
-      }
-    }
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
index 9e31fb0..03aa059 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
@@ -434,8 +434,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
       if (metaEntries.isEmpty()) {
         MetaTableAccessor.mergeRegions(server.getConnection(),
           mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(),
-          server.getServerName(), region_a.getTableDesc().getRegionReplication(), masterSystemTime,
-            false);
+          server.getServerName(), region_a.getTableDesc().getRegionReplication(), masterSystemTime);
       } else {
         mergeRegionsAndPutMetaEntries(server.getConnection(),
           mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(),

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
index 95da92a..25a27a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
index a3eea6d..f9a5d31 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
@@ -338,7 +338,7 @@ public class SplitTransactionImpl implements SplitTransaction {
         MetaTableAccessor.splitRegion(server.getConnection(),
           parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(),
           daughterRegions.getSecond().getRegionInfo(), server.getServerName(),
-          parent.getTableDesc().getRegionReplication(), false);
+          parent.getTableDesc().getRegionReplication());
       } else {
         offlineParentInMetaAndputMetaEntries(server.getConnection(),
           parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index b2b403b..d6f48b9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
@@ -296,19 +295,8 @@ public class Replication extends WALActionsListener.Base implements
         if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
           scopeBulkLoadEdits(htd, replicationManager, scopes, logKey.getTablename(), cell);
         } else {
-          WALProtos.RegionEventDescriptor maybeEvent = WALEdit.getRegionEventDescriptor(cell);
-          if (maybeEvent != null && (maybeEvent.getEventType() ==
-              WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
-            // In serially replication, we use scopes when reading close marker.
-            for (HColumnDescriptor cf : families) {
-              if (cf.getScope() != REPLICATION_SCOPE_LOCAL) {
-                scopes.put(cf.getName(), cf.getScope());
-              }
-            }
-          }
-          // Skip the flush/compaction
+          // Skip the flush/compaction/region events
           continue;
-
         }
       } else if (hasReplication) {
         byte[] family = CellUtil.cloneFamily(cell);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 10f2e7b..add1043 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -18,9 +18,6 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Service;
@@ -49,7 +46,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -76,6 +72,10 @@ import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+
 /**
  * Class that handles the source of a replication stream.
  * Currently does not handle more than 1 slave
@@ -105,8 +105,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   private ReplicationQueueInfo replicationQueueInfo;
   // id of the peer cluster this source replicates to
   private String peerId;
-
-  String actualPeerId;
   // The manager of all sources to which we ping back our progress
   private ReplicationSourceManager manager;
   // Should we stop everything?
@@ -191,8 +189,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this.peerId = this.replicationQueueInfo.getPeerId();
-    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
-    this.actualPeerId = replicationQueueInfo.getPeerId();
     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
     this.replicationEndpoint = replicationEndpoint;
 
@@ -523,16 +519,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     // Current state of the worker thread
     private WorkerState state;
     ReplicationSourceWALReaderThread entryReader;
-    // Use guava cache to set ttl for each key
-    private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
-        .expireAfterAccess(1, TimeUnit.DAYS).build(
-            new CacheLoader<String, Boolean>() {
-              @Override
-              public Boolean load(String key) throws Exception {
-                return false;
-              }
-            }
-        );
 
     public ReplicationSourceShipperThread(String walGroupId,
         PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo,
@@ -568,9 +554,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
 
         try {
           WALEntryBatch entryBatch = entryReader.take();
-          for (Map.Entry<String, Long> entry : entryBatch.getLastSeqIds().entrySet()) {
-            waitingUntilCanPush(entry);
-          }
           shipEdits(entryBatch);
           releaseBufferQuota((int) entryBatch.getHeapSize());
           if (replicationQueueInfo.isQueueRecovered() && entryBatch.getWalEntries().isEmpty()
@@ -611,33 +594,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       }
     }
 
-    private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
-      String key = entry.getKey();
-      long seq = entry.getValue();
-      boolean deleteKey = false;
-      if (seq <= 0) {
-        // There is a REGION_CLOSE marker, we can not continue skipping after this entry.
-        deleteKey = true;
-        seq = -seq;
-      }
-
-      if (!canSkipWaitingSet.getUnchecked(key)) {
-        try {
-          manager.waitUntilCanBePushed(Bytes.toBytes(key), seq, actualPeerId);
-        } catch (IOException e) {
-          LOG.error("waitUntilCanBePushed fail", e);
-          stopper.stop("waitUntilCanBePushed fail");
-        } catch (InterruptedException e) {
-          LOG.warn("waitUntilCanBePushed interrupted", e);
-          Thread.currentThread().interrupt();
-        }
-        canSkipWaitingSet.put(key, true);
-      }
-      if (deleteKey) {
-        canSkipWaitingSet.invalidate(key);
-      }
-    }
-
     private void cleanUpHFileRefs(WALEdit edit) throws IOException {
       String peerId = peerClusterZnode;
       if (peerId.contains("-")) {
@@ -682,8 +638,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       int sleepMultiplier = 0;
       if (entries.isEmpty()) {
         if (lastLoggedPosition != lastReadPosition) {
-          // Save positions to meta table before zk.
-          updateSerialRepPositions(entryBatch.getLastSeqIds());
           updateLogPosition(lastReadPosition);
           // if there was nothing to ship and it's not an error
           // set "ageOfLastShippedOp" to <now> to indicate that we're current
@@ -738,10 +692,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
             for (int i = 0; i < size; i++) {
               cleanUpHFileRefs(entries.get(i).getEdit());
             }
-
-            // Save positions to meta table before zk.
-            updateSerialRepPositions(entryBatch.getLastSeqIds());
-
             //Log and clean up WAL logs
             updateLogPosition(lastReadPosition);
           }
@@ -770,16 +720,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       }
     }
 
-    private void updateSerialRepPositions(Map<String, Long> lastPositionsForSerialScope) {
-      try {
-        MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
-          lastPositionsForSerialScope);
-      } catch (IOException e) {
-        LOG.error("updateReplicationPositions fail", e);
-        stopper.stop("updateReplicationPositions fail");
-      }
-    }
-
     private void updateLogPosition(long lastReadPosition) {
       manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition,
         this.replicationQueueInfo.isQueueRecovered(), false);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 6ec30de..63bba8d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -24,7 +24,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -50,13 +49,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@@ -70,7 +66,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 
 /**
@@ -125,8 +120,6 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final Random rand;
   private final boolean replicationForBulkLoadDataEnabled;
 
-  private Connection connection;
-  private long replicationWaitTime;
 
   private AtomicLong totalBufferUsed = new AtomicLong();
 
@@ -145,7 +138,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   public ReplicationSourceManager(final ReplicationQueues replicationQueues,
       final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
       final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
-      final Path oldLogDir, final UUID clusterId) throws IOException {
+      final Path oldLogDir, final UUID clusterId) {
     //CopyOnWriteArrayList is thread-safe.
     //Generally, reading is more than modifying.
     this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
@@ -182,9 +175,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     replicationForBulkLoadDataEnabled =
         conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
           HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
-    this.replicationWaitTime = conf.getLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY,
-          HConstants.REPLICATION_SERIALLY_WAITING_DEFAULT);
-    connection = ConnectionFactory.createConnection(conf);
   }
 
   /**
@@ -830,10 +820,6 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   public ReplicationPeers getReplicationPeers() {return this.replicationPeers;}
 
-  public Connection getConnection() {
-    return this.connection;
-  }
-
   /**
    * Get a string representation of all the sources' metrics
    */
@@ -860,75 +846,4 @@ public class ReplicationSourceManager implements ReplicationListener {
   public void cleanUpHFileRefs(String peerId, List<String> files) {
     this.replicationQueues.removeHFileRefs(peerId, files);
   }
-
-  /**
-   * Whether an entry can be pushed to the peer or not right now.
-   * If we enable serial replication, we can not push the entry until all entries in its region
-   * whose sequence numbers are smaller than this entry have been pushed.
-   * For each ReplicationSource, we need only check the first entry in each region, as long as it
-   * can be pushed, we can push all in this ReplicationSource.
-   * This method will be blocked until we can push.
-   * @return the first barrier of entry's region, or -1 if there is no barrier. It is used to
-   *         prevent saving positions in the region of no barrier.
-   */
-  void waitUntilCanBePushed(byte[] encodedName, long seq, String peerId)
-      throws IOException, InterruptedException {
-
-    /**
-     * There are barriers for this region and position for this peer. N barriers form N intervals,
-     * (b1,b2) (b2,b3) ... (bn,max). Generally, there is no logs whose seq id is not greater than
-     * the first barrier and the last interval is start from the last barrier.
-     *
-     * There are several conditions that we can push now, otherwise we should block:
-     * 1) "Serial replication" is not enabled, we can push all logs just like before. This case
-     *    should not call this method.
-     * 2) There is no barriers for this region, or the seq id is smaller than the first barrier.
-     *    It is mainly because we alter REPLICATION_SCOPE = 2. We can not guarantee the
-     *    order of logs that is written before altering.
-     * 3) This entry is in the first interval of barriers. We can push them because it is the
-     *    start of a region. Splitting/merging regions are also ok because the first section of
-     *    daughter region is in same region of parents and the order in one RS is guaranteed.
-     * 4) If the entry's seq id and the position are in same section, or the pos is the last
-     *    number of previous section. Because when open a region we put a barrier the number
-     *    is the last log's id + 1.
-     * 5) Log's seq is smaller than pos in meta, we are retrying. It may happen when a RS crashes
-     *    after save replication meta and before save zk offset.
-     */
-    List<Long> barriers = MetaTableAccessor.getReplicationBarriers(connection, encodedName);
-    if (barriers.isEmpty() || seq <= barriers.get(0)) {
-      // Case 2
-      return;
-    }
-    int interval = Collections.binarySearch(barriers, seq);
-    if (interval < 0) {
-      interval = -interval - 1;// get the insert position if negative
-    }
-    if (interval == 1) {
-      // Case 3
-      return;
-    }
-
-    while (true) {
-      long pos = MetaTableAccessor.getReplicationPositionForOnePeer(connection, encodedName, peerId);
-      if (seq <= pos) {
-        // Case 5
-      }
-      if (pos >= 0) {
-        // Case 4
-        int posInterval = Collections.binarySearch(barriers, pos);
-        if (posInterval < 0) {
-          posInterval = -posInterval - 1;// get the insert position if negative
-        }
-        if (posInterval == interval || pos == barriers.get(interval - 1) - 1) {
-          return;
-        }
-      }
-
-      LOG.info(Bytes.toString(encodedName) + " can not start pushing to peer " + peerId
-          + " because previous log has not been pushed: sequence=" + seq + " pos=" + pos
-          + " barriers=" + Arrays.toString(barriers.toArray()));
-      Thread.sleep(replicationWaitTime);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index 40828b7..306ba8f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -141,10 +141,6 @@ public class ReplicationSourceWALReaderThread extends Thread {
               batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
             }
             Entry entry = entryStream.next();
-            if (updateSerialReplPos(batch, entry)) {
-              batch.lastWalPosition = entryStream.getPosition();
-              break;
-            }
             entry = filterEntry(entry);
             if (entry != null) {
               WALEdit edit = entry.getEdit();
@@ -246,33 +242,6 @@ public class ReplicationSourceWALReaderThread extends Thread {
   }
 
   /**
-   * @return true if we should stop reading because we're at REGION_CLOSE
-   */
-  private boolean updateSerialReplPos(WALEntryBatch batch, Entry entry) throws IOException {
-    if (entry.hasSerialReplicationScope()) {
-      String key = Bytes.toString(entry.getKey().getEncodedRegionName());
-      batch.setLastPosition(key, entry.getKey().getSequenceId());
-      if (!entry.getEdit().getCells().isEmpty()) {
-        WALProtos.RegionEventDescriptor maybeEvent =
-            WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
-        if (maybeEvent != null && maybeEvent
-            .getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
-          // In serially replication, if we move a region to another RS and move it back, we may
-          // read logs crossing two sections. We should break at REGION_CLOSE and push the first
-          // section first in case of missing the middle section belonging to the other RS.
-          // In a worker thread, if we can push the first log of a region, we can push all logs
-          // in the same region without waiting until we read a close marker because next time
-          // we read logs in this region, it must be a new section and not adjacent with this
-          // region. Mark it negative.
-          batch.setLastPosition(key, -entry.getKey().getSequenceId());
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  /**
    * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a
    * batch to become available
    * @return A batch of entries, along with the position in the log after reading the batch

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 2e34b64..cc2f42a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.wal;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.HConstants;
@@ -271,18 +270,6 @@ public interface WAL extends Closeable {
       key.setCompressionContext(compressionContext);
     }
 
-    public boolean hasSerialReplicationScope () {
-      if (getKey().getScopes() == null || getKey().getScopes().isEmpty()) {
-        return false;
-      }
-      for (Map.Entry<byte[], Integer> e:getKey().getScopes().entrySet()) {
-        if (e.getValue() == HConstants.REPLICATION_SCOPE_SERIAL){
-          return true;
-        }
-      }
-      return false;
-    }
-
     @Override
     public String toString() {
       return this.key + "=" + this.edit;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
index 98fff27..cb2494b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
@@ -443,7 +443,7 @@ public class TestMetaTableAccessor {
       List<HRegionInfo> regionInfos = Lists.newArrayList(parent);
       MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
 
-      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3, false);
+      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
 
       assertEmptyMetaLocation(meta, splitA.getRegionName(), 1);
       assertEmptyMetaLocation(meta, splitA.getRegionName(), 2);
@@ -472,7 +472,7 @@ public class TestMetaTableAccessor {
       MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
 
       MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3,
-          HConstants.LATEST_TIMESTAMP, false);
+          HConstants.LATEST_TIMESTAMP);
 
       assertEmptyMetaLocation(meta, merged.getRegionName(), 1);
       assertEmptyMetaLocation(meta, merged.getRegionName(), 2);
@@ -556,7 +556,7 @@ public class TestMetaTableAccessor {
 
       // now merge the regions, effectively deleting the rows for region a and b.
       MetaTableAccessor.mergeRegions(connection, mergedRegionInfo,
-        regionInfoA, regionInfoB, sn, 1, masterSystemTime, false);
+        regionInfoA, regionInfoB, sn, 1, masterSystemTime);
 
       result = meta.get(get);
       serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
@@ -639,7 +639,7 @@ public class TestMetaTableAccessor {
       }
       SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler();
       long prevCalls = scheduler.numPriorityCalls;
-      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB,loc.getServerName(),1,false);
+      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1);
 
       assertTrue(prevCalls < scheduler.numPriorityCalls);
     }
@@ -661,7 +661,7 @@ public class TestMetaTableAccessor {
       List<HRegionInfo> regionInfos = Lists.newArrayList(parent);
       MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
 
-      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3, false);
+      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
       Get get1 = new Get(splitA.getRegionName());
       Result resultA = meta.get(get1);
       Cell serverCellA = resultA.getColumnLatestCell(HConstants.CATALOG_FAMILY,

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java
index bca8cf3..a91560e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java
@@ -166,7 +166,7 @@ public class TestMetaScanner {
               end);
 
             MetaTableAccessor.splitRegion(connection,
-              parent, splita, splitb, ServerName.valueOf("fooserver", 1, 0), 1, false);
+              parent, splita, splitb, ServerName.valueOf("fooserver", 1, 0), 1);
 
             Threads.sleep(random.nextInt(200));
           } catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
index 78b23c0..69dfa40 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
@@ -1317,7 +1317,7 @@ public class TestAssignmentManagerOnCluster {
     }
     conf.setInt("hbase.regionstatestore.meta.connection", 3);
     final RegionStateStore rss =
-        new RegionStateStore(new MyMaster(conf, new ZkCoordinatedStateManager()));
+        new RegionStateStore(new MyRegionServer(conf, new ZkCoordinatedStateManager()));
     rss.start();
     // Create 10 threads and make each do 10 puts related to region state update
     Thread[] th = new Thread[10];


[2/3] hbase git commit: Revert "HBASE-9465 Push entries to peer clusters serially"

Posted by zh...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
deleted file mode 100644
index b0f687c..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.HTestConst;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-@Category({ ReplicationTests.class, LargeTests.class })
-public class TestSerialReplication {
-  private static final Log LOG = LogFactory.getLog(TestSerialReplication.class);
-
-  private static Configuration conf1;
-  private static Configuration conf2;
-
-  private static HBaseTestingUtility utility1;
-  private static HBaseTestingUtility utility2;
-
-  private static final byte[] famName = Bytes.toBytes("f");
-  private static final byte[] VALUE = Bytes.toBytes("v");
-  private static final byte[] ROW = Bytes.toBytes("r");
-  private static final byte[][] ROWS = HTestConst.makeNAscii(ROW, 100);
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    conf1 = HBaseConfiguration.create();
-    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
-    // smaller block size and capacity to trigger more operations
-    // and test them
-    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
-    conf1.setInt("replication.source.size.capacity", 1024);
-    conf1.setLong("replication.source.sleepforretries", 100);
-    conf1.setInt("hbase.regionserver.maxlogs", 10);
-    conf1.setLong("hbase.master.logcleaner.ttl", 10);
-    conf1.setBoolean("dfs.support.append", true);
-    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
-    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
-        "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
-    conf1.setLong("replication.source.per.peer.node.bandwidth", 100L);// Each WAL is 120 bytes
-    conf1.setLong("replication.source.size.capacity", 1L);
-    conf1.setLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY, 1000L);
-    conf1.setBoolean("hbase.assignment.usezk", false);
-
-    utility1 = new HBaseTestingUtility(conf1);
-    utility1.startMiniZKCluster();
-    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
-    new ZooKeeperWatcher(conf1, "cluster1", null, true);
-
-    conf2 = new Configuration(conf1);
-    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
-
-    utility2 = new HBaseTestingUtility(conf2);
-    utility2.setZkCluster(miniZK);
-    new ZooKeeperWatcher(conf2, "cluster2", null, true);
-
-    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
-    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
-    rpc.setClusterKey(utility2.getClusterKey());
-    admin1.addPeer("1", rpc, null);
-
-    utility1.startMiniCluster(1, 3);
-    utility2.startMiniCluster(1, 1);
-
-    utility1.getHBaseAdmin().setBalancerRunning(false, true);
-  }
-
-  @Test
-  public void testRegionMoveAndFailover() throws Exception {
-    TableName tableName = TableName.valueOf("testRSFailover");
-    HTableDescriptor table = new HTableDescriptor(tableName);
-    HColumnDescriptor fam = new HColumnDescriptor(famName);
-    fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
-    table.addFamily(fam);
-    utility1.getHBaseAdmin().createTable(table);
-    utility2.getHBaseAdmin().createTable(table);
-    try(Table t1 = utility1.getConnection().getTable(tableName);
-        Table t2 = utility2.getConnection().getTable(tableName)) {
-      LOG.info("move to 1");
-      moveRegion(t1, 1);
-      LOG.info("move to 0");
-      moveRegion(t1, 0);
-      for (int i = 10; i < 20; i++) {
-        Put put = new Put(ROWS[i]);
-        put.addColumn(famName, VALUE, VALUE);
-        t1.put(put);
-      }
-      LOG.info("move to 2");
-      moveRegion(t1, 2);
-      for (int i = 20; i < 30; i++) {
-        Put put = new Put(ROWS[i]);
-        put.addColumn(famName, VALUE, VALUE);
-        t1.put(put);
-      }
-      utility1.getHBaseCluster().abortRegionServer(2);
-      for (int i = 30; i < 40; i++) {
-        Put put = new Put(ROWS[i]);
-        put.addColumn(famName, VALUE, VALUE);
-        t1.put(put);
-      }
-
-      long start = EnvironmentEdgeManager.currentTime();
-      while (EnvironmentEdgeManager.currentTime() - start < 18000000) {
-        Scan scan = new Scan();
-        scan.setCaching(100);
-        List<Cell> list = new ArrayList<>();
-        try (ResultScanner results = t2.getScanner(scan)) {
-          for (Result result : results) {
-            assertEquals(1, result.rawCells().length);
-            list.add(result.rawCells()[0]);
-          }
-        }
-        List<Integer> listOfNumbers = getRowNumbers(list);
-        LOG.info(Arrays.toString(listOfNumbers.toArray()));
-        assertIntegerList(listOfNumbers, 10, 1);
-        if (listOfNumbers.size() != 30) {
-          LOG.info("Waiting all logs pushed to slave. Expected 30 , actual " + list.size());
-          Thread.sleep(200);
-          continue;
-        }
-        return;
-      }
-      throw new Exception("Not all logs have been pushed");
-    } finally {
-      utility1.getHBaseAdmin().disableTable(tableName);
-      utility2.getHBaseAdmin().disableTable(tableName);
-      utility1.getHBaseAdmin().deleteTable(tableName);
-      utility2.getHBaseAdmin().deleteTable(tableName);
-    }
-  }
-
-  @Test
-  public void testRegionSplit() throws Exception {
-    TableName tableName = TableName.valueOf("testRegionSplit");
-    HTableDescriptor table = new HTableDescriptor(tableName);
-    HColumnDescriptor fam = new HColumnDescriptor(famName);
-    fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
-    table.addFamily(fam);
-    utility1.getHBaseAdmin().createTable(table);
-    utility2.getHBaseAdmin().createTable(table);
-    try(Table t1 = utility1.getConnection().getTable(tableName);
-        Table t2 = utility2.getConnection().getTable(tableName)) {
-
-      for (int i = 10; i < 100; i += 10) {
-        Put put = new Put(ROWS[i]);
-        put.addColumn(famName, VALUE, VALUE);
-        t1.put(put);
-      }
-      utility1.getHBaseAdmin().split(tableName, ROWS[50]);
-      Thread.sleep(5000L);
-      for (int i = 11; i < 100; i += 10) {
-        Put put = new Put(ROWS[i]);
-        put.addColumn(famName, VALUE, VALUE);
-        t1.put(put);
-      }
-      balanceTwoRegions(t1);
-      for (int i = 12; i < 100; i += 10) {
-        Put put = new Put(ROWS[i]);
-        put.addColumn(famName, VALUE, VALUE);
-        t1.put(put);
-      }
-
-      long start = EnvironmentEdgeManager.currentTime();
-      while (EnvironmentEdgeManager.currentTime() - start < 180000) {
-        Scan scan = new Scan();
-        scan.setCaching(100);
-        List<Cell> list = new ArrayList<>();
-        try (ResultScanner results = t2.getScanner(scan)) {
-          for (Result result : results) {
-            assertEquals(1, result.rawCells().length);
-            list.add(result.rawCells()[0]);
-          }
-        }
-        List<Integer> listOfNumbers = getRowNumbers(list);
-        List<Integer> list0 = new ArrayList<>();
-        List<Integer> list1 = new ArrayList<>();
-        List<Integer> list21 = new ArrayList<>();
-        List<Integer> list22 = new ArrayList<>();
-        for (int num : listOfNumbers) {
-          if (num % 10 == 0) {
-            list0.add(num);
-          } else if (num % 10 == 1) {
-            list1.add(num);
-          } else if (num < 50) { //num%10==2
-            list21.add(num);
-          } else { // num%10==1&&num>50
-            list22.add(num);
-          }
-        }
-
-        LOG.info(Arrays.toString(list0.toArray()));
-        LOG.info(Arrays.toString(list1.toArray()));
-        LOG.info(Arrays.toString(list21.toArray()));
-        LOG.info(Arrays.toString(list22.toArray()));
-        assertIntegerList(list0, 10, 10);
-        assertIntegerList(list1, 11, 10);
-        assertIntegerList(list21, 12, 10);
-        assertIntegerList(list22, 52, 10);
-        if (!list1.isEmpty()) {
-          assertEquals(9, list0.size());
-        }
-        if (!list21.isEmpty() || !list22.isEmpty()) {
-          assertEquals(9, list1.size());
-        }
-
-        if (list.size() == 27) {
-          return;
-        }
-        LOG.info("Waiting all logs pushed to slave. Expected 27 , actual " + list.size());
-        Thread.sleep(200);
-      }
-      throw new Exception("Not all logs have been pushed");
-    } finally {
-      utility1.getHBaseAdmin().disableTable(tableName);
-      utility2.getHBaseAdmin().disableTable(tableName);
-      utility1.getHBaseAdmin().deleteTable(tableName);
-      utility2.getHBaseAdmin().deleteTable(tableName);
-    }
-  }
-
-  @Test
-  public void testRegionMerge() throws Exception {
-    TableName tableName = TableName.valueOf("testRegionMerge");
-    HTableDescriptor table = new HTableDescriptor(tableName);
-    HColumnDescriptor fam = new HColumnDescriptor(famName);
-    fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
-    table.addFamily(fam);
-    utility1.getHBaseAdmin().createTable(table);
-    utility2.getHBaseAdmin().createTable(table);
-    utility1.getHBaseAdmin().split(tableName, ROWS[50]);
-    Thread.sleep(5000L);
-
-    try(Table t1 = utility1.getConnection().getTable(tableName);
-        Table t2 = utility2.getConnection().getTable(tableName)) {
-      for (int i = 10; i < 100; i += 10) {
-        Put put = new Put(ROWS[i]);
-        put.addColumn(famName, VALUE, VALUE);
-        t1.put(put);
-      }
-      List<Pair<HRegionInfo, ServerName>> regions =
-          MetaTableAccessor.getTableRegionsAndLocations(utility1.getZooKeeperWatcher(),
-              utility1.getConnection(), tableName);
-      assertEquals(2, regions.size());
-      utility1.getHBaseAdmin().mergeRegions(regions.get(0).getFirst().getRegionName(),
-          regions.get(1).getFirst().getRegionName(), true);
-      for (int i = 11; i < 100; i += 10) {
-        Put put = new Put(ROWS[i]);
-        put.addColumn(famName, VALUE, VALUE);
-        t1.put(put);
-      }
-
-      long start = EnvironmentEdgeManager.currentTime();
-      while (EnvironmentEdgeManager.currentTime() - start < 180000) {
-        Scan scan = new Scan();
-        scan.setCaching(100);
-        List<Cell> list = new ArrayList<>();
-        try (ResultScanner results = t2.getScanner(scan)) {
-          for (Result result : results) {
-            assertEquals(1, result.rawCells().length);
-            list.add(result.rawCells()[0]);
-          }
-        }
-        List<Integer> listOfNumbers = getRowNumbers(list);
-        List<Integer> list0 = new ArrayList<>();
-        List<Integer> list1 = new ArrayList<>();
-        for (int num : listOfNumbers) {
-          if (num % 10 == 0) {
-            list0.add(num);
-          } else {
-            list1.add(num);
-          }
-        }
-        LOG.info(Arrays.toString(list0.toArray()));
-        LOG.info(Arrays.toString(list1.toArray()));
-        assertIntegerList(list0, 10, 10);
-        assertIntegerList(list1, 11, 10);
-        if (!list1.isEmpty()) {
-          assertEquals(9, list0.size());
-        }
-        if (list.size() == 18) {
-          return;
-        }
-        LOG.info("Waiting all logs pushed to slave. Expected 18 , actual " + list.size());
-        Thread.sleep(200);
-      }
-    } finally {
-      utility1.getHBaseAdmin().disableTable(tableName);
-      utility2.getHBaseAdmin().disableTable(tableName);
-      utility1.getHBaseAdmin().deleteTable(tableName);
-      utility2.getHBaseAdmin().deleteTable(tableName);
-    }
-  }
-
-  private List<Integer> getRowNumbers(List<Cell> cells) {
-    List<Integer> listOfRowNumbers = new ArrayList<>();
-    for (Cell c : cells) {
-      listOfRowNumbers.add(Integer.parseInt(Bytes
-          .toString(c.getRowArray(), c.getRowOffset() + ROW.length,
-              c.getRowLength() - ROW.length)));
-    }
-    return listOfRowNumbers;
-  }
-
-  @AfterClass
-  public static void setUpAfterClass() throws Exception {
-    utility2.shutdownMiniCluster();
-    utility1.shutdownMiniCluster();
-  }
-
-  private void moveRegion(Table table, int index) throws IOException {
-    List<Pair<HRegionInfo, ServerName>> regions =
-        MetaTableAccessor.getTableRegionsAndLocations(utility1.getZooKeeperWatcher(),
-            utility1.getConnection(), table.getName());
-    assertEquals(1, regions.size());
-    HRegionInfo regionInfo = regions.get(0).getFirst();
-    ServerName name = utility1.getHBaseCluster().getRegionServer(index).getServerName();
-    utility1.getHBaseAdmin()
-        .move(regionInfo.getEncodedNameAsBytes(), Bytes.toBytes(name.getServerName()));
-    try {
-      Thread.sleep(5000L); // wait to complete
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  private void balanceTwoRegions(Table table) throws Exception {
-    List<Pair<HRegionInfo, ServerName>> regions =
-        MetaTableAccessor.getTableRegionsAndLocations(utility1.getZooKeeperWatcher(),
-            utility1.getConnection(), table.getName());
-    assertEquals(2, regions.size());
-    HRegionInfo regionInfo1 = regions.get(0).getFirst();
-    ServerName name1 = utility1.getHBaseCluster().getRegionServer(0).getServerName();
-    HRegionInfo regionInfo2 = regions.get(1).getFirst();
-    ServerName name2 = utility1.getHBaseCluster().getRegionServer(1).getServerName();
-    utility1.getHBaseAdmin()
-        .move(regionInfo1.getEncodedNameAsBytes(), Bytes.toBytes(name1.getServerName()));
-    Thread.sleep(5000L);
-    utility1.getHBaseAdmin()
-        .move(regionInfo2.getEncodedNameAsBytes(), Bytes.toBytes(name2.getServerName()));
-    Thread.sleep(5000L);
-  }
-
-  private void assertIntegerList(List<Integer> list, int start, int step) {
-    for (int i = 0; i < list.size(); i++) {
-      assertTrue(list.get(i) == start + step * i);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java
index 48ef781..6e19fc2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java
@@ -115,7 +115,7 @@ public class TestGlobalThrottler {
     TableName tableName = TableName.valueOf("testQuota");
     HTableDescriptor table = new HTableDescriptor(tableName);
     HColumnDescriptor fam = new HColumnDescriptor(famName);
-    fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
+    fam.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
     table.addFamily(fam);
     utility1.getHBaseAdmin().createTable(table);
     utility2.getHBaseAdmin().createTable(table);