You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/02/23 06:05:35 UTC
[2/2] hbase git commit: HBASE-20048 Revert serial replication feature
HBASE-20048 Revert serial replication feature
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ad5cd50d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ad5cd50d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ad5cd50d
Branch: refs/heads/master
Commit: ad5cd50dfcc34d997a13e6e7fef9abd160893950
Parents: 1bc996a
Author: zhangduo <zh...@apache.org>
Authored: Fri Feb 23 08:51:37 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Fri Feb 23 13:58:31 2018 +0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/HTableDescriptor.java | 8 -
.../apache/hadoop/hbase/MetaTableAccessor.java | 296 ++------------
.../hadoop/hbase/client/TableDescriptor.java | 9 +-
.../hbase/client/TableDescriptorBuilder.java | 17 +-
.../client/replication/ReplicationAdmin.java | 10 +-
.../client/TestTableDescriptorBuilder.java | 25 +-
.../org/apache/hadoop/hbase/HConstants.java | 33 --
.../src/main/resources/hbase-default.xml | 13 -
hbase-protocol/src/main/protobuf/WAL.proto | 1 -
.../org/apache/hadoop/hbase/master/HMaster.java | 6 -
.../master/assignment/RegionStateStore.java | 23 +-
.../master/cleaner/ReplicationMetaCleaner.java | 191 ---------
.../hbase/regionserver/HRegionServer.java | 77 ++--
.../hbase/regionserver/wal/FSWALEntry.java | 1 +
.../RecoveredReplicationSourceShipper.java | 3 +-
.../regionserver/ReplicationSourceManager.java | 113 ------
.../regionserver/ReplicationSourceShipper.java | 76 +---
.../ReplicationSourceWALReader.java | 52 +--
.../hbase/snapshot/RestoreSnapshotHelper.java | 2 +-
.../hadoop/hbase/util/FSTableDescriptors.java | 30 --
.../java/org/apache/hadoop/hbase/wal/WAL.java | 19 +-
.../hadoop/hbase/TestMetaTableAccessor.java | 10 +-
.../regionserver/TestRegionServerMetrics.java | 2 +-
.../replication/TestSerialReplication.java | 400 -------------------
.../regionserver/TestGlobalThrottler.java | 2 +-
src/main/asciidoc/_chapters/ops_mgt.adoc | 41 +-
26 files changed, 84 insertions(+), 1376 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index c9807c3..e512b2c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -538,14 +538,6 @@ public class HTableDescriptor implements TableDescriptor, Comparable<HTableDescr
}
/**
- * Return true if there are at least one cf whose replication scope is serial.
- */
- @Override
- public boolean hasSerialReplicationScope() {
- return delegatee.hasSerialReplicationScope();
- }
-
- /**
* Returns the configured replicas per region
*/
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index dad9aef..7d00f92 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -17,13 +17,14 @@
*/
package org.apache.hadoop.hbase;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -33,7 +34,6 @@ import java.util.SortedMap;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.client.Connection;
@@ -71,9 +71,8 @@ import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import edu.umd.cs.findbugs.annotations.Nullable;
/**
* Read/write operations on region and assignment information store in
@@ -123,34 +122,14 @@ public class MetaTableAccessor {
* region is the result of a merge
* info:mergeB => contains a serialized HRI for the second parent region if the
* region is the result of a merge
+ *
* The actual layout of meta should be encapsulated inside MetaTableAccessor methods,
* and should not leak out of it (through Result objects, etc)
- *
- * For replication serially, there are three column families "rep_barrier", "rep_position" and
- * "rep_meta" whose row key is encodedRegionName.
- * rep_barrier:{seqid} => in each time a RS opens a region, it saves the open sequence
- * id in this region
- * rep_position:{peerid} => to save the max sequence id we have pushed for each peer
- * rep_meta:_TABLENAME_ => a special cell to save this region's table name, will used when
- * we clean old data
- * rep_meta:_DAUGHTER_ => a special cell to present this region is split or merged, in this
- * cell the value is merged encoded name or two split encoded names
- * separated by ","
- * rep_meta:_PARENT_ => a special cell to present this region's parent region(s), in this
- * cell the value is encoded name of one or two parent regions
- * separated by ","
*/
private static final Logger LOG = LoggerFactory.getLogger(MetaTableAccessor.class);
private static final Logger METALOG = LoggerFactory.getLogger("org.apache.hadoop.hbase.META");
- // Save its daughter/parent region(s) when split/merge
- private static final byte[] daughterNameCq = Bytes.toBytes("_DAUGHTER_");
- private static final byte[] parentNameCq = Bytes.toBytes("_PARENT_");
-
- // Save its table name because we only know region's encoded name
- private static final byte[] tableNameCq = Bytes.toBytes("_TABLENAME_");
-
static final byte [] META_REGION_PREFIX;
static {
// Copy the prefix from FIRST_META_REGIONINFO into META_REGION_PREFIX.
@@ -1352,56 +1331,6 @@ public class MetaTableAccessor {
return delete;
}
- public static Put makeBarrierPut(byte[] encodedRegionName, long seq, byte[] tableName)
- throws IOException {
- byte[] seqBytes = Bytes.toBytes(seq);
- Put put = new Put(encodedRegionName);
- put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
- .setRow(put.getRow())
- .setFamily(HConstants.REPLICATION_BARRIER_FAMILY)
- .setQualifier(seqBytes)
- .setTimestamp(put.getTimeStamp())
- .setType(Type.Put)
- .setValue(seqBytes)
- .build())
- .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
- .setRow(put.getRow())
- .setFamily(HConstants.REPLICATION_META_FAMILY)
- .setQualifier(tableNameCq)
- .setTimestamp(put.getTimeStamp())
- .setType(Cell.Type.Put)
- .setValue(tableName)
- .build());
- return put;
- }
-
-
- public static Put makeDaughterPut(byte[] encodedRegionName, byte[] value) throws IOException {
- Put put = new Put(encodedRegionName);
- put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
- .setRow(put.getRow())
- .setFamily(HConstants.REPLICATION_META_FAMILY)
- .setQualifier(daughterNameCq)
- .setTimestamp(put.getTimeStamp())
- .setType(Type.Put)
- .setValue(value)
- .build());
- return put;
- }
-
- public static Put makeParentPut(byte[] encodedRegionName, byte[] value) throws IOException {
- Put put = new Put(encodedRegionName);
- put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
- .setRow(put.getRow())
- .setFamily(HConstants.REPLICATION_META_FAMILY)
- .setQualifier(parentNameCq)
- .setTimestamp(put.getTimeStamp())
- .setType(Type.Put)
- .setValue(value)
- .build());
- return put;
- }
-
/**
* Adds split daughters to the Put
*/
@@ -1431,26 +1360,25 @@ public class MetaTableAccessor {
}
/**
- * Put the passed <code>puts</code> to the <code>hbase:meta</code> table.
- * Non-atomic for multi puts.
+ * Put the passed <code>p</code> to the <code>hbase:meta</code> table.
* @param connection connection we're using
- * @param puts Put to add to hbase:meta
+ * @param p Put to add to hbase:meta
* @throws IOException
*/
- public static void putToMetaTable(final Connection connection, final Put... puts)
+ static void putToMetaTable(final Connection connection, final Put p)
throws IOException {
- put(getMetaHTable(connection), Arrays.asList(puts));
+ put(getMetaHTable(connection), p);
}
/**
* @param t Table to use (will be closed when done).
- * @param puts puts to make
+ * @param p put to make
* @throws IOException
*/
- private static void put(final Table t, final List<Put> puts) throws IOException {
+ private static void put(final Table t, final Put p) throws IOException {
try {
- debugLogMutations(puts);
- t.put(puts);
+ debugLogMutation(p);
+ t.put(p);
} finally {
t.close();
}
@@ -1567,7 +1495,7 @@ public class MetaTableAccessor {
* Adds daughter region infos to hbase:meta row for the specified region. Note that this does not
* add its daughter's as different rows, but adds information about the daughters in the same row
* as the parent. Use
- * {@link #splitRegion(Connection, RegionInfo, RegionInfo, RegionInfo, ServerName,int,boolean)}
+ * {@link #splitRegion(Connection, RegionInfo, RegionInfo, RegionInfo, ServerName,int)}
* if you want to do that.
* @param connection connection we're using
* @param regionInfo RegionInfo of parent region
@@ -1575,7 +1503,7 @@ public class MetaTableAccessor {
* @param splitB second split daughter of the parent regionInfo
* @throws IOException if problem connecting or updating meta
*/
- public static void addSpiltsToParent(Connection connection, RegionInfo regionInfo,
+ public static void addSplitsToParent(Connection connection, RegionInfo regionInfo,
RegionInfo splitA, RegionInfo splitB) throws IOException {
Table meta = getMetaHTable(connection);
try {
@@ -1590,7 +1518,11 @@ public class MetaTableAccessor {
}
/**
- * Adds a hbase:meta row for the specified new region. Initial state of new region is CLOSED.
+ * Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
+ * does not add its daughter's as different rows, but adds information about the daughters
+ * in the same row as the parent. Use
+ * {@link #splitRegion(Connection, RegionInfo, RegionInfo, RegionInfo, ServerName, int)}
+ * if you want to do that.
* @param connection connection we're using
* @param regionInfo region information
* @throws IOException if problem connecting or updating meta
@@ -1651,12 +1583,11 @@ public class MetaTableAccessor {
* @param regionB
* @param sn the location of the region
* @param masterSystemTime
- * @param saveBarrier true if need save replication barrier in meta, used for serial replication
* @throws IOException
*/
public static void mergeRegions(final Connection connection, RegionInfo mergedRegion,
RegionInfo regionA, RegionInfo regionB, ServerName sn, int regionReplication,
- long masterSystemTime, boolean saveBarrier)
+ long masterSystemTime)
throws IOException {
Table meta = getMetaHTable(connection);
try {
@@ -1707,20 +1638,7 @@ public class MetaTableAccessor {
byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString()
+ HConstants.DELIMITER);
- Mutation[] mutations;
- if (saveBarrier) {
- Put putBarrierA = makeDaughterPut(regionA.getEncodedNameAsBytes(),
- mergedRegion.getEncodedNameAsBytes());
- Put putBarrierB = makeDaughterPut(regionB.getEncodedNameAsBytes(),
- mergedRegion.getEncodedNameAsBytes());
- Put putDaughter = makeParentPut(mergedRegion.getEncodedNameAsBytes(), Bytes.toBytes(
- regionA.getEncodedName() + "," + regionB.getEncodedName()));
- mutations = new Mutation[] { putOfMerged, deleteA, deleteB,
- putBarrierA, putBarrierB, putDaughter};
- } else {
- mutations = new Mutation[] { putOfMerged, deleteA, deleteB };
- }
- multiMutate(connection, meta, tableRow, mutations);
+ multiMutate(connection, meta, tableRow, putOfMerged, deleteA, deleteB);
} finally {
meta.close();
}
@@ -1736,11 +1654,9 @@ public class MetaTableAccessor {
* @param splitA Split daughter region A
* @param splitB Split daughter region A
* @param sn the location of the region
- * @param saveBarrier true if need save replication barrier in meta, used for serial replication
*/
- public static void splitRegion(final Connection connection, RegionInfo parent,
- RegionInfo splitA, RegionInfo splitB, ServerName sn, int regionReplication,
- boolean saveBarrier) throws IOException {
+ public static void splitRegion(final Connection connection, RegionInfo parent, RegionInfo splitA,
+ RegionInfo splitB, ServerName sn, int regionReplication) throws IOException {
Table meta = getMetaHTable(connection);
try {
//Put for parent
@@ -1771,21 +1687,8 @@ public class MetaTableAccessor {
addEmptyLocation(putB, i);
}
- Mutation[] mutations;
- if (saveBarrier) {
- Put parentPut = makeDaughterPut(parent.getEncodedNameAsBytes(),
- Bytes.toBytes(splitA.getEncodedName() + "," + splitB.getEncodedName()));
- Put daughterPutA = makeParentPut(splitA.getEncodedNameAsBytes(),
- parent.getEncodedNameAsBytes());
- Put daughterPutB = makeParentPut(splitB.getEncodedNameAsBytes(),
- parent.getEncodedNameAsBytes());
-
- mutations = new Mutation[]{putParent, putA, putB, parentPut, daughterPutA, daughterPutB};
- } else {
- mutations = new Mutation[]{putParent, putA, putB};
- }
byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER);
- multiMutate(connection, meta, tableRow, mutations);
+ multiMutate(connection, meta, tableRow, putParent, putA, putB);
} finally {
meta.close();
}
@@ -1920,32 +1823,6 @@ public class MetaTableAccessor {
}
/**
- * Updates the progress of pushing entries to peer cluster. Skip entry if value is -1.
- * @param connection connection we're using
- * @param peerId the peerId to push
- * @param positions map that saving positions for each region
- * @throws IOException
- */
- public static void updateReplicationPositions(Connection connection, String peerId,
- Map<String, Long> positions) throws IOException {
- List<Put> puts = new ArrayList<>(positions.entrySet().size());
- for (Map.Entry<String, Long> entry : positions.entrySet()) {
- Put put = new Put(Bytes.toBytes(entry.getKey()));
- put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
- .setRow(put.getRow())
- .setFamily(HConstants.REPLICATION_POSITION_FAMILY)
- .setQualifier(Bytes.toBytes(peerId))
- .setTimestamp(put.getTimeStamp())
- .setType(Cell.Type.Put)
- .setValue(Bytes.toBytes(Math.abs(entry.getValue())))
- .build());
- puts.add(put);
- }
- getMetaHTable(connection).put(puts);
- }
-
-
- /**
* Updates the location of the specified region to be the specified server.
* <p>
* Connects to the specified server which should be hosting the specified
@@ -2163,129 +2040,4 @@ public class MetaTableAccessor {
.setValue(Bytes.toBytes(openSeqNum))
.build());
}
-
- /**
- * Get replication position for a peer in a region.
- * @param connection connection we're using
- * @return the position of this peer, -1 if no position in meta.
- */
- public static long getReplicationPositionForOnePeer(Connection connection,
- byte[] encodedRegionName, String peerId) throws IOException {
- Get get = new Get(encodedRegionName);
- get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId));
- Result r = get(getMetaHTable(connection), get);
- if (r.isEmpty()) {
- return -1;
- }
- Cell cell = r.rawCells()[0];
- return Bytes.toLong(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength());
- }
-
- /**
- * Get replication positions for all peers in a region.
- * @param connection connection we're using
- * @param encodedRegionName region's encoded name
- * @return the map of positions for each peer
- */
- public static Map<String, Long> getReplicationPositionForAllPeer(Connection connection,
- byte[] encodedRegionName) throws IOException {
- Get get = new Get(encodedRegionName);
- get.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
- Result r = get(getMetaHTable(connection), get);
- Map<String, Long> map = new HashMap<>((int) (r.size() / 0.75 + 1));
- for (Cell c : r.listCells()) {
- map.put(
- Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()),
- Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
- }
- return map;
- }
-
- /**
- * Get replication barriers for all peers in a region.
- * @param encodedRegionName region's encoded name
- * @return a list of barrier sequence numbers.
- * @throws IOException
- */
- public static List<Long> getReplicationBarriers(Connection connection, byte[] encodedRegionName)
- throws IOException {
- Get get = new Get(encodedRegionName);
- get.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
- Result r = get(getMetaHTable(connection), get);
- List<Long> list = new ArrayList<>();
- if (!r.isEmpty()) {
- for (Cell cell : r.rawCells()) {
- list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(),
- cell.getQualifierLength()));
- }
- }
- return list;
- }
-
- /**
- * Get all barriers in all regions.
- * @return a map of barrier lists in all regions
- * @throws IOException
- */
- public static Map<String, List<Long>> getAllBarriers(Connection connection) throws IOException {
- Map<String, List<Long>> map = new HashMap<>();
- Scan scan = new Scan();
- scan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
- try (Table t = getMetaHTable(connection);
- ResultScanner scanner = t.getScanner(scan)) {
- Result result;
- while ((result = scanner.next()) != null) {
- String key = Bytes.toString(result.getRow());
- List<Long> list = new ArrayList<>(result.rawCells().length);
- for (Cell cell : result.rawCells()) {
- list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(),
- cell.getQualifierLength()));
- }
- map.put(key, list);
- }
- }
- return map;
- }
-
- private static String getSerialReplicationColumnValue(Connection connection,
- byte[] encodedRegionName, byte[] columnQualifier) throws IOException {
- Get get = new Get(encodedRegionName);
- get.addColumn(HConstants.REPLICATION_META_FAMILY, columnQualifier);
- Result result = get(getMetaHTable(connection), get);
- if (!result.isEmpty()) {
- Cell c = result.rawCells()[0];
- return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength());
- }
- return null;
- }
-
- /**
- * Get daughter region(s) for a region, only used in serial replication.
- * @param connection connection we're using
- * @param encodedName region's encoded name
- */
- public static String getSerialReplicationDaughterRegion(Connection connection, byte[] encodedName)
- throws IOException {
- return getSerialReplicationColumnValue(connection, encodedName, daughterNameCq);
- }
-
- /**
- * Get parent region(s) for a region, only used in serial replication.
- * @param connection connection we're using
- * @param encodedName region's encoded name
- */
- public static String getSerialReplicationParentRegion(Connection connection, byte[] encodedName)
- throws IOException {
- return getSerialReplicationColumnValue(connection, encodedName, parentNameCq);
- }
-
- /**
- * Get the table name for a region, only used in serial replication.
- * @param connection connection we're using
- * @param encodedName region's encoded name
- */
- public static String getSerialReplicationTableName(Connection connection, byte[] encodedName)
- throws IOException {
- return getSerialReplicationColumnValue(connection, encodedName, tableNameCq);
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
index f485c4e..305b352 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
@@ -232,12 +232,6 @@ public interface TableDescriptor {
boolean hasRegionMemStoreReplication();
/**
- * @return true if there are at least one cf whose replication scope is
- * serial.
- */
- boolean hasSerialReplicationScope();
-
- /**
* Check if the compaction enable flag of the table is true. If flag is false
* then no minor/major compactions will be done in real.
*
@@ -285,8 +279,7 @@ public interface TableDescriptor {
boolean hasDisabled = false;
for (ColumnFamilyDescriptor cf : getColumnFamilies()) {
- if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
- && cf.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
+ if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
hasDisabled = true;
} else {
hasEnabled = true;
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
index 9f40ae6..c1db64b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
@@ -32,21 +32,20 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.regex.Matcher;
-import java.util.stream.Stream;
-
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
/**
* @since 2.0.0
*/
@@ -1055,16 +1054,6 @@ public class TableDescriptorBuilder {
}
/**
- * Return true if there are at least one cf whose replication scope is
- * serial.
- */
- @Override
- public boolean hasSerialReplicationScope() {
- return Stream.of(getColumnFamilies())
- .anyMatch(column -> column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL);
- }
-
- /**
* Returns the configured replicas per region
*/
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index c7bf7e2..722dc2a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -78,10 +78,8 @@ public class ReplicationAdmin implements Closeable {
// only Global for now, can add other type
// such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
public static final String REPLICATIONTYPE = "replicationType";
- public static final String REPLICATIONGLOBAL =
- Integer.toString(HConstants.REPLICATION_SCOPE_GLOBAL);
- public static final String REPLICATIONSERIAL =
- Integer.toString(HConstants.REPLICATION_SCOPE_SERIAL);
+ public static final String REPLICATIONGLOBAL = Integer
+ .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
private final Connection connection;
private Admin admin;
@@ -356,9 +354,7 @@ public class ReplicationAdmin implements Closeable {
HashMap<String, String> replicationEntry = new HashMap<>();
replicationEntry.put(TNAME, table);
replicationEntry.put(CFNAME, cf);
- replicationEntry.put(REPLICATIONTYPE,
- scope == HConstants.REPLICATION_SCOPE_GLOBAL ? REPLICATIONGLOBAL
- : REPLICATIONSERIAL);
+ replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
replicationColFams.add(replicationEntry);
});
});
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableDescriptorBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableDescriptorBuilder.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableDescriptorBuilder.java
index 7794a04..f83e13f 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableDescriptorBuilder.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableDescriptorBuilder.java
@@ -24,8 +24,9 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.regex.Pattern;
-import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -341,26 +342,4 @@ public class TestTableDescriptorBuilder {
.build();
assertEquals(42, htd.getPriority());
}
-
- @Test
- public void testSerialReplicationScope() {
- HColumnDescriptor hcdWithScope = new HColumnDescriptor(Bytes.toBytes("cf0"));
- hcdWithScope.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
- HColumnDescriptor hcdWithoutScope = new HColumnDescriptor(Bytes.toBytes("cf1"));
- TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
- .addColumnFamily(hcdWithoutScope)
- .build();
- assertFalse(htd.hasSerialReplicationScope());
-
- htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
- .addColumnFamily(hcdWithScope)
- .build();
- assertTrue(htd.hasSerialReplicationScope());
-
- htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
- .addColumnFamily(hcdWithScope)
- .addColumnFamily(hcdWithoutScope)
- .build();
- assertTrue(htd.hasSerialReplicationScope());
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 1cd6f89..891143a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -431,27 +431,6 @@ public final class HConstants {
/** The catalog family */
public static final byte [] CATALOG_FAMILY = Bytes.toBytes(CATALOG_FAMILY_STR);
- /** The replication barrier family as a string*/
- public static final String REPLICATION_BARRIER_FAMILY_STR = "rep_barrier";
-
- /** The replication barrier family */
- public static final byte [] REPLICATION_BARRIER_FAMILY =
- Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR);
-
- /** The replication position family as a string*/
- public static final String REPLICATION_POSITION_FAMILY_STR = "rep_position";
-
- /** The replication position family */
- public static final byte [] REPLICATION_POSITION_FAMILY =
- Bytes.toBytes(REPLICATION_POSITION_FAMILY_STR);
-
- /** The replication meta family as a string*/
- public static final String REPLICATION_META_FAMILY_STR = "rep_meta";
-
- /** The replication meta family */
- public static final byte [] REPLICATION_META_FAMILY =
- Bytes.toBytes(REPLICATION_META_FAMILY_STR);
-
/** The RegionInfo qualifier as a string */
public static final String REGIONINFO_QUALIFIER_STR = "regioninfo";
@@ -662,12 +641,6 @@ public final class HConstants {
public static final int REPLICATION_SCOPE_GLOBAL = 1;
/**
- * Scope tag for serially scoped data
- * This data will be replicated to all peers by the order of sequence id.
- */
- public static final int REPLICATION_SCOPE_SERIAL = 2;
-
- /**
* Default cluster ID, cannot be used to identify a cluster so a key with
* this value means it wasn't meant for replication.
*/
@@ -917,12 +890,6 @@ public final class HConstants {
public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false;
/** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */
public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id";
-
- public static final String
- REPLICATION_SERIALLY_WAITING_KEY = "hbase.serial.replication.waitingMs";
- public static final long
- REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
-
/**
* Max total size of buffered entries in all replication peers. It will prevent server getting
* OOM if there are many peers. Default value is 256MB which is four times to default
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 26865de..61f0461 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1659,19 +1659,6 @@ possible configurations would overwhelm and obscure the important.
default of 10 will rarely need to be changed.
</description>
</property>
- <property>
- <name>hbase.serial.replication.waitingMs</name>
- <value>10000</value>
- <description>
- By default, in replication we can not make sure the order of operations in slave cluster is
- same as the order in master. If set REPLICATION_SCOPE to 2, we will push edits by the order
- of written. This configuration is to set how long (in ms) we will wait before next checking if
- a log can NOT be pushed because there are some logs written before it that have yet to be
- pushed. A larger waiting will decrease the number of queries on hbase:meta but will enlarge
- the delay of replication. This feature relies on zk-less assignment, so users must set
- hbase.assignment.usezk to false to support it.
- </description>
- </property>
<!-- Static Web User Filter properties. -->
<property>
<name>hbase.http.staticuser.user</name>
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-protocol/src/main/protobuf/WAL.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto
index 2494977..c1d465a 100644
--- a/hbase-protocol/src/main/protobuf/WAL.proto
+++ b/hbase-protocol/src/main/protobuf/WAL.proto
@@ -75,7 +75,6 @@ message WALKey {
enum ScopeType {
REPLICATION_SCOPE_LOCAL = 0;
REPLICATION_SCOPE_GLOBAL = 1;
- REPLICATION_SCOPE_SERIAL = 2;
}
message FamilyScope {
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 818a0ed..8a54264 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -109,7 +109,6 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
-import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
@@ -125,7 +124,6 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
-import org.apache.hadoop.hbase.master.procedure.ModifyNamespaceProcedure;
import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
@@ -365,7 +363,6 @@ public class HMaster extends HRegionServer implements MasterServices {
private ClusterStatusPublisher clusterStatusPublisherChore = null;
CatalogJanitor catalogJanitorChore;
- private ReplicationMetaCleaner replicationMetaCleaner;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
@@ -1166,8 +1163,6 @@ public class HMaster extends HRegionServer implements MasterServices {
if (LOG.isTraceEnabled()) {
LOG.trace("Started service threads");
}
- replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
- getChoreService().scheduleChore(replicationMetaCleaner);
}
@Override
@@ -1190,7 +1185,6 @@ public class HMaster extends HRegionServer implements MasterServices {
// Clean up and close up shop
if (this.logCleaner != null) this.logCleaner.cancel(true);
if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
- if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
if (this.quotaManager != null) this.quotaManager.stop();
if (this.activeMasterManager != null) this.activeMasterManager.stop();
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
index a5c4cf2..ab5c442 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
@@ -199,15 +199,7 @@ public class RegionStateStore {
.setValue(Bytes.toBytes(state.name()))
.build());
LOG.info(info.toString());
-
- final boolean serialReplication = hasSerialReplicationScope(regionInfo.getTable());
- if (serialReplication && state == State.OPEN) {
- Put barrierPut = MetaTableAccessor.makeBarrierPut(regionInfo.getEncodedNameAsBytes(),
- openSeqNum, regionInfo.getTable().getName());
- updateRegionLocation(regionInfo, state, put, barrierPut);
- } else {
- updateRegionLocation(regionInfo, state, put);
- }
+ updateRegionLocation(regionInfo, state, put);
}
protected void updateRegionLocation(final RegionInfo regionInfo, final State state,
@@ -238,7 +230,7 @@ public class RegionStateStore {
final RegionInfo hriB, final ServerName serverName) throws IOException {
final TableDescriptor htd = getTableDescriptor(parent.getTable());
MetaTableAccessor.splitRegion(master.getConnection(), parent, hriA, hriB, serverName,
- getRegionReplication(htd), hasSerialReplicationScope(htd));
+ getRegionReplication(htd));
}
// ============================================================================================
@@ -248,8 +240,7 @@ public class RegionStateStore {
final RegionInfo hriB, final ServerName serverName) throws IOException {
final TableDescriptor htd = getTableDescriptor(parent.getTable());
MetaTableAccessor.mergeRegions(master.getConnection(), parent, hriA, hriB, serverName,
- getRegionReplication(htd), EnvironmentEdgeManager.currentTime(),
- hasSerialReplicationScope(htd));
+ getRegionReplication(htd), EnvironmentEdgeManager.currentTime());
}
// ============================================================================================
@@ -266,14 +257,6 @@ public class RegionStateStore {
// ==========================================================================
// Table Descriptors helpers
// ==========================================================================
- private boolean hasSerialReplicationScope(final TableName tableName) throws IOException {
- return hasSerialReplicationScope(getTableDescriptor(tableName));
- }
-
- private boolean hasSerialReplicationScope(final TableDescriptor htd) {
- return (htd != null)? htd.hasSerialReplicationScope(): false;
- }
-
private int getRegionReplication(final TableDescriptor htd) {
return (htd != null) ? htd.getRegionReplication() : 1;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
deleted file mode 100644
index 43a99bd..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.cleaner;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.ScheduledChore;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * This chore is to clean up the useless data in hbase:meta which is used by serial replication.
- */
-@InterfaceAudience.Private
-public class ReplicationMetaCleaner extends ScheduledChore {
-
- private static final Logger LOG = LoggerFactory.getLogger(ReplicationMetaCleaner.class);
-
- private final Admin admin;
- private final MasterServices master;
-
- public ReplicationMetaCleaner(MasterServices master, Stoppable stoppable, int period)
- throws IOException {
- super("ReplicationMetaCleaner", stoppable, period);
- this.master = master;
- admin = master.getConnection().getAdmin();
- }
-
- @Override
- protected void chore() {
- try {
- Map<String, TableDescriptor> tables = master.getTableDescriptors().getAllDescriptors();
- Map<String, Set<String>> serialTables = new HashMap<>();
- for (Map.Entry<String, TableDescriptor> entry : tables.entrySet()) {
- boolean hasSerialScope = false;
- for (ColumnFamilyDescriptor column : entry.getValue().getColumnFamilies()) {
- if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL) {
- hasSerialScope = true;
- break;
- }
- }
- if (hasSerialScope) {
- serialTables.put(entry.getValue().getTableName().getNameAsString(), new HashSet<>());
- }
- }
- if (serialTables.isEmpty()){
- return;
- }
-
- List<ReplicationPeerDescription> peers = admin.listReplicationPeers();
- for (ReplicationPeerDescription peerDesc : peers) {
- Map<TableName, List<String>> tableCFsMap = peerDesc.getPeerConfig().getTableCFsMap();
- if (tableCFsMap ==null) {
- continue;
- }
-
- for (Map.Entry<TableName, List<String>> map : tableCFsMap.entrySet()) {
- if (serialTables.containsKey(map.getKey().getNameAsString())) {
- serialTables.get(map.getKey().getNameAsString()).add(peerDesc.getPeerId());
- break;
- }
- }
- }
-
- Map<String, List<Long>> barrierMap = MetaTableAccessor.getAllBarriers(master.getConnection());
- for (Map.Entry<String, List<Long>> entry : barrierMap.entrySet()) {
- String encodedName = entry.getKey();
- byte[] encodedBytes = Bytes.toBytes(encodedName);
- boolean canClearRegion = false;
- Map<String, Long> posMap = MetaTableAccessor.getReplicationPositionForAllPeer(
- master.getConnection(), encodedBytes);
- if (posMap.isEmpty()) {
- continue;
- }
-
- String tableName = MetaTableAccessor.getSerialReplicationTableName(
- master.getConnection(), encodedBytes);
- Set<String> confPeers = serialTables.get(tableName);
- if (confPeers == null) {
- // This table doesn't exist or all cf's scope is not serial any more, we can clear meta.
- canClearRegion = true;
- } else {
- if (!allPeersHavePosition(confPeers, posMap)) {
- continue;
- }
-
- String daughterValue = MetaTableAccessor
- .getSerialReplicationDaughterRegion(master.getConnection(), encodedBytes);
- if (daughterValue != null) {
- //this region is merged or split
- boolean allDaughterStart = true;
- String[] daughterRegions = daughterValue.split(",");
- for (String daughter : daughterRegions) {
- byte[] region = Bytes.toBytes(daughter);
- if (!MetaTableAccessor.getReplicationBarriers(
- master.getConnection(), region).isEmpty() &&
- !allPeersHavePosition(confPeers,
- MetaTableAccessor
- .getReplicationPositionForAllPeer(master.getConnection(), region))) {
- allDaughterStart = false;
- break;
- }
- }
- if (allDaughterStart) {
- canClearRegion = true;
- }
- }
- }
- if (canClearRegion) {
- Delete delete = new Delete(encodedBytes);
- delete.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
- delete.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
- delete.addFamily(HConstants.REPLICATION_META_FAMILY);
- try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
- metaTable.delete(delete);
- }
- } else {
-
- // Barriers whose seq is larger than min pos of all peers, and the last barrier whose seq
- // is smaller than min pos should be kept. All other barriers can be deleted.
-
- long minPos = Long.MAX_VALUE;
- for (Map.Entry<String, Long> pos : posMap.entrySet()) {
- minPos = Math.min(minPos, pos.getValue());
- }
- List<Long> barriers = entry.getValue();
- int index = Collections.binarySearch(barriers, minPos);
- if (index < 0) {
- index = -index - 1;
- }
- Delete delete = new Delete(encodedBytes);
- for (int i = 0; i < index - 1; i++) {
- delete.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, Bytes.toBytes(barriers.get(i)));
- }
- try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
- metaTable.delete(delete);
- }
- }
-
- }
-
- } catch (IOException e) {
- LOG.error("Exception during cleaning up.", e);
- }
-
- }
-
- private boolean allPeersHavePosition(Set<String> peers, Map<String, Long> posMap)
- throws IOException {
- for(String peer:peers){
- if (!posMap.containsKey(peer)){
- return false;
- }
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index a76dec2..b306948 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -78,7 +78,6 @@ import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
-import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
@@ -3169,54 +3168,34 @@ public class HRegionServer extends HasThread implements
* @return true if closed the region successfully.
* @throws IOException
*/
- protected boolean closeAndOfflineRegionForSplitOrMerge(
- final List<String> regionEncodedName) throws IOException {
- for (int i = 0; i < regionEncodedName.size(); ++i) {
- HRegion regionToClose = this.getRegion(regionEncodedName.get(i));
- if (regionToClose != null) {
- Map<byte[], List<HStoreFile>> hstoreFiles = null;
- Exception exceptionToThrow = null;
- try{
- hstoreFiles = regionToClose.close(false);
- } catch (Exception e) {
- exceptionToThrow = e;
- }
- if (exceptionToThrow == null && hstoreFiles == null) {
- // The region was closed by someone else
- exceptionToThrow =
- new IOException("Failed to close region: already closed by another thread");
- }
-
- if (exceptionToThrow != null) {
- if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
- throw new IOException(exceptionToThrow);
- }
- if (regionToClose.getTableDescriptor().hasSerialReplicationScope()) {
- // For serial replication, we need add a final barrier on this region. But the splitting
- // or merging may be reverted, so we should make sure if we reopen this region, the open
- // barrier is same as this final barrier
- long seq = regionToClose.getMaxFlushedSeqId();
- if (seq == HConstants.NO_SEQNUM) {
- // No edits in WAL for this region; get the sequence number when the region was opened.
- seq = regionToClose.getOpenSeqNum();
- if (seq == HConstants.NO_SEQNUM) {
- // This region has no data
- seq = 0;
- }
- } else {
- seq++;
- }
- Put finalBarrier = MetaTableAccessor.makeBarrierPut(
- Bytes.toBytes(regionEncodedName.get(i)),
- seq,
- regionToClose.getTableDescriptor().getTableName().getName());
- MetaTableAccessor.putToMetaTable(getConnection(), finalBarrier);
- }
- // Offline the region
- this.removeRegion(regionToClose, null);
- }
- }
- return true;
+ protected boolean closeAndOfflineRegionForSplitOrMerge(final List<String> regionEncodedName)
+ throws IOException {
+ for (int i = 0; i < regionEncodedName.size(); ++i) {
+ HRegion regionToClose = this.getRegion(regionEncodedName.get(i));
+ if (regionToClose != null) {
+ Map<byte[], List<HStoreFile>> hstoreFiles = null;
+ Exception exceptionToThrow = null;
+ try {
+ hstoreFiles = regionToClose.close(false);
+ } catch (Exception e) {
+ exceptionToThrow = e;
+ }
+ if (exceptionToThrow == null && hstoreFiles == null) {
+ // The region was closed by someone else
+ exceptionToThrow =
+ new IOException("Failed to close region: already closed by another thread");
+ }
+ if (exceptionToThrow != null) {
+ if (exceptionToThrow instanceof IOException) {
+ throw (IOException) exceptionToThrow;
+ }
+ throw new IOException(exceptionToThrow);
+ }
+ // Offline the region
+ this.removeRegion(regionToClose, null);
+ }
+ }
+ return true;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 1ffe7f6..de6a14c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -117,6 +117,7 @@ class FSWALEntry extends Entry {
PrivateCellUtil.setSequenceId(c, regionSequenceId);
}
}
+
getKey().setWriteEntry(we);
return regionSequenceId;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index 1e45496..38bbb48 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -74,8 +74,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
try {
WALEntryBatch entryBatch = entryReader.take();
shipEdits(entryBatch);
- if (entryBatch.getWalEntries().isEmpty()
- && entryBatch.getLastSeqIds().isEmpty()) {
+ if (entryBatch.getWalEntries().isEmpty()) {
LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+ source.getQueueId());
source.getSourceMetrics().incrCompletedRecoveryQueue();
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 85b2e85..4e1b20d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -44,12 +43,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
@@ -58,7 +54,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
@@ -152,8 +147,6 @@ public class ReplicationSourceManager implements ReplicationListener {
private final boolean replicationForBulkLoadDataEnabled;
- private Connection connection;
- private long replicationWaitTime;
private AtomicLong totalBufferUsed = new AtomicLong();
@@ -206,9 +199,6 @@ public class ReplicationSourceManager implements ReplicationListener {
this.latestPaths = new HashSet<Path>();
replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
- this.replicationWaitTime = conf.getLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY,
- HConstants.REPLICATION_SERIALLY_WAITING_DEFAULT);
- connection = ConnectionFactory.createConnection(conf);
}
/**
@@ -845,10 +835,6 @@ public class ReplicationSourceManager implements ReplicationListener {
return this.fs;
}
- public Connection getConnection() {
- return this.connection;
- }
-
/**
* Get the ReplicationPeers used by this ReplicationSourceManager
* @return the ReplicationPeers used by this ReplicationSourceManager
@@ -887,103 +873,4 @@ public class ReplicationSourceManager implements ReplicationListener {
int activeFailoverTaskCount() {
return executor.getActiveCount();
}
-
- /**
- * Whether an entry can be pushed to the peer or not right now. If we enable serial replication,
- * we can not push the entry until all entries in its region whose sequence numbers are smaller
- * than this entry have been pushed. For each ReplicationSource, we need only check the first
- * entry in each region, as long as it can be pushed, we can push all in this ReplicationSource.
- * This method will be blocked until we can push.
- * @return the first barrier of entry's region, or -1 if there is no barrier. It is used to
- * prevent saving positions in the region of no barrier.
- */
- void waitUntilCanBePushed(byte[] encodedName, long seq, String peerId)
- throws IOException, InterruptedException {
- /**
- * There are barriers for this region and position for this peer. N barriers form N intervals,
- * (b1,b2) (b2,b3) ... (bn,max). Generally, there is no logs whose seq id is not greater than
- * the first barrier and the last interval is start from the last barrier. There are several
- * conditions that we can push now, otherwise we should block: 1) "Serial replication" is not
- * enabled, we can push all logs just like before. This case should not call this method. 2)
- * There is no barriers for this region, or the seq id is smaller than the first barrier. It is
- * mainly because we alter REPLICATION_SCOPE = 2. We can not guarantee the order of logs that is
- * written before altering. 3) This entry is in the first interval of barriers. We can push them
- * because it is the start of a region. But if the region is created by region split, we should
- * check if the parent regions are fully pushed. 4) If the entry's seq id and the position are
- * in same section, or the pos is the last number of previous section. Because when open a
- * region we put a barrier the number is the last log's id + 1. 5) Log's seq is smaller than pos
- * in meta, we are retrying. It may happen when a RS crashes after save replication meta and
- * before save zk offset.
- */
- List<Long> barriers = MetaTableAccessor.getReplicationBarriers(connection, encodedName);
- if (barriers.isEmpty() || seq <= barriers.get(0)) {
- // Case 2
- return;
- }
- int interval = Collections.binarySearch(barriers, seq);
- if (interval < 0) {
- interval = -interval - 1;// get the insert position if negative
- }
- if (interval == 1) {
- // Case 3
- // Check if there are parent regions
- String parentValue =
- MetaTableAccessor.getSerialReplicationParentRegion(connection, encodedName);
- if (parentValue == null) {
- // This region has no parent or the parent's log entries are fully pushed.
- return;
- }
- while (true) {
- boolean allParentDone = true;
- String[] parentRegions = parentValue.split(",");
- for (String parent : parentRegions) {
- byte[] region = Bytes.toBytes(parent);
- long pos = MetaTableAccessor.getReplicationPositionForOnePeer(connection, region, peerId);
- List<Long> parentBarriers = MetaTableAccessor.getReplicationBarriers(connection, region);
- if (parentBarriers.size() > 0 &&
- parentBarriers.get(parentBarriers.size() - 1) - 1 > pos) {
- allParentDone = false;
- // For a closed region, we will write a close event marker to WAL whose sequence id is
- // larger than final barrier but still smaller than next region's openSeqNum.
- // So if the pos is larger than last barrier, we can say we have read the event marker
- // which means the parent region has been fully pushed.
- LOG.info(
- Bytes.toString(encodedName) + " can not start pushing because parent region's" +
- " log has not been fully pushed: parent=" + Bytes.toString(region) + " pos=" + pos +
- " barriers=" + Arrays.toString(barriers.toArray()));
- break;
- }
- }
- if (allParentDone) {
- return;
- } else {
- Thread.sleep(replicationWaitTime);
- }
- }
-
- }
-
- while (true) {
- long pos =
- MetaTableAccessor.getReplicationPositionForOnePeer(connection, encodedName, peerId);
- if (seq <= pos) {
- // Case 5
- }
- if (pos >= 0) {
- // Case 4
- int posInterval = Collections.binarySearch(barriers, pos);
- if (posInterval < 0) {
- posInterval = -posInterval - 1;// get the insert position if negative
- }
- if (posInterval == interval || pos == barriers.get(interval - 1) - 1) {
- return;
- }
- }
-
- LOG.info(Bytes.toString(encodedName) + " can not start pushing to peer " + peerId +
- " because previous log has not been pushed: sequence=" + seq + " pos=" + pos +
- " barriers=" + Arrays.toString(barriers.toArray()));
- Thread.sleep(replicationWaitTime);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 808f738..959f676 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -20,31 +20,23 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
-import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
-import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
/**
* This thread reads entries from a queue and ships them. Entries are placed onto the queue by
@@ -79,17 +71,6 @@ public class ReplicationSourceShipper extends Thread {
// Maximum number of retries before taking bold actions
protected final int maxRetriesMultiplier;
- // Use guava cache to set ttl for each key
- private final LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
- .expireAfterAccess(1, TimeUnit.DAYS).build(
- new CacheLoader<String, Boolean>() {
- @Override
- public Boolean load(String key) throws Exception {
- return false;
- }
- }
- );
-
public ReplicationSourceShipper(Configuration conf, String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
this.conf = conf;
@@ -125,9 +106,6 @@ public class ReplicationSourceShipper extends Thread {
try {
WALEntryBatch entryBatch = entryReader.take();
- for (Map.Entry<String, Long> entry : entryBatch.getLastSeqIds().entrySet()) {
- waitingUntilCanPush(entry);
- }
shipEdits(entryBatch);
} catch (InterruptedException e) {
LOG.trace("Interrupted while waiting for next replication entry batch", e);
@@ -150,8 +128,6 @@ public class ReplicationSourceShipper extends Thread {
int sleepMultiplier = 0;
if (entries.isEmpty()) {
if (lastLoggedPosition != lastReadPosition) {
- // Save positions to meta table before zk.
- updateSerialRepPositions(entryBatch.getLastSeqIds());
updateLogPosition(lastReadPosition);
// if there was nothing to ship and it's not an error
// set "ageOfLastShippedOp" to <now> to indicate that we're current
@@ -197,9 +173,6 @@ public class ReplicationSourceShipper extends Thread {
for (int i = 0; i < size; i++) {
cleanUpHFileRefs(entries.get(i).getEdit());
}
-
- // Save positions to meta table before zk.
- updateSerialRepPositions(entryBatch.getLastSeqIds());
//Log and clean up WAL logs
updateLogPosition(lastReadPosition);
}
@@ -225,33 +198,6 @@ public class ReplicationSourceShipper extends Thread {
}
}
- private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
- String key = entry.getKey();
- long seq = entry.getValue();
- boolean deleteKey = false;
- if (seq <= 0) {
- // There is a REGION_CLOSE marker, we can not continue skipping after this entry.
- deleteKey = true;
- seq = -seq;
- }
-
- if (!canSkipWaitingSet.getUnchecked(key)) {
- try {
- source.getSourceManager().waitUntilCanBePushed(Bytes.toBytes(key), seq, source.getPeerId());
- } catch (IOException e) {
- LOG.error("waitUntilCanBePushed fail", e);
- throw new RuntimeException("waitUntilCanBePushed fail");
- } catch (InterruptedException e) {
- LOG.warn("waitUntilCanBePushed interrupted", e);
- Thread.currentThread().interrupt();
- }
- canSkipWaitingSet.put(key, true);
- }
- if (deleteKey) {
- canSkipWaitingSet.invalidate(key);
- }
- }
-
private void cleanUpHFileRefs(WALEdit edit) throws IOException {
String peerId = source.getPeerId();
if (peerId.contains("-")) {
@@ -282,16 +228,6 @@ public class ReplicationSourceShipper extends Thread {
lastLoggedPosition = lastReadPosition;
}
- private void updateSerialRepPositions(Map<String, Long> lastPositionsForSerialScope) {
- try {
- MetaTableAccessor.updateReplicationPositions(source.getSourceManager().getConnection(),
- source.getPeerId(), lastPositionsForSerialScope);
- } catch (IOException e) {
- LOG.error("updateReplicationPositions fail", e);
- throw new RuntimeException("updateReplicationPositions fail");
- }
- }
-
public void startup(UncaughtExceptionHandler handler) {
String name = Thread.currentThread().getName();
Threads.setDaemonThreadRunning(this, name + ".replicationSource." + walGroupId + ","
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index c12dcb6..579d20f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -21,14 +21,11 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -36,7 +33,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -45,7 +41,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
@@ -135,7 +131,7 @@ public class ReplicationSourceWALReader extends Thread {
continue;
}
WALEntryBatch batch = readWALEntries(entryStream);
- if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
+ if (batch != null && batch.getNbEntries() > 0) {
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Read %s WAL entries eligible for replication",
batch.getNbEntries()));
@@ -171,10 +167,6 @@ public class ReplicationSourceWALReader extends Thread {
batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
}
Entry entry = entryStream.next();
- if (updateSerialReplPos(batch, entry)) {
- batch.lastWalPosition = entryStream.getPosition();
- break;
- }
entry = filterEntry(entry);
if (entry != null) {
WALEdit edit = entry.getEdit();
@@ -247,33 +239,6 @@ public class ReplicationSourceWALReader extends Thread {
}
/**
- * @return true if we should stop reading because we're at REGION_CLOSE
- */
- private boolean updateSerialReplPos(WALEntryBatch batch, Entry entry) throws IOException {
- if (entry.hasSerialReplicationScope()) {
- String key = Bytes.toString(entry.getKey().getEncodedRegionName());
- batch.setLastPosition(key, entry.getKey().getSequenceId());
- if (!entry.getEdit().getCells().isEmpty()) {
- WALProtos.RegionEventDescriptor maybeEvent =
- WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
- if (maybeEvent != null && maybeEvent
- .getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
- // In serially replication, if we move a region to another RS and move it back, we may
- // read logs crossing two sections. We should break at REGION_CLOSE and push the first
- // section first in case of missing the middle section belonging to the other RS.
- // In a worker thread, if we can push the first log of a region, we can push all logs
- // in the same region without waiting until we read a close marker because next time
- // we read logs in this region, it must be a new section and not adjacent with this
- // region. Mark it negative.
- batch.setLastPosition(key, -entry.getKey().getSequenceId());
- return true;
- }
- }
- }
- return false;
- }
-
- /**
* Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a
* batch to become available
* @return A batch of entries, along with the position in the log after reading the batch
@@ -407,8 +372,6 @@ public class ReplicationSourceWALReader extends Thread {
private int nbHFiles = 0;
// heap size of data we need to replicate
private long heapSize = 0;
- // save the last sequenceid for each region if the table has serial-replication scope
- private Map<String, Long> lastSeqIds = new HashMap<>();
/**
* @param walEntries
@@ -477,13 +440,6 @@ public class ReplicationSourceWALReader extends Thread {
return heapSize;
}
- /**
- * @return the last sequenceid for each region if the table has serial-replication scope
- */
- public Map<String, Long> getLastSeqIds() {
- return lastSeqIds;
- }
-
private void incrementNbRowKeys(int increment) {
nbRowKeys += increment;
}
@@ -495,9 +451,5 @@ public class ReplicationSourceWALReader extends Thread {
private void incrementHeapSize(long increment) {
heapSize += increment;
}
-
- private void setLastPosition(String region, Long sequenceId) {
- getLastSeqIds().put(region, sequenceId);
- }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
index c4f0e25..179dfe5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
@@ -405,7 +405,7 @@ public class RestoreSnapshotHelper {
}
LOG.debug("Update splits parent " + regionInfo.getEncodedName() + " -> " + daughters);
- MetaTableAccessor.addSpiltsToParent(connection, regionInfo,
+ MetaTableAccessor.addSplitsToParent(connection, regionInfo,
regionsByName.get(daughters.getFirst()),
regionsByName.get(daughters.getSecond()));
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
index 49ed11a..c72b9e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
@@ -159,36 +159,6 @@ public class FSTableDescriptors implements TableDescriptors {
// Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
.setBloomFilterType(BloomType.NONE)
.build())
- .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.REPLICATION_BARRIER_FAMILY)
- .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
- HConstants.DEFAULT_HBASE_META_VERSIONS))
- .setInMemory(true)
- .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
- HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
- .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
- // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
- .setBloomFilterType(BloomType.NONE)
- .build())
- .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.REPLICATION_POSITION_FAMILY)
- .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
- HConstants.DEFAULT_HBASE_META_VERSIONS))
- .setInMemory(true)
- .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
- HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
- .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
- // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
- .setBloomFilterType(BloomType.NONE)
- .build())
- .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.REPLICATION_META_FAMILY)
- .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
- HConstants.DEFAULT_HBASE_META_VERSIONS))
- .setInMemory(true)
- .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
- HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
- .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
- // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
- .setBloomFilterType(BloomType.NONE)
- .build())
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.TABLE_FAMILY)
.setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
HConstants.DEFAULT_HBASE_META_VERSIONS))
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index d478e4f..db6c411 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,14 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.wal;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
-
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
@@ -36,8 +33,6 @@ import org.apache.yetus.audience.InterfaceStability;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-// imports we use from yet-to-be-moved regionsever.wal
-
/**
* A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides
* APIs for WAL users (such as RegionServer) to use the WAL (do append, sync, etc).
@@ -276,18 +271,6 @@ public interface WAL extends Closeable, WALFileLengthProvider {
key.setCompressionContext(compressionContext);
}
- public boolean hasSerialReplicationScope () {
- if (getKey().getReplicationScopes() == null || getKey().getReplicationScopes().isEmpty()) {
- return false;
- }
- for (Map.Entry<byte[], Integer> e:getKey().getReplicationScopes().entrySet()) {
- if (e.getValue() == HConstants.REPLICATION_SCOPE_SERIAL){
- return true;
- }
- }
- return false;
- }
-
@Override
public String toString() {
return this.key + "=" + this.edit;
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
index 3831e9c..609c54e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
@@ -496,7 +496,7 @@ public class TestMetaTableAccessor {
List<RegionInfo> regionInfos = Lists.newArrayList(parent);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
- MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3, false);
+ MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
assertEmptyMetaLocation(meta, splitA.getRegionName(), 1);
assertEmptyMetaLocation(meta, splitA.getRegionName(), 2);
@@ -541,7 +541,7 @@ public class TestMetaTableAccessor {
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3,
- HConstants.LATEST_TIMESTAMP, false);
+ HConstants.LATEST_TIMESTAMP);
assertEmptyMetaLocation(meta, merged.getRegionName(), 1);
assertEmptyMetaLocation(meta, merged.getRegionName(), 2);
@@ -691,7 +691,7 @@ public class TestMetaTableAccessor {
// now merge the regions, effectively deleting the rows for region a and b.
MetaTableAccessor.mergeRegions(connection, mergedRegionInfo,
- regionInfoA, regionInfoB, sn, 1, masterSystemTime, false);
+ regionInfoA, regionInfoB, sn, 1, masterSystemTime);
result = meta.get(get);
serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
@@ -782,7 +782,7 @@ public class TestMetaTableAccessor {
}
SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler();
long prevCalls = scheduler.numPriorityCalls;
- MetaTableAccessor.splitRegion(connection, parent, splitA, splitB,loc.getServerName(),1,false);
+ MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1);
assertTrue(prevCalls < scheduler.numPriorityCalls);
}
@@ -819,7 +819,7 @@ public class TestMetaTableAccessor {
List<RegionInfo> regionInfos = Lists.newArrayList(parent);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
- MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3, false);
+ MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
Get get1 = new Get(splitA.getRegionName());
Result resultA = meta.get(get1);
Cell serverCellA = resultA.getColumnLatestCell(HConstants.CATALOG_FAMILY,
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad5cd50d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
index 3f01043..6af72ca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
@@ -348,7 +348,7 @@ public class TestRegionServerMetrics {
TEST_UTIL.getAdmin().flush(tableName);
metricsRegionServer.getRegionServerWrapper().forceRecompute();
- assertGauge("storeCount", TABLES_ON_MASTER? 1: 7);
+ assertGauge("storeCount", TABLES_ON_MASTER? 1: 4);
assertGauge("storeFileCount", 1);
}