You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/11/08 22:13:27 UTC
[2/3] hbase git commit: Revert "HBASE-9465 Push entries to peer clusters serially"
Revert "HBASE-9465 Push entries to peer clusters serially"
This reverts commit 441bc050b991c14c048617bc443b97f46e21b76f.
Conflicts:
hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/93c91666
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/93c91666
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/93c91666
Branch: refs/heads/branch-1.4
Commit: 93c91666dcc4d97a4a2063f8afd5e5b80cd3b8fc
Parents: 6e4e67f
Author: Sean Busbey <bu...@apache.org>
Authored: Tue Nov 7 23:50:35 2017 -0600
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Nov 8 10:00:08 2017 -0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/HTableDescriptor.java | 46 +--
.../apache/hadoop/hbase/MetaTableAccessor.java | 243 ++---------
.../client/replication/ReplicationAdmin.java | 14 +-
.../org/apache/hadoop/hbase/HConstants.java | 26 --
.../src/main/resources/hbase-default.xml | 14 -
.../hbase/protobuf/generated/WALProtos.java | 16 +-
hbase-protocol/src/main/protobuf/WAL.proto | 1 -
.../org/apache/hadoop/hbase/master/HMaster.java | 9 -
.../hadoop/hbase/master/RegionStateStore.java | 43 +-
.../master/cleaner/ReplicationMetaCleaner.java | 187 ---------
.../RegionMergeTransactionImpl.java | 3 +-
.../hbase/regionserver/ReplicationService.java | 1 -
.../regionserver/SplitTransactionImpl.java | 2 +-
.../replication/regionserver/Replication.java | 14 +-
.../regionserver/ReplicationSource.java | 68 +---
.../regionserver/ReplicationSourceManager.java | 87 +---
.../ReplicationSourceWALReaderThread.java | 31 --
.../java/org/apache/hadoop/hbase/wal/WAL.java | 16 -
.../hadoop/hbase/TestMetaTableAccessor.java | 10 +-
.../hadoop/hbase/client/TestMetaScanner.java | 2 +-
.../master/TestAssignmentManagerOnCluster.java | 2 +-
.../replication/TestSerialReplication.java | 401 -------------------
.../regionserver/TestGlobalThrottler.java | 2 +-
23 files changed, 69 insertions(+), 1169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index 1fd950a..7f48976 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -34,12 +34,13 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
+import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -51,7 +52,6 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.WritableComparable;
@@ -1217,18 +1217,6 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
}
/**
- * Return true if there are at least one cf whose replication scope is serial.
- */
- public boolean hasSerialReplicationScope() {
- for (HColumnDescriptor column: getFamilies()){
- if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL){
- return true;
- }
- }
- return false;
- }
-
- /**
* Returns the configured replicas per region
*/
public int getRegionReplication() {
@@ -1772,32 +1760,8 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
.setScope(HConstants.REPLICATION_SCOPE_LOCAL)
// Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
.setBloomFilterType(BloomType.NONE)
- .setCacheDataInL1(true),
- new HColumnDescriptor(HConstants.REPLICATION_BARRIER_FAMILY)
- .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
- HConstants.DEFAULT_HBASE_META_VERSIONS))
- .setInMemory(true)
- .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
- HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
- .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
- // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
- .setBloomFilterType(BloomType.NONE)
- // Enable cache of data blocks in L1 if more than one caching tier deployed:
- // e.g. if using CombinedBlockCache (BucketCache).
- .setCacheDataInL1(true),
- new HColumnDescriptor(HConstants.REPLICATION_POSITION_FAMILY)
- .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
- HConstants.DEFAULT_HBASE_META_VERSIONS))
- .setInMemory(true)
- .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
- HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
- .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
- // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
- .setBloomFilterType(BloomType.NONE)
- // Enable cache of data blocks in L1 if more than one caching tier deployed:
- // e.g. if using CombinedBlockCache (BucketCache).
- .setCacheDataInL1(true),
- });
+ .setCacheDataInL1(true)
+ });
metaDescriptor.addCoprocessor(
"org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint",
null, Coprocessor.PRIORITY_SYSTEM, null);
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index c7e3757..3f11558 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -20,20 +20,6 @@ package org.apache.hadoop.hbase;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -63,6 +49,18 @@ import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
/**
* Read/write operations on region and assignment information store in
* <code>hbase:meta</code>.
@@ -109,27 +107,10 @@ public class MetaTableAccessor {
*
* The actual layout of meta should be encapsulated inside MetaTableAccessor methods,
* and should not leak out of it (through Result objects, etc)
- *
- * For replication serially, there are two column families "rep_barrier", "rep_position" whose
- * row key is encodedRegionName.
- * rep_barrier:{seqid} => in each time a RS opens a region, it saves the open sequence
- * id in this region
- * rep_position:{peerid} => to save the max sequence id we have pushed for each peer
- * rep_position:_TABLENAME_ => a special cell to save this region's table name, will used when
- * we clean old data
- * rep_position:_DAUGHTER_ => a special cell to present this region is split or merged, in this
- * cell the value is merged encoded name or two split encoded names
- * separated by ","
*/
private static final Log LOG = LogFactory.getLog(MetaTableAccessor.class);
- // Save its daughter region(s) when split/merge
- private static final byte[] daughterNamePosCq = Bytes.toBytes("_DAUGHTER_");
- // Save its table name because we only know region's encoded name
- private static final String tableNamePeer = "_TABLENAME_";
- private static final byte[] tableNamePosCq = Bytes.toBytes(tableNamePeer);
-
static final byte [] META_REGION_PREFIX;
static {
// Copy the prefix from FIRST_META_REGIONINFO into META_REGION_PREFIX.
@@ -981,19 +962,6 @@ public class MetaTableAccessor {
return delete;
}
- public static Put makeBarrierPut(byte[] encodedRegionName, long seq, byte[] tableName) {
- byte[] seqBytes = Bytes.toBytes(seq);
- return new Put(encodedRegionName)
- .addImmutable(HConstants.REPLICATION_BARRIER_FAMILY, seqBytes, seqBytes)
- .addImmutable(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq, tableName);
- }
-
-
- public static Put makeSerialDaughterPut(byte[] encodedRegionName, byte[] value) {
- return new Put(encodedRegionName).addImmutable(HConstants.REPLICATION_POSITION_FAMILY,
- daughterNamePosCq, value);
- }
-
/**
* Adds split daughters to the Put
*/
@@ -1010,24 +978,24 @@ public class MetaTableAccessor {
}
/**
- * Put the passed <code>puts</code> to the <code>hbase:meta</code> table.
- * Non-atomic for multi puts.
+ * Put the passed <code>p</code> to the <code>hbase:meta</code> table.
* @param connection connection we're using
- * @param puts Put to add to hbase:meta
+ * @param p Put to add to hbase:meta
* @throws IOException
*/
- static void putToMetaTable(final Connection connection, final Put... puts) throws IOException {
- put(getMetaHTable(connection), Arrays.asList(puts));
+ static void putToMetaTable(final Connection connection, final Put p)
+ throws IOException {
+ put(getMetaHTable(connection), p);
}
/**
* @param t Table to use (will be closed when done).
- * @param puts puts to make
+ * @param p put to make
* @throws IOException
*/
- private static void put(final Table t, final List<Put> puts) throws IOException {
+ private static void put(final Table t, final Put p) throws IOException {
try {
- t.put(puts);
+ t.put(p);
} finally {
t.close();
}
@@ -1153,7 +1121,7 @@ public class MetaTableAccessor {
* Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
* does not add its daughter's as different rows, but adds information about the daughters
* in the same row as the parent. Use
- * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName,int,boolean)}
+ * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName, int)}
* if you want to do that.
* @param meta the Table for META
* @param regionInfo region information
@@ -1175,7 +1143,7 @@ public class MetaTableAccessor {
* Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
* does not add its daughter's as different rows, but adds information about the daughters
* in the same row as the parent. Use
- * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName,int,boolean)}
+ * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName, int)}
* if you want to do that.
* @param connection connection we're using
* @param regionInfo region information
@@ -1264,7 +1232,7 @@ public class MetaTableAccessor {
*/
public static void mergeRegions(final Connection connection, HRegionInfo mergedRegion,
HRegionInfo regionA, HRegionInfo regionB, ServerName sn, int regionReplication,
- long masterSystemTime, boolean saveBarrier)
+ long masterSystemTime)
throws IOException {
Table meta = getMetaHTable(connection);
try {
@@ -1295,17 +1263,7 @@ public class MetaTableAccessor {
byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString()
+ HConstants.DELIMITER);
- Mutation[] mutations;
- if (saveBarrier) {
- Put putBarrierA = makeSerialDaughterPut(regionA.getEncodedNameAsBytes(),
- Bytes.toBytes(mergedRegion.getEncodedName()));
- Put putBarrierB = makeSerialDaughterPut(regionB.getEncodedNameAsBytes(),
- Bytes.toBytes(mergedRegion.getEncodedName()));
- mutations = new Mutation[] { putOfMerged, deleteA, deleteB, putBarrierA, putBarrierB };
- } else {
- mutations = new Mutation[] { putOfMerged, deleteA, deleteB };
- }
- multiMutate(meta, tableRow, mutations);
+ multiMutate(meta, tableRow, putOfMerged, deleteA, deleteB);
} finally {
meta.close();
}
@@ -1321,11 +1279,10 @@ public class MetaTableAccessor {
* @param splitA Split daughter region A
* @param splitB Split daughter region A
* @param sn the location of the region
- * @param saveBarrier true if need save replication barrier in meta, used for serial replication
*/
- public static void splitRegion(final Connection connection, HRegionInfo parent,
- HRegionInfo splitA, HRegionInfo splitB, ServerName sn, int regionReplication,
- boolean saveBarrier) throws IOException {
+ public static void splitRegion(final Connection connection,
+ HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
+ ServerName sn, int regionReplication) throws IOException {
Table meta = getMetaHTable(connection);
try {
HRegionInfo copyOfParent = new HRegionInfo(parent);
@@ -1350,17 +1307,8 @@ public class MetaTableAccessor {
addEmptyLocation(putB, i);
}
- Mutation[] mutations;
- if (saveBarrier) {
- Put putBarrier = makeSerialDaughterPut(parent.getEncodedNameAsBytes(),
- Bytes
- .toBytes(splitA.getEncodedName() + HConstants.DELIMITER + splitB.getEncodedName()));
- mutations = new Mutation[]{putParent, putA, putB, putBarrier};
- } else {
- mutations = new Mutation[]{putParent, putA, putB};
- }
byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER);
- multiMutate(meta, tableRow, mutations);
+ multiMutate(meta, tableRow, putParent, putA, putB);
} finally {
meta.close();
}
@@ -1418,27 +1366,6 @@ public class MetaTableAccessor {
}
/**
- * Updates the progress of pushing entries to peer cluster. Skip entry if value is -1.
- * @param connection connection we're using
- * @param peerId the peerId to push
- * @param positions map that saving positions for each region
- * @throws IOException
- */
- public static void updateReplicationPositions(Connection connection, String peerId,
- Map<String, Long> positions) throws IOException {
- List<Put> puts = new ArrayList<>();
- for (Map.Entry<String, Long> entry : positions.entrySet()) {
- long value = Math.abs(entry.getValue());
- Put put = new Put(Bytes.toBytes(entry.getKey()));
- put.addImmutable(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId),
- Bytes.toBytes(value));
- puts.add(put);
- }
- getMetaHTable(connection).put(puts);
- }
-
-
- /**
* Updates the location of the specified region to be the specified server.
* <p>
* Connects to the specified server which should be hosting the specified
@@ -1623,120 +1550,6 @@ public class MetaTableAccessor {
}
/**
- * Get replication position for a peer in a region.
- * @param connection connection we're using
- * @return the position of this peer, -1 if no position in meta.
- */
- public static long getReplicationPositionForOnePeer(Connection connection,
- byte[] encodedRegionName, String peerId) throws IOException {
- Get get = new Get(encodedRegionName);
- get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId));
- Result r = get(getMetaHTable(connection), get);
- if (r.isEmpty()) {
- return -1;
- }
- Cell cell = r.rawCells()[0];
- return Bytes.toLong(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength());
- }
-
- /**
- * Get replication positions for all peers in a region.
- * @param connection connection we're using
- * @param encodedRegionName region's encoded name
- * @return the map of positions for each peer
- */
- public static Map<String, Long> getReplicationPositionForAllPeer(Connection connection,
- byte[] encodedRegionName) throws IOException {
- Get get = new Get(encodedRegionName);
- get.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
- Result r = get(getMetaHTable(connection), get);
- Map<String, Long> map = new HashMap<>((int) (r.size() / 0.75 + 1));
- for (Cell c : r.listCells()) {
- if (!Bytes.equals(tableNamePosCq, 0, tableNamePosCq.length, c.getQualifierArray(),
- c.getQualifierOffset(), c.getQualifierLength()) &&
- !Bytes.equals(daughterNamePosCq, 0, daughterNamePosCq.length, c.getQualifierArray(),
- c.getQualifierOffset(), c.getQualifierLength())) {
- map.put(
- Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()),
- Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
- }
- }
- return map;
- }
-
- /**
- * Get all barriers in all regions.
- * @return a map of barrier lists in all regions
- * @throws IOException
- */
- public static List<Long> getReplicationBarriers(Connection connection, byte[] encodedRegionName)
- throws IOException {
- Get get = new Get(encodedRegionName);
- get.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
- Result r = get(getMetaHTable(connection), get);
- List<Long> list = new ArrayList<>();
- if (!r.isEmpty()) {
- for (Cell cell : r.rawCells()) {
- list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(),
- cell.getQualifierLength()));
- }
- }
- return list;
- }
-
- public static Map<String, List<Long>> getAllBarriers(Connection connection) throws IOException {
- Map<String, List<Long>> map = new HashMap<>();
- Scan scan = new Scan();
- scan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
- try (Table t = getMetaHTable(connection);
- ResultScanner scanner = t.getScanner(scan)) {
- Result result;
- while ((result = scanner.next()) != null) {
- String key = Bytes.toString(result.getRow());
- List<Long> list = new ArrayList<>();
- for (Cell cell : result.rawCells()) {
- list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(),
- cell.getQualifierLength()));
- }
- map.put(key, list);
- }
- }
- return map;
- }
-
- /**
- * Get daughter region(s) for a region, only used in serial replication.
- * @throws IOException
- */
- public static String getSerialReplicationDaughterRegion(Connection connection, byte[] encodedName)
- throws IOException {
- Get get = new Get(encodedName);
- get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, daughterNamePosCq);
- Result result = get(getMetaHTable(connection), get);
- if (!result.isEmpty()) {
- Cell c = result.rawCells()[0];
- return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength());
- }
- return null;
- }
-
- /**
- * Get the table name for a region, only used in serial replication.
- * @throws IOException
- */
- public static String getSerialReplicationTableName(Connection connection, byte[] encodedName)
- throws IOException {
- Get get = new Get(encodedName);
- get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq);
- Result result = get(getMetaHTable(connection), get);
- if (!result.isEmpty()) {
- Cell c = result.rawCells()[0];
- return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength());
- }
- return null;
- }
-
- /**
* Checks whether hbase:meta contains any info:server entry.
* @param connection connection we're using
* @return true if hbase:meta contains any info:server entry, false if not
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 80f6e10..73fec38 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -91,10 +91,8 @@ public class ReplicationAdmin implements Closeable {
// only Global for now, can add other type
// such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
public static final String REPLICATIONTYPE = "replicationType";
- public static final String REPLICATIONGLOBAL =
- Integer.toString(HConstants.REPLICATION_SCOPE_GLOBAL);
- public static final String REPLICATIONSERIAL =
- Integer.toString(HConstants.REPLICATION_SCOPE_SERIAL);
+ public static final String REPLICATIONGLOBAL = Integer
+ .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
private final Connection connection;
// TODO: replication should be managed by master. All the classes except ReplicationAdmin should
@@ -488,10 +486,7 @@ public class ReplicationAdmin implements Closeable {
HashMap<String, String> replicationEntry = new HashMap<String, String>();
replicationEntry.put(TNAME, tableName);
replicationEntry.put(CFNAME, column.getNameAsString());
- replicationEntry.put(REPLICATIONTYPE,
- column.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL ?
- REPLICATIONGLOBAL :
- REPLICATIONSERIAL);
+ replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
replicationColFams.add(replicationEntry);
}
}
@@ -703,8 +698,7 @@ public class ReplicationAdmin implements Closeable {
boolean hasDisabled = false;
for (HColumnDescriptor hcd : htd.getFamilies()) {
- if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
- && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
+ if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
hasDisabled = true;
} else {
hasEnabled = true;
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 4880896..79fb00d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -449,20 +449,6 @@ public final class HConstants {
/** The catalog family */
public static final byte [] CATALOG_FAMILY = Bytes.toBytes(CATALOG_FAMILY_STR);
- /** The replication barrier family as a string*/
- public static final String REPLICATION_BARRIER_FAMILY_STR = "rep_barrier";
-
- /** The replication barrier family */
- public static final byte [] REPLICATION_BARRIER_FAMILY =
- Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR);
-
- /** The replication barrier family as a string*/
- public static final String REPLICATION_POSITION_FAMILY_STR = "rep_position";
-
- /** The replication barrier family */
- public static final byte [] REPLICATION_POSITION_FAMILY =
- Bytes.toBytes(REPLICATION_POSITION_FAMILY_STR);
-
/** The RegionInfo qualifier as a string */
public static final String REGIONINFO_QUALIFIER_STR = "regioninfo";
@@ -658,12 +644,6 @@ public final class HConstants {
public static final int REPLICATION_SCOPE_GLOBAL = 1;
/**
- * Scope tag for serially scoped data
- * This data will be replicated to all peers by the order of sequence id.
- */
- public static final int REPLICATION_SCOPE_SERIAL = 2;
-
- /**
* Default cluster ID, cannot be used to identify a cluster so a key with
* this value means it wasn't meant for replication.
*/
@@ -914,12 +894,6 @@ public final class HConstants {
public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false;
/** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */
public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id";
-
- public static final String
- REPLICATION_SERIALLY_WAITING_KEY = "hbase.serial.replication.waitingMs";
- public static final long
- REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
-
/**
* Max total size of buffered entries in all replication peers. It will prevent server getting
* OOM if there are many peers. Default value is 256MB which is four times to default
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 5908359..e1ae0ef 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1542,20 +1542,6 @@ possible configurations would overwhelm and obscure the important.
slave clusters. The default of 10 will rarely need to be changed.
</description>
</property>
- <property>
- <name>hbase.serial.replication.waitingMs</name>
- <value>10000</value>
- <description>
- By default, in replication we can not make sure the order of operations in slave cluster is
- same as the order in master. If set REPLICATION_SCOPE to 2, we will push edits by the order
- of written. This configure is to set how long (in ms) we will wait before next checking if a
- log can not push right now because there are some logs written before it have not been pushed.
- A larger waiting will decrease the number of queries on hbase:meta but will enlarge the delay
- of replication. This feature relies on zk-less assignment, and conflicts with distributed log
- replay. So users must set hbase.assignment.usezk and hbase.master.distributed.log.replay to
- false to support it.
- </description>
- </property>
<!-- Static Web User Filter properties. -->
<property>
<description>
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index a466e6c..e0efab4 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -21,10 +21,6 @@ public final class WALProtos {
* <code>REPLICATION_SCOPE_GLOBAL = 1;</code>
*/
REPLICATION_SCOPE_GLOBAL(1, 1),
- /**
- * <code>REPLICATION_SCOPE_SERIAL = 2;</code>
- */
- REPLICATION_SCOPE_SERIAL(2, 2),
;
/**
@@ -35,10 +31,6 @@ public final class WALProtos {
* <code>REPLICATION_SCOPE_GLOBAL = 1;</code>
*/
public static final int REPLICATION_SCOPE_GLOBAL_VALUE = 1;
- /**
- * <code>REPLICATION_SCOPE_SERIAL = 2;</code>
- */
- public static final int REPLICATION_SCOPE_SERIAL_VALUE = 2;
public final int getNumber() { return value; }
@@ -47,7 +39,6 @@ public final class WALProtos {
switch (value) {
case 0: return REPLICATION_SCOPE_LOCAL;
case 1: return REPLICATION_SCOPE_GLOBAL;
- case 2: return REPLICATION_SCOPE_SERIAL;
default: return null;
}
}
@@ -12023,11 +12014,10 @@ public final class WALProtos {
"e.pb.StoreDescriptor\022$\n\006server\030\006 \001(\0132\024.h" +
"base.pb.ServerName\022\023\n\013region_name\030\007 \001(\014\"" +
".\n\tEventType\022\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_" +
- "CLOSE\020\001\"\014\n\nWALTrailer*d\n\tScopeType\022\033\n\027RE" +
+ "CLOSE\020\001\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027RE" +
"PLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_S" +
- "COPE_GLOBAL\020\001\022\034\n\030REPLICATION_SCOPE_SERIA" +
- "L\020\002B?\n*org.apache.hadoop.hbase.protobuf." +
- "generatedB\tWALProtosH\001\210\001\000\240\001\001"
+ "COPE_GLOBAL\020\001B?\n*org.apache.hadoop.hbase" +
+ ".protobuf.generatedB\tWALProtosH\001\210\001\000\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-protocol/src/main/protobuf/WAL.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto
index 08925f8..a888686 100644
--- a/hbase-protocol/src/main/protobuf/WAL.proto
+++ b/hbase-protocol/src/main/protobuf/WAL.proto
@@ -77,7 +77,6 @@ message WALKey {
enum ScopeType {
REPLICATION_SCOPE_LOCAL = 0;
REPLICATION_SCOPE_GLOBAL = 1;
- REPLICATION_SCOPE_SERIAL = 2;
}
message FamilyScope {
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 43ead37..f5f48d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -98,7 +98,6 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
-import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleanerChore;
@@ -330,7 +329,6 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
CatalogJanitor catalogJanitorChore;
private ReplicationZKLockCleanerChore replicationZKLockCleanerChore;
private ReplicationZKNodeCleanerChore replicationZKNodeCleanerChore;
- private ReplicationMetaCleaner replicationMetaCleaner;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
@@ -1249,12 +1247,6 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
} catch (Exception e) {
LOG.error("start replicationZKNodeCleanerChore failed", e);
}
- try {
- replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
- getChoreService().scheduleChore(replicationMetaCleaner);
- } catch (Exception e) {
- LOG.error("start ReplicationMetaCleaner failed", e);
- }
}
@Override
@@ -1290,7 +1282,6 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true);
if (this.replicationZKNodeCleanerChore != null) this.replicationZKNodeCleanerChore.cancel(true);
- if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
if (this.quotaManager != null) this.quotaManager.stop();
if (this.activeMasterManager != null) this.activeMasterManager.stop();
if (this.serverManager != null) this.serverManager.stop();
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
index 2d445e2..476b4d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
@@ -17,15 +17,11 @@
*/
package org.apache.hadoop.hbase.master;
-import com.google.common.base.Preconditions;
-
import java.io.IOException;
import java.util.Arrays;
-import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -49,6 +45,8 @@ import org.apache.hadoop.hbase.util.MultiHConnection;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.zookeeper.KeeperException;
+import com.google.common.base.Preconditions;
+
/**
* A helper to persist region state in meta. We may change this class
* to StateStore later if we also use it to store other states in meta
@@ -65,7 +63,7 @@ public class RegionStateStore {
private volatile boolean initialized;
private final boolean noPersistence;
- private final MasterServices server;
+ private final Server server;
/**
* Returns the {@link ServerName} from catalog table {@link Result}
@@ -135,7 +133,7 @@ public class RegionStateStore {
State.SPLITTING_NEW, State.MERGED));
}
- RegionStateStore(final MasterServices server) {
+ RegionStateStore(final Server server) {
Configuration conf = server.getConfiguration();
// No need to persist if using ZK but not migrating
noPersistence = ConfigUtil.useZKForAssignment(conf)
@@ -200,41 +198,31 @@ public class RegionStateStore {
State state = newState.getState();
int replicaId = hri.getReplicaId();
- Put metaPut = new Put(MetaTableAccessor.getMetaKeyForRegion(hri));
+ Put put = new Put(MetaTableAccessor.getMetaKeyForRegion(hri));
StringBuilder info = new StringBuilder("Updating hbase:meta row ");
info.append(hri.getRegionNameAsString()).append(" with state=").append(state);
if (serverName != null && !serverName.equals(oldServer)) {
- metaPut.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId),
+ put.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId),
Bytes.toBytes(serverName.getServerName()));
info.append(", sn=").append(serverName);
}
if (openSeqNum >= 0) {
Preconditions.checkArgument(state == State.OPEN
&& serverName != null, "Open region should be on a server");
- MetaTableAccessor.addLocation(metaPut, serverName, openSeqNum, -1, replicaId);
+ MetaTableAccessor.addLocation(put, serverName, openSeqNum, -1, replicaId);
info.append(", openSeqNum=").append(openSeqNum);
info.append(", server=").append(serverName);
}
- metaPut.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
+ put.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
Bytes.toBytes(state.name()));
LOG.info(info);
- HTableDescriptor descriptor = server.getTableDescriptors().get(hri.getTable());
- boolean serial = false;
- if (descriptor != null) {
- serial = server.getTableDescriptors().get(hri.getTable()).hasSerialReplicationScope();
- }
- boolean shouldPutBarrier = serial && state == State.OPEN;
+
// Persist the state change to meta
if (metaRegion != null) {
try {
// Assume meta is pinned to master.
// At least, that's what we want.
- metaRegion.put(metaPut);
- if (shouldPutBarrier) {
- Put barrierPut = MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(),
- openSeqNum, hri.getTable().getName());
- metaRegion.put(barrierPut);
- }
+ metaRegion.put(put);
return; // Done here
} catch (Throwable t) {
// In unit tests, meta could be moved away by intention
@@ -253,10 +241,7 @@ public class RegionStateStore {
}
}
// Called when meta is not on master
- List<Put> list = shouldPutBarrier ?
- Arrays.asList(metaPut, MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(),
- openSeqNum, hri.getTable().getName())) : Arrays.asList(metaPut);
- multiHConnection.processBatchCallback(list, TableName.META_TABLE_NAME, null, null);
+ multiHConnection.processBatchCallback(Arrays.asList(put), TableName.META_TABLE_NAME, null, null);
} catch (IOException ioe) {
LOG.error("Failed to persist region state " + newState, ioe);
@@ -266,14 +251,12 @@ public class RegionStateStore {
void splitRegion(HRegionInfo p,
HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
- MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication,
- server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope());
+ MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication);
}
void mergeRegions(HRegionInfo p,
HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, regionReplication,
- EnvironmentEdgeManager.currentTime(),
- server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope());
+ EnvironmentEdgeManager.currentTime());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
deleted file mode 100644
index 41864b9..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.cleaner;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.ScheduledChore;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * This chore is to clean up the useless data in hbase:meta which is used by serial replication.
- */
-@InterfaceAudience.Private
-public class ReplicationMetaCleaner extends ScheduledChore {
-
- private static final Log LOG = LogFactory.getLog(ReplicationMetaCleaner.class);
-
- private ReplicationAdmin replicationAdmin;
- private MasterServices master;
-
- public ReplicationMetaCleaner(MasterServices master, Stoppable stoppable, int period)
- throws IOException {
- super("ReplicationMetaCleaner", stoppable, period);
- this.master = master;
- replicationAdmin = new ReplicationAdmin(master.getConfiguration());
- }
-
- @Override
- protected void chore() {
- try {
- Map<String, HTableDescriptor> tables = master.getTableDescriptors().getAll();
- Map<String, Set<String>> serialTables = new HashMap<>();
- for (Map.Entry<String, HTableDescriptor> entry : tables.entrySet()) {
- boolean hasSerialScope = false;
- for (HColumnDescriptor column : entry.getValue().getFamilies()) {
- if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL) {
- hasSerialScope = true;
- break;
- }
- }
- if (hasSerialScope) {
- serialTables.put(entry.getValue().getTableName().getNameAsString(), new HashSet<String>());
- }
- }
- if (serialTables.isEmpty()){
- return;
- }
-
- Map<String, ReplicationPeerConfig> peers = replicationAdmin.listPeerConfigs();
- for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
- for (Map.Entry<byte[], byte[]> map : entry.getValue().getPeerData()
- .entrySet()) {
- String tableName = Bytes.toString(map.getKey());
- if (serialTables.containsKey(tableName)) {
- serialTables.get(tableName).add(entry.getKey());
- break;
- }
- }
- }
-
- Map<String, List<Long>> barrierMap = MetaTableAccessor.getAllBarriers(master.getConnection());
- for (Map.Entry<String, List<Long>> entry : barrierMap.entrySet()) {
- String encodedName = entry.getKey();
- byte[] encodedBytes = Bytes.toBytes(encodedName);
- boolean canClearRegion = false;
- Map<String, Long> posMap = MetaTableAccessor.getReplicationPositionForAllPeer(
- master.getConnection(), encodedBytes);
- if (posMap.isEmpty()) {
- continue;
- }
-
- String tableName = MetaTableAccessor.getSerialReplicationTableName(
- master.getConnection(), encodedBytes);
- Set<String> confPeers = serialTables.get(tableName);
- if (confPeers == null) {
- // This table doesn't exist or all cf's scope is not serial any more, we can clear meta.
- canClearRegion = true;
- } else {
- if (!allPeersHavePosition(confPeers, posMap)) {
- continue;
- }
-
- String daughterValue = MetaTableAccessor
- .getSerialReplicationDaughterRegion(master.getConnection(), encodedBytes);
- if (daughterValue != null) {
- //this region is merged or split
- boolean allDaughterStart = true;
- String[] daughterRegions = daughterValue.split(",");
- for (String daughter : daughterRegions) {
- byte[] region = Bytes.toBytes(daughter);
- if (!MetaTableAccessor.getReplicationBarriers(
- master.getConnection(), region).isEmpty() &&
- !allPeersHavePosition(confPeers,
- MetaTableAccessor
- .getReplicationPositionForAllPeer(master.getConnection(), region))) {
- allDaughterStart = false;
- break;
- }
- }
- if (allDaughterStart) {
- canClearRegion = true;
- }
- }
- }
- if (canClearRegion) {
- Delete delete = new Delete(encodedBytes);
- delete.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
- delete.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
- try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
- metaTable.delete(delete);
- }
- } else {
-
- // Barriers whose seq is larger than min pos of all peers, and the last barrier whose seq
- // is smaller than min pos should be kept. All other barriers can be deleted.
-
- long minPos = Long.MAX_VALUE;
- for (Map.Entry<String, Long> pos : posMap.entrySet()) {
- minPos = Math.min(minPos, pos.getValue());
- }
- List<Long> barriers = entry.getValue();
- int index = Collections.binarySearch(barriers, minPos);
- if (index < 0) {
- index = -index - 1;
- }
- Delete delete = new Delete(encodedBytes);
- for (int i = 0; i < index - 1; i++) {
- delete.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, Bytes.toBytes(barriers.get(i)));
- }
- try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
- metaTable.delete(delete);
- }
- }
-
- }
-
- } catch (IOException e) {
- LOG.error("Exception during cleaning up.", e);
- }
-
- }
-
- private boolean allPeersHavePosition(Set<String> peers, Map<String, Long> posMap)
- throws IOException {
- for(String peer:peers){
- if (!posMap.containsKey(peer)){
- return false;
- }
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
index 9e31fb0..03aa059 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
@@ -434,8 +434,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
if (metaEntries.isEmpty()) {
MetaTableAccessor.mergeRegions(server.getConnection(),
mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(),
- server.getServerName(), region_a.getTableDesc().getRegionReplication(), masterSystemTime,
- false);
+ server.getServerName(), region_a.getTableDesc().getRegionReplication(), masterSystemTime);
} else {
mergeRegionsAndPutMetaEntries(server.getConnection(),
mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(),
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
index 95da92a..25a27a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
index 3576478..29ac199 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
@@ -337,7 +337,7 @@ public class SplitTransactionImpl implements SplitTransaction {
MetaTableAccessor.splitRegion(server.getConnection(),
parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(),
daughterRegions.getSecond().getRegionInfo(), server.getServerName(),
- parent.getTableDesc().getRegionReplication(), false);
+ parent.getTableDesc().getRegionReplication());
} else {
offlineParentInMetaAndputMetaEntries(server.getConnection(),
parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index b2b403b..d6f48b9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
@@ -296,19 +295,8 @@ public class Replication extends WALActionsListener.Base implements
if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
scopeBulkLoadEdits(htd, replicationManager, scopes, logKey.getTablename(), cell);
} else {
- WALProtos.RegionEventDescriptor maybeEvent = WALEdit.getRegionEventDescriptor(cell);
- if (maybeEvent != null && (maybeEvent.getEventType() ==
- WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
- // In serially replication, we use scopes when reading close marker.
- for (HColumnDescriptor cf : families) {
- if (cf.getScope() != REPLICATION_SCOPE_LOCAL) {
- scopes.put(cf.getName(), cf.getScope());
- }
- }
- }
- // Skip the flush/compaction
+ // Skip the flush/compaction/region events
continue;
-
}
} else if (hasReplication) {
byte[] family = CellUtil.cloneFamily(cell);
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 3278f0c..700dd01 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -18,9 +18,6 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
@@ -49,7 +46,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -76,6 +72,10 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+
/**
* Class that handles the source of a replication stream.
* Currently does not handle more than 1 slave
@@ -105,8 +105,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
private ReplicationQueueInfo replicationQueueInfo;
// id of the peer cluster this source replicates to
private String peerId;
-
- String actualPeerId;
// The manager of all sources to which we ping back our progress
private ReplicationSourceManager manager;
// Should we stop everything?
@@ -191,8 +189,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
- ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
- this.actualPeerId = replicationQueueInfo.getPeerId();
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
this.replicationEndpoint = replicationEndpoint;
@@ -523,16 +519,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// Current state of the worker thread
private WorkerState state;
ReplicationSourceWALReaderThread entryReader;
- // Use guava cache to set ttl for each key
- private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
- .expireAfterAccess(1, TimeUnit.DAYS).build(
- new CacheLoader<String, Boolean>() {
- @Override
- public Boolean load(String key) throws Exception {
- return false;
- }
- }
- );
public ReplicationSourceShipperThread(String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo,
@@ -568,9 +554,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
try {
WALEntryBatch entryBatch = entryReader.take();
- for (Map.Entry<String, Long> entry : entryBatch.getLastSeqIds().entrySet()) {
- waitingUntilCanPush(entry);
- }
shipEdits(entryBatch);
releaseBufferQuota((int) entryBatch.getHeapSize());
if (replicationQueueInfo.isQueueRecovered() && entryBatch.getWalEntries().isEmpty()
@@ -611,33 +594,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
}
- private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
- String key = entry.getKey();
- long seq = entry.getValue();
- boolean deleteKey = false;
- if (seq <= 0) {
- // There is a REGION_CLOSE marker, we can not continue skipping after this entry.
- deleteKey = true;
- seq = -seq;
- }
-
- if (!canSkipWaitingSet.getUnchecked(key)) {
- try {
- manager.waitUntilCanBePushed(Bytes.toBytes(key), seq, actualPeerId);
- } catch (IOException e) {
- LOG.error("waitUntilCanBePushed fail", e);
- stopper.stop("waitUntilCanBePushed fail");
- } catch (InterruptedException e) {
- LOG.warn("waitUntilCanBePushed interrupted", e);
- Thread.currentThread().interrupt();
- }
- canSkipWaitingSet.put(key, true);
- }
- if (deleteKey) {
- canSkipWaitingSet.invalidate(key);
- }
- }
-
private void cleanUpHFileRefs(WALEdit edit) throws IOException {
String peerId = peerClusterZnode;
if (peerId.contains("-")) {
@@ -682,8 +638,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
int sleepMultiplier = 0;
if (entries.isEmpty()) {
if (lastLoggedPosition != lastReadPosition) {
- // Save positions to meta table before zk.
- updateSerialRepPositions(entryBatch.getLastSeqIds());
updateLogPosition(lastReadPosition);
// if there was nothing to ship and it's not an error
// set "ageOfLastShippedOp" to <now> to indicate that we're current
@@ -738,10 +692,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
for (int i = 0; i < size; i++) {
cleanUpHFileRefs(entries.get(i).getEdit());
}
-
- // Save positions to meta table before zk.
- updateSerialRepPositions(entryBatch.getLastSeqIds());
-
//Log and clean up WAL logs
updateLogPosition(lastReadPosition);
}
@@ -770,16 +720,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
}
- private void updateSerialRepPositions(Map<String, Long> lastPositionsForSerialScope) {
- try {
- MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
- lastPositionsForSerialScope);
- } catch (IOException e) {
- LOG.error("updateReplicationPositions fail", e);
- stopper.stop("updateReplicationPositions fail");
- }
- }
-
private void updateLogPosition(long lastReadPosition) {
manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition,
this.replicationQueueInfo.isQueueRecovered(), false);
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index b50e840..7a2e971 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -24,7 +24,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -50,13 +49,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@@ -69,7 +65,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
/**
@@ -124,8 +119,6 @@ public class ReplicationSourceManager implements ReplicationListener {
private final Random rand;
private final boolean replicationForBulkLoadDataEnabled;
- private Connection connection;
- private long replicationWaitTime;
private AtomicLong totalBufferUsed = new AtomicLong();
@@ -144,7 +137,7 @@ public class ReplicationSourceManager implements ReplicationListener {
public ReplicationSourceManager(final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
- final Path oldLogDir, final UUID clusterId) throws IOException {
+ final Path oldLogDir, final UUID clusterId) {
//CopyOnWriteArrayList is thread-safe.
//Generally, reading is more than modifying.
this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
@@ -181,9 +174,6 @@ public class ReplicationSourceManager implements ReplicationListener {
replicationForBulkLoadDataEnabled =
conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
- this.replicationWaitTime = conf.getLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY,
- HConstants.REPLICATION_SERIALLY_WAITING_DEFAULT);
- connection = ConnectionFactory.createConnection(conf);
}
/**
@@ -823,10 +813,6 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
public ReplicationPeers getReplicationPeers() {return this.replicationPeers;}
- public Connection getConnection() {
- return this.connection;
- }
-
/**
* Get a string representation of all the sources' metrics
*/
@@ -853,75 +839,4 @@ public class ReplicationSourceManager implements ReplicationListener {
public void cleanUpHFileRefs(String peerId, List<String> files) {
this.replicationQueues.removeHFileRefs(peerId, files);
}
-
- /**
- * Whether an entry can be pushed to the peer or not right now.
- * If we enable serial replication, we can not push the entry until all entries in its region
- * whose sequence numbers are smaller than this entry have been pushed.
- * For each ReplicationSource, we need only check the first entry in each region, as long as it
- * can be pushed, we can push all in this ReplicationSource.
- * This method will be blocked until we can push.
- * @return the first barrier of entry's region, or -1 if there is no barrier. It is used to
- * prevent saving positions in the region of no barrier.
- */
- void waitUntilCanBePushed(byte[] encodedName, long seq, String peerId)
- throws IOException, InterruptedException {
-
- /**
- * There are barriers for this region and position for this peer. N barriers form N intervals,
- * (b1,b2) (b2,b3) ... (bn,max). Generally, there is no logs whose seq id is not greater than
- * the first barrier and the last interval is start from the last barrier.
- *
- * There are several conditions that we can push now, otherwise we should block:
- * 1) "Serial replication" is not enabled, we can push all logs just like before. This case
- * should not call this method.
- * 2) There is no barriers for this region, or the seq id is smaller than the first barrier.
- * It is mainly because we alter REPLICATION_SCOPE = 2. We can not guarantee the
- * order of logs that is written before altering.
- * 3) This entry is in the first interval of barriers. We can push them because it is the
- * start of a region. Splitting/merging regions are also ok because the first section of
- * daughter region is in same region of parents and the order in one RS is guaranteed.
- * 4) If the entry's seq id and the position are in same section, or the pos is the last
- * number of previous section. Because when open a region we put a barrier the number
- * is the last log's id + 1.
- * 5) Log's seq is smaller than pos in meta, we are retrying. It may happen when a RS crashes
- * after save replication meta and before save zk offset.
- */
- List<Long> barriers = MetaTableAccessor.getReplicationBarriers(connection, encodedName);
- if (barriers.isEmpty() || seq <= barriers.get(0)) {
- // Case 2
- return;
- }
- int interval = Collections.binarySearch(barriers, seq);
- if (interval < 0) {
- interval = -interval - 1;// get the insert position if negative
- }
- if (interval == 1) {
- // Case 3
- return;
- }
-
- while (true) {
- long pos = MetaTableAccessor.getReplicationPositionForOnePeer(connection, encodedName, peerId);
- if (seq <= pos) {
- // Case 5
- }
- if (pos >= 0) {
- // Case 4
- int posInterval = Collections.binarySearch(barriers, pos);
- if (posInterval < 0) {
- posInterval = -posInterval - 1;// get the insert position if negative
- }
- if (posInterval == interval || pos == barriers.get(interval - 1) - 1) {
- return;
- }
- }
-
- LOG.info(Bytes.toString(encodedName) + " can not start pushing to peer " + peerId
- + " because previous log has not been pushed: sequence=" + seq + " pos=" + pos
- + " barriers=" + Arrays.toString(barriers.toArray()));
- Thread.sleep(replicationWaitTime);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index 40828b7..306ba8f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -141,10 +141,6 @@ public class ReplicationSourceWALReaderThread extends Thread {
batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
}
Entry entry = entryStream.next();
- if (updateSerialReplPos(batch, entry)) {
- batch.lastWalPosition = entryStream.getPosition();
- break;
- }
entry = filterEntry(entry);
if (entry != null) {
WALEdit edit = entry.getEdit();
@@ -246,33 +242,6 @@ public class ReplicationSourceWALReaderThread extends Thread {
}
/**
- * @return true if we should stop reading because we're at REGION_CLOSE
- */
- private boolean updateSerialReplPos(WALEntryBatch batch, Entry entry) throws IOException {
- if (entry.hasSerialReplicationScope()) {
- String key = Bytes.toString(entry.getKey().getEncodedRegionName());
- batch.setLastPosition(key, entry.getKey().getSequenceId());
- if (!entry.getEdit().getCells().isEmpty()) {
- WALProtos.RegionEventDescriptor maybeEvent =
- WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
- if (maybeEvent != null && maybeEvent
- .getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
- // In serially replication, if we move a region to another RS and move it back, we may
- // read logs crossing two sections. We should break at REGION_CLOSE and push the first
- // section first in case of missing the middle section belonging to the other RS.
- // In a worker thread, if we can push the first log of a region, we can push all logs
- // in the same region without waiting until we read a close marker because next time
- // we read logs in this region, it must be a new section and not adjacent with this
- // region. Mark it negative.
- batch.setLastPosition(key, -entry.getKey().getSequenceId());
- return true;
- }
- }
- }
- return false;
- }
-
- /**
* Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a
* batch to become available
* @return A batch of entries, along with the position in the log after reading the batch
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 9e90a0c..413cbaa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -21,11 +21,8 @@ package org.apache.hadoop.hbase.wal;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Map;
import java.util.Set;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -38,7 +35,6 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.annotations.VisibleForTesting;
@@ -273,18 +269,6 @@ public interface WAL extends Closeable {
key.setCompressionContext(compressionContext);
}
- public boolean hasSerialReplicationScope () {
- if (getKey().getScopes() == null || getKey().getScopes().isEmpty()) {
- return false;
- }
- for (Map.Entry<byte[], Integer> e:getKey().getScopes().entrySet()) {
- if (e.getValue() == HConstants.REPLICATION_SCOPE_SERIAL){
- return true;
- }
- }
- return false;
- }
-
@Override
public String toString() {
return this.key + "=" + this.edit;
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
index 98fff27..cb2494b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
@@ -443,7 +443,7 @@ public class TestMetaTableAccessor {
List<HRegionInfo> regionInfos = Lists.newArrayList(parent);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
- MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3, false);
+ MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
assertEmptyMetaLocation(meta, splitA.getRegionName(), 1);
assertEmptyMetaLocation(meta, splitA.getRegionName(), 2);
@@ -472,7 +472,7 @@ public class TestMetaTableAccessor {
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3,
- HConstants.LATEST_TIMESTAMP, false);
+ HConstants.LATEST_TIMESTAMP);
assertEmptyMetaLocation(meta, merged.getRegionName(), 1);
assertEmptyMetaLocation(meta, merged.getRegionName(), 2);
@@ -556,7 +556,7 @@ public class TestMetaTableAccessor {
// now merge the regions, effectively deleting the rows for region a and b.
MetaTableAccessor.mergeRegions(connection, mergedRegionInfo,
- regionInfoA, regionInfoB, sn, 1, masterSystemTime, false);
+ regionInfoA, regionInfoB, sn, 1, masterSystemTime);
result = meta.get(get);
serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
@@ -639,7 +639,7 @@ public class TestMetaTableAccessor {
}
SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler();
long prevCalls = scheduler.numPriorityCalls;
- MetaTableAccessor.splitRegion(connection, parent, splitA, splitB,loc.getServerName(),1,false);
+ MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1);
assertTrue(prevCalls < scheduler.numPriorityCalls);
}
@@ -661,7 +661,7 @@ public class TestMetaTableAccessor {
List<HRegionInfo> regionInfos = Lists.newArrayList(parent);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
- MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3, false);
+ MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
Get get1 = new Get(splitA.getRegionName());
Result resultA = meta.get(get1);
Cell serverCellA = resultA.getColumnLatestCell(HConstants.CATALOG_FAMILY,
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java
index bead7e9..bff9c78 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java
@@ -165,7 +165,7 @@ public class TestMetaScanner {
end);
MetaTableAccessor.splitRegion(connection,
- parent, splita, splitb, ServerName.valueOf("fooserver", 1, 0), 1, false);
+ parent, splita, splitb, ServerName.valueOf("fooserver", 1, 0), 1);
Threads.sleep(random.nextInt(200));
} catch (Throwable e) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/93c91666/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
index 78b23c0..69dfa40 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
@@ -1317,7 +1317,7 @@ public class TestAssignmentManagerOnCluster {
}
conf.setInt("hbase.regionstatestore.meta.connection", 3);
final RegionStateStore rss =
- new RegionStateStore(new MyMaster(conf, new ZkCoordinatedStateManager()));
+ new RegionStateStore(new MyRegionServer(conf, new ZkCoordinatedStateManager()));
rss.start();
// Create 10 threads and make each do 10 puts related to region state update
Thread[] th = new Thread[10];