Posted to commits@hbase.apache.org by zh...@apache.org on 2016/08/09 07:32:47 UTC
[1/2] hbase git commit: HBASE-9465 Push entries to peer clusters serially
Repository: hbase
Updated Branches:
refs/heads/master 1ecb0fce3 -> 5cadcd59a
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
new file mode 100644
index 0000000..d4af26d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
@@ -0,0 +1,399 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HTestConst;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSerialReplication {
+ private static final Log LOG = LogFactory.getLog(TestSerialReplication.class);
+
+ private static Configuration conf1;
+ private static Configuration conf2;
+
+ private static HBaseTestingUtility utility1;
+ private static HBaseTestingUtility utility2;
+
+ private static final byte[] famName = Bytes.toBytes("f");
+ private static final byte[] VALUE = Bytes.toBytes("v");
+ private static final byte[] ROW = Bytes.toBytes("r");
+ private static final byte[][] ROWS = HTestConst.makeNAscii(ROW, 100);
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ conf1 = HBaseConfiguration.create();
+ conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+ // smaller block size and capacity to trigger more operations
+ // and test them
+ conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
+ conf1.setInt("replication.source.size.capacity", 1024);
+ conf1.setLong("replication.source.sleepforretries", 100);
+ conf1.setInt("hbase.regionserver.maxlogs", 10);
+ conf1.setLong("hbase.master.logcleaner.ttl", 10);
+ conf1.setBoolean("dfs.support.append", true);
+ conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+ conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+ "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
+ conf1.setLong("replication.source.per.peer.node.bandwidth", 100L);// Each WAL is 120 bytes
+ conf1.setLong("replication.source.size.capacity", 1L);
+ conf1.setLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY, 1000L);
+
+ utility1 = new HBaseTestingUtility(conf1);
+ utility1.startMiniZKCluster();
+ MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+ new ZooKeeperWatcher(conf1, "cluster1", null, true);
+
+ conf2 = new Configuration(conf1);
+ conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+
+ utility2 = new HBaseTestingUtility(conf2);
+ utility2.setZkCluster(miniZK);
+ new ZooKeeperWatcher(conf2, "cluster2", null, true);
+
+ ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
+ ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+ rpc.setClusterKey(utility2.getClusterKey());
+ admin1.addPeer("1", rpc, null);
+
+ utility1.startMiniCluster(1, 3);
+ utility2.startMiniCluster(1, 1);
+
+ utility1.getHBaseAdmin().setBalancerRunning(false, true);
+ }
+
+ @Test
+ public void testRegionMoveAndFailover() throws Exception {
+ TableName tableName = TableName.valueOf("testRSFailover");
+ HTableDescriptor table = new HTableDescriptor(tableName);
+ HColumnDescriptor fam = new HColumnDescriptor(famName);
+ fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
+ table.addFamily(fam);
+ utility1.getHBaseAdmin().createTable(table);
+ utility2.getHBaseAdmin().createTable(table);
+ try (Table t1 = utility1.getConnection().getTable(tableName);
+ Table t2 = utility2.getConnection().getTable(tableName)) {
+ LOG.info("move to 1");
+ moveRegion(t1, 1);
+ LOG.info("move to 0");
+ moveRegion(t1, 0);
+ for (int i = 10; i < 20; i++) {
+ Put put = new Put(ROWS[i]);
+ put.addColumn(famName, VALUE, VALUE);
+ t1.put(put);
+ }
+ LOG.info("move to 2");
+ moveRegion(t1, 2);
+ for (int i = 20; i < 30; i++) {
+ Put put = new Put(ROWS[i]);
+ put.addColumn(famName, VALUE, VALUE);
+ t1.put(put);
+ }
+ utility1.getHBaseCluster().abortRegionServer(2);
+ for (int i = 30; i < 40; i++) {
+ Put put = new Put(ROWS[i]);
+ put.addColumn(famName, VALUE, VALUE);
+ t1.put(put);
+ }
+
+ long start = EnvironmentEdgeManager.currentTime();
+ while (EnvironmentEdgeManager.currentTime() - start < 180000) {
+ Scan scan = new Scan();
+ scan.setCaching(100);
+ List<Cell> list = new ArrayList<>();
+ try (ResultScanner results = t2.getScanner(scan)) {
+ for (Result result : results) {
+ assertEquals(1, result.rawCells().length);
+ list.add(result.rawCells()[0]);
+ }
+ }
+ List<Integer> listOfNumbers = getRowNumbers(list);
+ LOG.info(Arrays.toString(listOfNumbers.toArray()));
+ assertIntegerList(listOfNumbers, 10, 1);
+ if (listOfNumbers.size() != 30) {
+ LOG.info("Waiting all logs pushed to slave. Expected 30 , actual " + list.size());
+ Thread.sleep(200);
+ continue;
+ }
+ return;
+ }
+ throw new Exception("Not all logs have been pushed");
+ } finally {
+ utility1.getHBaseAdmin().disableTable(tableName);
+ utility2.getHBaseAdmin().disableTable(tableName);
+ utility1.getHBaseAdmin().deleteTable(tableName);
+ utility2.getHBaseAdmin().deleteTable(tableName);
+ }
+ }
+
+ @Test
+ public void testRegionSplit() throws Exception {
+ TableName tableName = TableName.valueOf("testRegionSplit");
+ HTableDescriptor table = new HTableDescriptor(tableName);
+ HColumnDescriptor fam = new HColumnDescriptor(famName);
+ fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
+ table.addFamily(fam);
+ utility1.getHBaseAdmin().createTable(table);
+ utility2.getHBaseAdmin().createTable(table);
+ try (Table t1 = utility1.getConnection().getTable(tableName);
+ Table t2 = utility2.getConnection().getTable(tableName)) {
+
+ for (int i = 10; i < 100; i += 10) {
+ Put put = new Put(ROWS[i]);
+ put.addColumn(famName, VALUE, VALUE);
+ t1.put(put);
+ }
+ utility1.getHBaseAdmin().split(tableName, ROWS[50]);
+ Thread.sleep(5000L);
+ for (int i = 11; i < 100; i += 10) {
+ Put put = new Put(ROWS[i]);
+ put.addColumn(famName, VALUE, VALUE);
+ t1.put(put);
+ }
+ balanceTwoRegions(t1);
+ for (int i = 12; i < 100; i += 10) {
+ Put put = new Put(ROWS[i]);
+ put.addColumn(famName, VALUE, VALUE);
+ t1.put(put);
+ }
+
+ long start = EnvironmentEdgeManager.currentTime();
+ while (EnvironmentEdgeManager.currentTime() - start < 180000) {
+ Scan scan = new Scan();
+ scan.setCaching(100);
+ List<Cell> list = new ArrayList<>();
+ try (ResultScanner results = t2.getScanner(scan)) {
+ for (Result result : results) {
+ assertEquals(1, result.rawCells().length);
+ list.add(result.rawCells()[0]);
+ }
+ }
+ List<Integer> listOfNumbers = getRowNumbers(list);
+ List<Integer> list0 = new ArrayList<>();
+ List<Integer> list1 = new ArrayList<>();
+ List<Integer> list21 = new ArrayList<>();
+ List<Integer> list22 = new ArrayList<>();
+ for (int num : listOfNumbers) {
+ if (num % 10 == 0) {
+ list0.add(num);
+ } else if (num % 10 == 1) {
+ list1.add(num);
+ } else if (num < 50) { // num % 10 == 2 && num < 50
+ list21.add(num);
+ } else { // num % 10 == 2 && num > 50
+ list22.add(num);
+ }
+ }
+
+ LOG.info(Arrays.toString(list0.toArray()));
+ LOG.info(Arrays.toString(list1.toArray()));
+ LOG.info(Arrays.toString(list21.toArray()));
+ LOG.info(Arrays.toString(list22.toArray()));
+ assertIntegerList(list0, 10, 10);
+ assertIntegerList(list1, 11, 10);
+ assertIntegerList(list21, 12, 10);
+ assertIntegerList(list22, 52, 10);
+ if (!list1.isEmpty()) {
+ assertEquals(9, list0.size());
+ }
+ if (!list21.isEmpty() || !list22.isEmpty()) {
+ assertEquals(9, list1.size());
+ }
+
+ if (list.size() == 27) {
+ return;
+ }
+ LOG.info("Waiting all logs pushed to slave. Expected 27 , actual " + list.size());
+ Thread.sleep(200);
+ }
+ throw new Exception("Not all logs have been pushed");
+ } finally {
+ utility1.getHBaseAdmin().disableTable(tableName);
+ utility2.getHBaseAdmin().disableTable(tableName);
+ utility1.getHBaseAdmin().deleteTable(tableName);
+ utility2.getHBaseAdmin().deleteTable(tableName);
+ }
+ }
+
+ @Test
+ public void testRegionMerge() throws Exception {
+ TableName tableName = TableName.valueOf("testRegionMerge");
+ HTableDescriptor table = new HTableDescriptor(tableName);
+ HColumnDescriptor fam = new HColumnDescriptor(famName);
+ fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
+ table.addFamily(fam);
+ utility1.getHBaseAdmin().createTable(table);
+ utility2.getHBaseAdmin().createTable(table);
+ utility1.getHBaseAdmin().split(tableName, ROWS[50]);
+ Thread.sleep(5000L);
+
+ try (Table t1 = utility1.getConnection().getTable(tableName);
+ Table t2 = utility2.getConnection().getTable(tableName)) {
+ for (int i = 10; i < 100; i += 10) {
+ Put put = new Put(ROWS[i]);
+ put.addColumn(famName, VALUE, VALUE);
+ t1.put(put);
+ }
+ List<Pair<HRegionInfo, ServerName>> regions =
+ MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), tableName);
+ assertEquals(2, regions.size());
+ utility1.getHBaseAdmin().mergeRegions(regions.get(0).getFirst().getRegionName(),
+ regions.get(1).getFirst().getRegionName(), true);
+ for (int i = 11; i < 100; i += 10) {
+ Put put = new Put(ROWS[i]);
+ put.addColumn(famName, VALUE, VALUE);
+ t1.put(put);
+ }
+
+ long start = EnvironmentEdgeManager.currentTime();
+ while (EnvironmentEdgeManager.currentTime() - start < 180000) {
+ Scan scan = new Scan();
+ scan.setCaching(100);
+ List<Cell> list = new ArrayList<>();
+ try (ResultScanner results = t2.getScanner(scan)) {
+ for (Result result : results) {
+ assertEquals(1, result.rawCells().length);
+ list.add(result.rawCells()[0]);
+ }
+ }
+ List<Integer> listOfNumbers = getRowNumbers(list);
+ List<Integer> list0 = new ArrayList<>();
+ List<Integer> list1 = new ArrayList<>();
+ for (int num : listOfNumbers) {
+ if (num % 10 == 0) {
+ list0.add(num);
+ } else {
+ list1.add(num);
+ }
+ }
+ LOG.info(Arrays.toString(list0.toArray()));
+ LOG.info(Arrays.toString(list1.toArray()));
+ assertIntegerList(list0, 10, 10);
+ assertIntegerList(list1, 11, 10);
+ if (!list1.isEmpty()) {
+ assertEquals(9, list0.size());
+ }
+ if (list.size() == 18) {
+ return;
+ }
+ LOG.info("Waiting all logs pushed to slave. Expected 18 , actual " + list.size());
+ Thread.sleep(200);
+ }
+
+ } finally {
+ utility1.getHBaseAdmin().disableTable(tableName);
+ utility2.getHBaseAdmin().disableTable(tableName);
+ utility1.getHBaseAdmin().deleteTable(tableName);
+ utility2.getHBaseAdmin().deleteTable(tableName);
+ }
+ }
+
+ private List<Integer> getRowNumbers(List<Cell> cells) {
+ List<Integer> listOfRowNumbers = new ArrayList<>();
+ for (Cell c : cells) {
+ listOfRowNumbers.add(Integer.parseInt(Bytes
+ .toString(c.getRowArray(), c.getRowOffset() + ROW.length,
+ c.getRowLength() - ROW.length)));
+ }
+ return listOfRowNumbers;
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ utility2.shutdownMiniCluster();
+ utility1.shutdownMiniCluster();
+ }
+
+ private void moveRegion(Table table, int index) throws IOException {
+ List<Pair<HRegionInfo, ServerName>> regions =
+ MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), table.getName());
+ assertEquals(1, regions.size());
+ HRegionInfo regionInfo = regions.get(0).getFirst();
+ ServerName name = utility1.getHBaseCluster().getRegionServer(index).getServerName();
+ utility1.getAdmin()
+ .move(regionInfo.getEncodedNameAsBytes(), Bytes.toBytes(name.getServerName()));
+ try {
+ Thread.sleep(5000L); // wait for the move to complete
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void balanceTwoRegions(Table table) throws Exception {
+ List<Pair<HRegionInfo, ServerName>> regions =
+ MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), table.getName());
+ assertEquals(2, regions.size());
+ HRegionInfo regionInfo1 = regions.get(0).getFirst();
+ ServerName name1 = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+ HRegionInfo regionInfo2 = regions.get(1).getFirst();
+ ServerName name2 = utility1.getHBaseCluster().getRegionServer(1).getServerName();
+ utility1.getAdmin()
+ .move(regionInfo1.getEncodedNameAsBytes(), Bytes.toBytes(name1.getServerName()));
+ Thread.sleep(5000L);
+ utility1.getAdmin()
+ .move(regionInfo2.getEncodedNameAsBytes(), Bytes.toBytes(name2.getServerName()));
+ Thread.sleep(5000L);
+ }
+
+ private void assertIntegerList(List<Integer> list, int start, int step) {
+ int size = list.size();
+ for (int i = 0; i < size; i++) {
+ assertTrue(list.get(i) == start + step * i);
+ }
+ }
+}
[2/2] hbase git commit: HBASE-9465 Push entries to peer clusters serially
Posted by zh...@apache.org.
HBASE-9465 Push entries to peer clusters serially
Signed-off-by: zhangduo <zh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5cadcd59
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5cadcd59
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5cadcd59
Branch: refs/heads/master
Commit: 5cadcd59aa57c9566349dc8551c958dc974e774e
Parents: 1ecb0fc
Author: Phil Yang <ud...@gmail.com>
Authored: Thu Aug 4 10:11:56 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue Aug 9 15:25:50 2016 +0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/HTableDescriptor.java | 12 +
.../apache/hadoop/hbase/MetaTableAccessor.java | 234 ++++++++++-
.../hbase/client/ConnectionImplementation.java | 1 +
.../client/replication/ReplicationAdmin.java | 14 +-
.../org/apache/hadoop/hbase/HConstants.java | 26 ++
.../src/main/resources/hbase-default.xml | 14 +
.../hbase/protobuf/generated/WALProtos.java | 18 +-
hbase-protocol/src/main/protobuf/WAL.proto | 1 +
.../org/apache/hadoop/hbase/master/HMaster.java | 5 +
.../hadoop/hbase/master/RegionStateStore.java | 47 ++-
.../master/cleaner/ReplicationMetaCleaner.java | 186 +++++++++
.../hbase/regionserver/wal/FSWALEntry.java | 1 -
.../replication/regionserver/Replication.java | 12 +
.../regionserver/ReplicationSource.java | 127 +++++-
.../regionserver/ReplicationSourceManager.java | 87 +++-
.../hadoop/hbase/util/FSTableDescriptors.java | 24 ++
.../java/org/apache/hadoop/hbase/wal/WAL.java | 16 +
.../hadoop/hbase/TestMetaTableAccessor.java | 8 +-
.../master/TestAssignmentManagerOnCluster.java | 2 +-
.../replication/TestSerialReplication.java | 399 +++++++++++++++++++
20 files changed, 1176 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index ccad414..9abdf42 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -1115,6 +1115,18 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
}
/**
+ * Return true if there is at least one column family whose replication scope is serial.
+ */
+ public boolean hasSerialReplicationScope() {
+ for (HColumnDescriptor column : getFamilies()) {
+ if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
* Returns the configured replicas per region
*/
public int getRegionReplication() {
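For illustration, a minimal sketch of how a caller might use the new helper (not part of
the patch; an existing Admin "admin" and TableName "tableName" are assumed):

  // Check whether any column family of the table is serially replicated.
  HTableDescriptor htd = admin.getTableDescriptor(tableName);
  if (htd.hasSerialReplicationScope()) {
    // At least one family has REPLICATION_SCOPE_SERIAL, so edits of this table
    // must be pushed to peers in the order of their sequence ids.
  }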
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index a5dbc94..1eaa753 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -17,11 +17,15 @@
*/
package org.apache.hadoop.hbase;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -34,8 +38,6 @@ import java.util.regex.Pattern;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -113,14 +115,31 @@ public class MetaTableAccessor {
* region is the result of a merge
* info:mergeB => contains a serialized HRI for the second parent region if the
* region is the result of a merge
- *
* The actual layout of meta should be encapsulated inside MetaTableAccessor methods,
* and should not leak out of it (through Result objects, etc)
+ *
+ * For serial replication, there are two column families, "rep_barrier" and "rep_position",
+ * whose row key is the encodedRegionName.
+ * rep_barrier:{seqid} => each time a RS opens the region, it saves the open sequence
+ * id of the region here
+ * rep_position:{peerid} => saves the max sequence id we have pushed for each peer
+ * rep_position:_TABLENAME_ => a special cell saving this region's table name, used when
+ * we clean up old data
+ * rep_position:_DAUGHTER_ => a special cell marking that this region was split or merged;
+ * its value is the merged region's encoded name, or the two
+ * split encoded names separated by ","
*/
private static final Log LOG = LogFactory.getLog(MetaTableAccessor.class);
private static final Log METALOG = LogFactory.getLog("org.apache.hadoop.hbase.META");
+ // Save its daughter region(s) when split/merge
+ private static final byte[] daughterNamePosCq = Bytes.toBytes("_DAUGHTER_");
+
+ // Save its table name because we only know region's encoded name
+ private static final String tableNamePeer = "_TABLENAME_";
+ private static final byte[] tableNamePosCq = Bytes.toBytes(tableNamePeer);
+
static final byte [] META_REGION_PREFIX;
static {
// Copy the prefix from FIRST_META_REGIONINFO into META_REGION_PREFIX.
@@ -1318,6 +1337,19 @@ public class MetaTableAccessor {
return delete;
}
+ public static Put makeBarrierPut(byte[] encodedRegionName, long seq, byte[] tableName) {
+ byte[] seqBytes = Bytes.toBytes(seq);
+ return new Put(encodedRegionName)
+ .addImmutable(HConstants.REPLICATION_BARRIER_FAMILY, seqBytes, seqBytes)
+ .addImmutable(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq, tableName);
+ }
+
+
+ public static Put makeSerialDaughterPut(byte[] encodedRegionName, byte[] value) {
+ return new Put(encodedRegionName).addImmutable(HConstants.REPLICATION_POSITION_FAMILY,
+ daughterNamePosCq, value);
+ }
+
/**
* Adds split daughters to the Put
*/
@@ -1334,27 +1366,28 @@ public class MetaTableAccessor {
}
/**
- * Put the passed <code>p</code> to the <code>hbase:meta</code> table.
+ * Put the passed <code>puts</code> to the <code>hbase:meta</code> table.
+ * Non-atomic for multi puts.
* @param connection connection we're using
- * @param p Put to add to hbase:meta
+ * @param puts Puts to add to hbase:meta
* @throws IOException
*/
- static void putToMetaTable(final Connection connection, final Put p)
+ static void putToMetaTable(final Connection connection, final Put... puts)
throws IOException {
- put(getMetaHTable(connection), p);
+ put(getMetaHTable(connection), Arrays.asList(puts));
}
/**
* @param t Table to use (will be closed when done).
- * @param p put to make
+ * @param puts puts to make
* @throws IOException
*/
- private static void put(final Table t, final Put p) throws IOException {
+ private static void put(final Table t, final List<Put> puts) throws IOException {
try {
if (METALOG.isDebugEnabled()) {
- METALOG.debug(mutationToString(p));
+ METALOG.debug(mutationsToString(puts));
}
- t.put(p);
+ t.put(puts);
} finally {
t.close();
}
@@ -1490,7 +1523,7 @@ public class MetaTableAccessor {
* Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
* does not add its daughter's as different rows, but adds information about the daughters
* in the same row as the parent. Use
- * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName, int)}
+ * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName,int,boolean)}
* if you want to do that.
* @param meta the Table for META
* @param regionInfo region information
@@ -1515,7 +1548,7 @@ public class MetaTableAccessor {
* Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
* does not add its daughter's as different rows, but adds information about the daughters
* in the same row as the parent. Use
- * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName, int)}
+ * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName,int,boolean)}
* if you want to do that.
* @param connection connection we're using
* @param regionInfo region information
@@ -1601,11 +1634,12 @@ public class MetaTableAccessor {
* @param regionB
* @param sn the location of the region
* @param masterSystemTime
+ * @param saveBarrier true if the replication barrier needs to be saved in meta, used for serial replication
* @throws IOException
*/
public static void mergeRegions(final Connection connection, HRegionInfo mergedRegion,
HRegionInfo regionA, HRegionInfo regionB, ServerName sn, int regionReplication,
- long masterSystemTime)
+ long masterSystemTime, boolean saveBarrier)
throws IOException {
Table meta = getMetaHTable(connection);
try {
@@ -1636,7 +1670,17 @@ public class MetaTableAccessor {
byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString()
+ HConstants.DELIMITER);
- multiMutate(meta, tableRow, putOfMerged, deleteA, deleteB);
+ Mutation[] mutations;
+ if (saveBarrier) {
+ Put putBarrierA = makeSerialDaughterPut(regionA.getEncodedNameAsBytes(),
+ Bytes.toBytes(mergedRegion.getEncodedName()));
+ Put putBarrierB = makeSerialDaughterPut(regionB.getEncodedNameAsBytes(),
+ Bytes.toBytes(mergedRegion.getEncodedName()));
+ mutations = new Mutation[] { putOfMerged, deleteA, deleteB, putBarrierA, putBarrierB };
+ } else {
+ mutations = new Mutation[] { putOfMerged, deleteA, deleteB };
+ }
+ multiMutate(meta, tableRow, mutations);
} finally {
meta.close();
}
@@ -1652,10 +1696,11 @@ public class MetaTableAccessor {
* @param splitA Split daughter region A
* @param splitB Split daughter region B
* @param sn the location of the region
+ * @param saveBarrier true if the replication barrier needs to be saved in meta, used for serial replication
*/
- public static void splitRegion(final Connection connection,
- HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
- ServerName sn, int regionReplication) throws IOException {
+ public static void splitRegion(final Connection connection, HRegionInfo parent,
+ HRegionInfo splitA, HRegionInfo splitB, ServerName sn, int regionReplication,
+ boolean saveBarrier) throws IOException {
Table meta = getMetaHTable(connection);
try {
HRegionInfo copyOfParent = new HRegionInfo(parent);
@@ -1680,8 +1725,17 @@ public class MetaTableAccessor {
addEmptyLocation(putB, i);
}
+ Mutation[] mutations;
+ if (saveBarrier) {
+ Put putBarrier = makeSerialDaughterPut(parent.getEncodedNameAsBytes(),
+ Bytes
+ .toBytes(splitA.getEncodedName() + HConstants.DELIMITER + splitB.getEncodedName()));
+ mutations = new Mutation[]{putParent, putA, putB, putBarrier};
+ } else {
+ mutations = new Mutation[]{putParent, putA, putB};
+ }
byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER);
- multiMutate(meta, tableRow, putParent, putA, putB);
+ multiMutate(meta, tableRow, mutations);
} finally {
meta.close();
}
@@ -1781,6 +1835,27 @@ public class MetaTableAccessor {
}
/**
+ * Updates the progress of pushing entries to the peer cluster. Positions are written as
+ * their absolute values.
+ * @param connection connection we're using
+ * @param peerId the peer to push to
+ * @param positions map saving the pushed sequence id for each region
+ * @throws IOException
+ */
+ public static void updateReplicationPositions(Connection connection, String peerId,
+ Map<String, Long> positions) throws IOException {
+ List<Put> puts = new ArrayList<>();
+ for (Map.Entry<String, Long> entry : positions.entrySet()) {
+ long value = Math.abs(entry.getValue());
+ Put put = new Put(Bytes.toBytes(entry.getKey()));
+ put.addImmutable(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId),
+ Bytes.toBytes(value));
+ puts.add(put);
+ }
+ getMetaHTable(connection).put(puts);
+ }
+
+
+ /**
* Updates the location of the specified region to be the specified server.
* <p>
* Connects to the specified server which should be hosting the specified
@@ -1977,4 +2052,125 @@ public class MetaTableAccessor {
private static String mutationToString(Mutation p) throws IOException {
return p.getClass().getSimpleName() + p.toJSON();
}
+
+ /**
+ * Get replication position for a peer in a region.
+ * @param connection connection we're using
+ * @return the position of this peer, or -1 if no position is stored in meta.
+ */
+ public static long getReplicationPositionForOnePeer(Connection connection,
+ byte[] encodedRegionName, String peerId) throws IOException {
+ Get get = new Get(encodedRegionName);
+ get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId));
+ Result r = get(getMetaHTable(connection), get);
+ if (r.isEmpty()) {
+ return -1;
+ }
+ Cell cell = r.rawCells()[0];
+ return Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ }
+
+ /**
+ * Get replication positions for all peers in a region.
+ * @param connection connection we're using
+ * @param encodedRegionName region's encoded name
+ * @return the map of positions for each peer
+ */
+ public static Map<String, Long> getReplicationPositionForAllPeer(Connection connection,
+ byte[] encodedRegionName) throws IOException {
+ Get get = new Get(encodedRegionName);
+ get.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
+ Result r = get(getMetaHTable(connection), get);
+ Map<String, Long> map = new HashMap<>((int) (r.size() / 0.75 + 1));
+ for (Cell c : r.listCells()) {
+ if (!Bytes.equals(tableNamePosCq, 0, tableNamePosCq.length, c.getQualifierArray(),
+ c.getQualifierOffset(), c.getQualifierLength()) &&
+ !Bytes.equals(daughterNamePosCq, 0, daughterNamePosCq.length, c.getQualifierArray(),
+ c.getQualifierOffset(), c.getQualifierLength())) {
+ map.put(
+ Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()),
+ Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
+ }
+ }
+ return map;
+ }
+
+ /**
+ * Get the replication barriers for a region.
+ * @param encodedRegionName region's encoded name
+ * @return a list of barrier sequence numbers.
+ * @throws IOException
+ */
+ public static List<Long> getReplicationBarriers(Connection connection, byte[] encodedRegionName)
+ throws IOException {
+ Get get = new Get(encodedRegionName);
+ get.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
+ Result r = get(getMetaHTable(connection), get);
+ List<Long> list = new ArrayList<>();
+ if (!r.isEmpty()) {
+ for (Cell cell : r.rawCells()) {
+ list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(),
+ cell.getQualifierLength()));
+ }
+ }
+ return list;
+ }
+
+ /**
+ * Get all barriers in all regions.
+ * @return a map of barrier lists in all regions
+ * @throws IOException
+ */
+ public static Map<String, List<Long>> getAllBarriers(Connection connection) throws IOException {
+ Map<String, List<Long>> map = new HashMap<>();
+ Scan scan = new Scan();
+ scan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
+ try (Table t = getMetaHTable(connection);
+ ResultScanner scanner = t.getScanner(scan)) {
+ Result result;
+ while ((result = scanner.next()) != null) {
+ String key = Bytes.toString(result.getRow());
+ List<Long> list = new ArrayList<>();
+ for (Cell cell : result.rawCells()) {
+ list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(),
+ cell.getQualifierLength()));
+ }
+ map.put(key, list);
+ }
+ }
+ return map;
+ }
+
+ /**
+ * Get daughter region(s) for a region, only used in serial replication.
+ * @throws IOException
+ */
+ public static String getSerialReplicationDaughterRegion(Connection connection, byte[] encodedName)
+ throws IOException {
+ Get get = new Get(encodedName);
+ get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, daughterNamePosCq);
+ Result result = get(getMetaHTable(connection), get);
+ if (!result.isEmpty()) {
+ Cell c = result.rawCells()[0];
+ return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+ }
+ return null;
+ }
+
+ /**
+ * Get the table name for a region, only used in serial replication.
+ * @throws IOException
+ */
+ public static String getSerialReplicationTableName(Connection connection, byte[] encodedName)
+ throws IOException {
+ Get get = new Get(encodedName);
+ get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq);
+ Result result = get(getMetaHTable(connection), get);
+ if (!result.isEmpty()) {
+ Cell c = result.rawCells()[0];
+ return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+ }
+ return null;
+ }
+
}
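For orientation, a minimal sketch of how the new accessors above fit together (not part of
the patch; an open Connection "connection", a region's encoded name "encodedName", and the
pushed sequence id 42 are assumptions):

  // Barriers: the open sequence ids recorded each time the region was opened.
  List<Long> barriers = MetaTableAccessor.getReplicationBarriers(connection, encodedName);

  // Positions: how far each peer has pushed inside this region.
  Map<String, Long> positions =
      MetaTableAccessor.getReplicationPositionForAllPeer(connection, encodedName);

  // Record new progress for peer "1"; map keys are encoded region names.
  Map<String, Long> update = new HashMap<>();
  update.put(Bytes.toString(encodedName), 42L); // hypothetical pushed sequence id
  MetaTableAccessor.updateReplicationPositions(connection, "1", update);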
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 04edd25..37c62c5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -721,6 +721,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
Scan s = new Scan();
s.setReversed(true);
s.setStartRow(metaKey);
+ s.addFamily(HConstants.CATALOG_FAMILY);
s.setSmall(true);
s.setCaching(1);
if (this.useMetaReplicas) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index dca1821..ee26e38 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -92,8 +92,10 @@ public class ReplicationAdmin implements Closeable {
// only Global for now, can add other type
// such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
public static final String REPLICATIONTYPE = "replicationType";
- public static final String REPLICATIONGLOBAL = Integer
- .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
+ public static final String REPLICATIONGLOBAL =
+ Integer.toString(HConstants.REPLICATION_SCOPE_GLOBAL);
+ public static final String REPLICATIONSERIAL =
+ Integer.toString(HConstants.REPLICATION_SCOPE_SERIAL);
private final Connection connection;
// TODO: replication should be managed by master. All the classes except ReplicationAdmin should
@@ -430,7 +432,10 @@ public class ReplicationAdmin implements Closeable {
HashMap<String, String> replicationEntry = new HashMap<String, String>();
replicationEntry.put(TNAME, tableName);
replicationEntry.put(CFNAME, column.getNameAsString());
- replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
+ replicationEntry.put(REPLICATIONTYPE,
+ column.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL ?
+ REPLICATIONGLOBAL :
+ REPLICATIONSERIAL);
replicationColFams.add(replicationEntry);
}
}
@@ -616,7 +621,8 @@ public class ReplicationAdmin implements Closeable {
*/
private boolean isTableRepEnabled(HTableDescriptor htd) {
for (HColumnDescriptor hcd : htd.getFamilies()) {
- if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
+ if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
+ && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
return false;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index ce18ef5..4c499a2 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -429,6 +429,20 @@ public final class HConstants {
/** The catalog family */
public static final byte [] CATALOG_FAMILY = Bytes.toBytes(CATALOG_FAMILY_STR);
+ /** The replication barrier family as a string */
+ public static final String REPLICATION_BARRIER_FAMILY_STR = "rep_barrier";
+
+ /** The replication barrier family */
+ public static final byte [] REPLICATION_BARRIER_FAMILY =
+ Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR);
+
+ /** The replication position family as a string */
+ public static final String REPLICATION_POSITION_FAMILY_STR = "rep_position";
+
+ /** The replication position family */
+ public static final byte [] REPLICATION_POSITION_FAMILY =
+ Bytes.toBytes(REPLICATION_POSITION_FAMILY_STR);
+
/** The RegionInfo qualifier as a string */
public static final String REGIONINFO_QUALIFIER_STR = "regioninfo";
@@ -636,6 +650,12 @@ public final class HConstants {
public static final int REPLICATION_SCOPE_GLOBAL = 1;
/**
+ * Scope tag for serially scoped data.
+ * This data will be replicated to all peers in the order of sequence id.
+ */
+ public static final int REPLICATION_SCOPE_SERIAL = 2;
+
+ /**
* Default cluster ID, cannot be used to identify a cluster so a key with
* this value means it wasn't meant for replication.
*/
@@ -866,6 +886,12 @@ public final class HConstants {
public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false;
/** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */
public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id";
+
+ public static final String
+ REPLICATION_SERIALLY_WAITING_KEY = "hbase.serial.replication.waitingMs";
+ public static final long
+ REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
+
/**
* Directory where the source cluster file system client configuration are placed which is used by
* sink cluster to copy HFiles from source cluster file system
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 116c7d9..a791717 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1571,6 +1571,20 @@ possible configurations would overwhelm and obscure the important.
slave clusters. The default of 10 will rarely need to be changed.
</description>
</property>
+ <property>
+ <name>hbase.serial.replication.waitingMs</name>
+ <value>10000</value>
+ <description>
+ By default, replication can not guarantee that the order of operations in the slave
+ cluster is the same as the order in the master. If REPLICATION_SCOPE is set to 2, we will
+ push edits in the order they were written. This setting controls how long (in ms) we wait
+ before checking again whether a log that can not be pushed right now (because some logs
+ written before it have not been pushed yet) has become pushable. A larger wait decreases
+ the number of queries on hbase:meta but increases the replication delay. This feature
+ relies on zk-less assignment and conflicts with distributed log replay, so users must set
+ hbase.assignment.usezk and hbase.master.distributed.log.replay to false to use it.
+ </description>
+ </property>
<!-- Static Web User Filter properties. -->
<property>
<description>
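As a sketch of how a user opts in (an existing Configuration "conf" and table-creation code
are assumptions; the scope constant and config key come from this patch):

  // Mark a column family for serial replication (REPLICATION_SCOPE_SERIAL == 2).
  HColumnDescriptor fam = new HColumnDescriptor("f");
  fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);

  // Optionally shorten the wait before re-checking whether the logs written
  // before the current one have been pushed (default 10000 ms).
  conf.setLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY, 5000L);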
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index 28f4d4b..a675b12 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -21,6 +21,10 @@ public final class WALProtos {
* <code>REPLICATION_SCOPE_GLOBAL = 1;</code>
*/
REPLICATION_SCOPE_GLOBAL(1, 1),
+ /**
+ * <code>REPLICATION_SCOPE_SERIAL = 2;</code>
+ */
+ REPLICATION_SCOPE_SERIAL(2, 2),
;
/**
@@ -31,6 +35,10 @@ public final class WALProtos {
* <code>REPLICATION_SCOPE_GLOBAL = 1;</code>
*/
public static final int REPLICATION_SCOPE_GLOBAL_VALUE = 1;
+ /**
+ * <code>REPLICATION_SCOPE_SERIAL = 2;</code>
+ */
+ public static final int REPLICATION_SCOPE_SERIAL_VALUE = 2;
public final int getNumber() { return value; }
@@ -39,6 +47,7 @@ public final class WALProtos {
switch (value) {
case 0: return REPLICATION_SCOPE_LOCAL;
case 1: return REPLICATION_SCOPE_GLOBAL;
+ case 2: return REPLICATION_SCOPE_SERIAL;
default: return null;
}
}
@@ -12013,11 +12022,12 @@ public final class WALProtos {
"\030\005 \003(\0132\031.hbase.pb.StoreDescriptor\022$\n\006ser" +
"ver\030\006 \001(\0132\024.hbase.pb.ServerName\022\023\n\013regio" +
"n_name\030\007 \001(\014\".\n\tEventType\022\017\n\013REGION_OPEN" +
- "\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWALTrailer*F\n\tSc" +
+ "\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWALTrailer*d\n\tSc" +
"opeType\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030" +
- "REPLICATION_SCOPE_GLOBAL\020\001B?\n*org.apache" +
- ".hadoop.hbase.protobuf.generatedB\tWALPro" +
- "tosH\001\210\001\000\240\001\001"
+ "REPLICATION_SCOPE_GLOBAL\020\001\022\034\n\030REPLICATIO" +
+ "N_SCOPE_SERIAL\020\002B?\n*org.apache.hadoop.hb" +
+ "ase.protobuf.generatedB\tWALProtosH\001\210\001\000\240\001" +
+ "\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-protocol/src/main/protobuf/WAL.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto
index c1d465a..2494977 100644
--- a/hbase-protocol/src/main/protobuf/WAL.proto
+++ b/hbase-protocol/src/main/protobuf/WAL.proto
@@ -75,6 +75,7 @@ message WALKey {
enum ScopeType {
REPLICATION_SCOPE_LOCAL = 0;
REPLICATION_SCOPE_GLOBAL = 1;
+ REPLICATION_SCOPE_SERIAL = 2;
}
message FamilyScope {
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 5ce056d..2022c5e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -98,6 +98,7 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
+import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
@@ -311,6 +312,7 @@ public class HMaster extends HRegionServer implements MasterServices {
CatalogJanitor catalogJanitorChore;
private ReplicationZKLockCleanerChore replicationZKLockCleanerChore;
+ private ReplicationMetaCleaner replicationMetaCleaner;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
@@ -988,6 +990,8 @@ public class HMaster extends HRegionServer implements MasterServices {
LOG.error("start replicationZKLockCleanerChore failed", e);
}
}
+ replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
+ getChoreService().scheduleChore(replicationMetaCleaner);
}
@Override
@@ -1022,6 +1026,7 @@ public class HMaster extends HRegionServer implements MasterServices {
if (this.logCleaner != null) this.logCleaner.cancel(true);
if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true);
+ if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
if (this.quotaManager != null) this.quotaManager.stop();
if (this.activeMasterManager != null) this.activeMasterManager.stop();
if (this.serverManager != null) this.serverManager.stop();
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
index 82e28df..2dbc087 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
@@ -17,22 +17,25 @@
*/
package org.apache.hadoop.hbase.master;
+import com.google.common.base.Preconditions;
+
import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.master.RegionState.State;
@@ -44,8 +47,6 @@ import org.apache.hadoop.hbase.util.MultiHConnection;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.zookeeper.KeeperException;
-import com.google.common.base.Preconditions;
-
/**
* A helper to persist region state in meta. We may change this class
* to StateStore later if we also use it to store other states in meta
@@ -60,7 +61,7 @@ public class RegionStateStore {
private volatile Region metaRegion;
private volatile boolean initialized;
private MultiHConnection multiHConnection;
- private final Server server;
+ private final MasterServices server;
/**
* Returns the {@link ServerName} from catalog table {@link Result}
@@ -130,7 +131,7 @@ public class RegionStateStore {
State.SPLITTING_NEW, State.MERGED));
}
- RegionStateStore(final Server server) {
+ RegionStateStore(final MasterServices server) {
this.server = server;
initialized = false;
}
@@ -187,31 +188,41 @@ public class RegionStateStore {
State state = newState.getState();
int replicaId = hri.getReplicaId();
- Put put = new Put(MetaTableAccessor.getMetaKeyForRegion(hri));
+ Put metaPut = new Put(MetaTableAccessor.getMetaKeyForRegion(hri));
StringBuilder info = new StringBuilder("Updating hbase:meta row ");
info.append(hri.getRegionNameAsString()).append(" with state=").append(state);
if (serverName != null && !serverName.equals(oldServer)) {
- put.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId),
+ metaPut.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId),
Bytes.toBytes(serverName.getServerName()));
info.append(", sn=").append(serverName);
}
if (openSeqNum >= 0) {
Preconditions.checkArgument(state == State.OPEN
&& serverName != null, "Open region should be on a server");
- MetaTableAccessor.addLocation(put, serverName, openSeqNum, -1, replicaId);
+ MetaTableAccessor.addLocation(metaPut, serverName, openSeqNum, -1, replicaId);
info.append(", openSeqNum=").append(openSeqNum);
info.append(", server=").append(serverName);
}
- put.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
+ metaPut.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
Bytes.toBytes(state.name()));
LOG.info(info);
-
+ HTableDescriptor descriptor = server.getTableDescriptors().get(hri.getTable());
+ boolean serial = false;
+ if (descriptor != null) {
+ serial = descriptor.hasSerialReplicationScope();
+ }
+ boolean shouldPutBarrier = serial && state == State.OPEN;
// Persist the state change to meta
if (metaRegion != null) {
try {
// Assume meta is pinned to master.
// At least, that's what we want.
- metaRegion.put(put);
+ metaRegion.put(metaPut);
+ if (shouldPutBarrier) {
+ Put barrierPut = MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(),
+ openSeqNum, hri.getTable().getName());
+ metaRegion.put(barrierPut);
+ }
return; // Done here
} catch (Throwable t) {
// In unit tests, meta could be moved away by intention
@@ -230,8 +241,10 @@ public class RegionStateStore {
}
}
// Called when meta is not on master
- multiHConnection.processBatchCallback(Arrays.asList(put),
- TableName.META_TABLE_NAME, null, null);
+ List<Put> list = shouldPutBarrier ?
+ Arrays.asList(metaPut, MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(),
+ openSeqNum, hri.getTable().getName())) : Arrays.asList(metaPut);
+ multiHConnection.processBatchCallback(list, TableName.META_TABLE_NAME, null, null);
} catch (IOException ioe) {
LOG.error("Failed to persist region state " + newState, ioe);
@@ -241,12 +254,14 @@ public class RegionStateStore {
void splitRegion(HRegionInfo p,
HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
- MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication);
+ MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication,
+ server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope());
}
void mergeRegions(HRegionInfo p,
HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, regionReplication,
- EnvironmentEdgeManager.currentTime());
+ EnvironmentEdgeManager.currentTime(),
+ server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
new file mode 100644
index 0000000..e9647e8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This chore cleans up serial replication data in hbase:meta that is no longer needed.
+ */
+@InterfaceAudience.Private
+public class ReplicationMetaCleaner extends ScheduledChore {
+
+ private static final Log LOG = LogFactory.getLog(ReplicationMetaCleaner.class);
+
+ private ReplicationAdmin replicationAdmin;
+ private MasterServices master;
+
+ public ReplicationMetaCleaner(MasterServices master, Stoppable stoppable, int period)
+ throws IOException {
+ super("ReplicationMetaCleaner", stoppable, period);
+ this.master = master;
+ replicationAdmin = new ReplicationAdmin(master.getConfiguration());
+ }
+
+ @Override
+ protected void chore() {
+ try {
+ Map<String, HTableDescriptor> tables = master.getTableDescriptors().getAllDescriptors();
+ Map<String, Set<String>> serialTables = new HashMap<>();
+ for (Map.Entry<String, HTableDescriptor> entry : tables.entrySet()) {
+ boolean hasSerialScope = false;
+ for (HColumnDescriptor column : entry.getValue().getFamilies()) {
+ if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL) {
+ hasSerialScope = true;
+ break;
+ }
+ }
+ if (hasSerialScope) {
+ serialTables.put(entry.getValue().getTableName().getNameAsString(), new HashSet<String>());
+ }
+ }
+ if (serialTables.isEmpty()) {
+ return;
+ }
+
+ Map<String, ReplicationPeerConfig> peers = replicationAdmin.listPeerConfigs();
+ for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
+ for (Map.Entry<TableName, List<String>> map : entry.getValue().getTableCFsMap()
+ .entrySet()) {
+ if (serialTables.containsKey(map.getKey().getNameAsString())) {
+ serialTables.get(map.getKey().getNameAsString()).add(entry.getKey());
+ break;
+ }
+ }
+ }
+
+ Map<String, List<Long>> barrierMap = MetaTableAccessor.getAllBarriers(master.getConnection());
+ for (Map.Entry<String, List<Long>> entry : barrierMap.entrySet()) {
+ String encodedName = entry.getKey();
+ byte[] encodedBytes = Bytes.toBytes(encodedName);
+ boolean canClearRegion = false;
+ Map<String, Long> posMap = MetaTableAccessor.getReplicationPositionForAllPeer(
+ master.getConnection(), encodedBytes);
+ if (posMap.isEmpty()) {
+ continue;
+ }
+
+ String tableName = MetaTableAccessor.getSerialReplicationTableName(
+ master.getConnection(), encodedBytes);
+ Set<String> confPeers = serialTables.get(tableName);
+ if (confPeers == null) {
+ // This table doesn't exist, or no cf has serial scope any more, so we can clear its meta row.
+ canClearRegion = true;
+ } else {
+ if (!allPeersHavePosition(confPeers, posMap)) {
+ continue;
+ }
+
+ String daughterValue = MetaTableAccessor
+ .getSerialReplicationDaughterRegion(master.getConnection(), encodedBytes);
+ if (daughterValue != null) {
+ // This region has been merged or split.
+ boolean allDaughterStart = true;
+ String[] daughterRegions = daughterValue.split(",");
+ for (String daughter : daughterRegions) {
+ byte[] region = Bytes.toBytes(daughter);
+ if (!MetaTableAccessor.getReplicationBarriers(
+ master.getConnection(), region).isEmpty() &&
+ !allPeersHavePosition(confPeers,
+ MetaTableAccessor
+ .getReplicationPositionForAllPeer(master.getConnection(), region))) {
+ allDaughterStart = false;
+ break;
+ }
+ }
+ if (allDaughterStart) {
+ canClearRegion = true;
+ }
+ }
+ }
+ if (canClearRegion) {
+ Delete delete = new Delete(encodedBytes);
+ delete.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
+ delete.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
+ try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
+ metaTable.delete(delete);
+ }
+ } else {
+
+ // Barriers whose seq is larger than the min position across all peers, plus the last
+ // barrier whose seq is smaller than the min position, should be kept. All other
+ // barriers can be deleted.
+
+ long minPos = Long.MAX_VALUE;
+ for (Map.Entry<String, Long> pos : posMap.entrySet()) {
+ minPos = Math.min(minPos, pos.getValue());
+ }
+ List<Long> barriers = entry.getValue();
+ int index = Collections.binarySearch(barriers, minPos);
+ if (index < 0) {
+ index = -index - 1;
+ }
+ Delete delete = new Delete(encodedBytes);
+ for (int i = 0; i < index - 1; i++) {
+ delete.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, Bytes.toBytes(barriers.get(i)));
+ }
+ try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
+ metaTable.delete(delete);
+ }
+ }
+
+ }
+
+ } catch (IOException e) {
+ LOG.error("Exception during cleaning up.", e);
+ }
+
+ }
+
+ private boolean allPeersHavePosition(Set<String> peers, Map<String, Long> posMap) {
+ for (String peer : peers) {
+ if (!posMap.containsKey(peer)) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
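The deletion rule in the chore above leans on Collections.binarySearch over the sorted
barrier list, which is easy to get off by one. A minimal standalone sketch of just that
arithmetic, with hypothetical barrier values (illustration only, not part of the patch):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class BarrierTrimSketch {
      public static void main(String[] args) {
        // Hypothetical barrier seq ids for one region, always sorted ascending.
        List<Long> barriers = Arrays.asList(10L, 20L, 30L, 40L);
        long minPos = 25L; // minimum pushed position across all peers

        int index = Collections.binarySearch(barriers, minPos);
        if (index < 0) {
          index = -index - 1; // insertion point: first barrier greater than minPos
        }
        // Keep barriers[index - 1], the last barrier not above minPos, and everything
        // after it; barriers[0 .. index - 2] are safe to delete.
        System.out.println(barriers.subList(0, Math.max(0, index - 1)));
        // Prints [10]: 20 stays as the last barrier below the position, 30 and 40 stay
        // because some peer may still need those sections.
      }
    }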
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 3449832..72474a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -125,7 +125,6 @@ class FSWALEntry extends Entry {
CellUtil.setSequenceId(c, regionSequenceId);
}
}
-
getKey().setWriteEntry(we);
return regionSequenceId;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 4f518bb..741065a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
@@ -279,6 +280,17 @@ public class Replication extends WALActionsListener.Base implements
for (Cell cell : logEdit.getCells()) {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
foundOtherEdits = true;
+ break;
+ }
+ }
+
+ if (!foundOtherEdits && logEdit.getCells().size() > 0) {
+ WALProtos.RegionEventDescriptor maybeEvent =
+ WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0));
+ if (maybeEvent != null && (maybeEvent.getEventType() ==
+ WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
+ // In serial replication we rely on scopes when reading the close marker, so keep it.
+ foundOtherEdits = true;
}
}
if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {
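The hunk above widens the "keep this edit" test: an edit whose cells are all in
WALEdit.METAFAMILY is normally dropped from replication, but a REGION_CLOSE event
descriptor must now survive, because the serial replication reader uses close markers to
find section boundaries. A sketch of that predicate as a standalone helper (same calls as
the patch uses, but the helper itself is illustrative):

    import java.io.IOException;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

    public final class CloseMarkerSketch {
      // True if the edit should be considered for replication: it carries at least one
      // user-family cell, or its first cell is a REGION_CLOSE event descriptor.
      static boolean keepForReplication(WALEdit edit) throws IOException {
        for (Cell cell : edit.getCells()) {
          if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
            return true;
          }
        }
        if (edit.getCells().isEmpty()) {
          return false;
        }
        WALProtos.RegionEventDescriptor event =
            WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
        return event != null
            && event.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE;
      }
    }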
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 2f3b2a8..ce0fb06 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -18,6 +18,10 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
@@ -29,8 +33,10 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
@@ -48,9 +54,11 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -102,6 +110,8 @@ public class ReplicationSource extends Thread
private ReplicationQueueInfo replicationQueueInfo;
// id of the peer cluster this source replicates to
private String peerId;
+
+ // The bare peer id (no recovered-queue suffix) used when writing positions to meta.
+ String actualPeerId;
// The manager of all sources to which we ping back our progress
private ReplicationSourceManager manager;
// Should we stop everything?
@@ -185,6 +195,8 @@ public class ReplicationSource extends Thread
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
+ // For a recovered queue the parsed peerId may still carry a dead-server suffix, so
+ // parse it once more to get the bare peer id.
+ ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+ this.actualPeerId = replicationQueueInfo.getPeerId();
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
this.replicationEndpoint = replicationEndpoint;
}
@@ -507,6 +519,17 @@ public class ReplicationSource extends Thread
// Current number of hfiles that we need to replicate
private long currentNbHFiles = 0;
+ // Use a Guava cache so each region key expires after a TTL.
+ private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
+ .expireAfterAccess(1, TimeUnit.DAYS).build(
+ new CacheLoader<String, Boolean>() {
+ @Override
+ public Boolean load(String key) throws Exception {
+ return false;
+ }
+ }
+ );
+
public ReplicationSourceWorkerThread(String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo,
ReplicationSource source) {
@@ -588,9 +611,24 @@ public class ReplicationSource extends Thread
currentNbOperations = 0;
currentNbHFiles = 0;
List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
+
+ Map<String, Long> lastPositionsForSerialScope = new HashMap<>();
currentSize = 0;
try {
- if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
+ if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries,
+ lastPositionsForSerialScope)) {
+ for (Map.Entry<String, Long> entry : lastPositionsForSerialScope.entrySet()) {
+ waitingUntilCanPush(entry);
+ }
+ try {
+ MetaTableAccessor
+ .updateReplicationPositions(manager.getConnection(), actualPeerId,
+ lastPositionsForSerialScope);
+ } catch (IOException e) {
+ LOG.error("updateReplicationPositions fail", e);
+ stopper.stop("updateReplicationPositions fail");
+ }
+
continue;
}
} catch (IOException ioe) {
@@ -626,15 +664,30 @@ public class ReplicationSource extends Thread
LOG.warn("Unable to finalize the tailing of a file", e);
}
}
-
+ for (Map.Entry<String, Long> entry : lastPositionsForSerialScope.entrySet()) {
+ waitingUntilCanPush(entry);
+ }
// If we didn't get anything to replicate, or if we hit a IOE,
// wait a bit and retry.
// But if we need to stop, don't bother sleeping
if (isWorkerActive() && (gotIOE || entries.isEmpty())) {
if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
- manager.logPositionAndCleanOldLogs(this.currentPath,
- peerClusterZnode, this.repLogReader.getPosition(),
+
+ // Save positions to meta table before zk.
+ if (!gotIOE) {
+ try {
+ MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
+ lastPositionsForSerialScope);
+ } catch (IOException e) {
+ LOG.error("updateReplicationPositions fail", e);
+ stopper.stop("updateReplicationPositions fail");
+ }
+ }
+
+ manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
+ this.repLogReader.getPosition(),
this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
+
this.lastLoggedPosition = this.repLogReader.getPosition();
}
// Reset the sleep multiplier if nothing has actually gone wrong
@@ -649,8 +702,7 @@ public class ReplicationSource extends Thread
}
continue;
}
- sleepMultiplier = 1;
- shipEdits(currentWALisBeingWrittenTo, entries);
+ shipEdits(currentWALisBeingWrittenTo, entries, lastPositionsForSerialScope);
}
if (replicationQueueInfo.isQueueRecovered()) {
// use synchronize to make sure one last thread will clean the queue
@@ -672,16 +724,42 @@ public class ReplicationSource extends Thread
}
}
+ private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
+ String key = entry.getKey();
+ long seq = entry.getValue();
+ boolean deleteKey = false;
+ if (seq <= 0) {
+ // There is a REGION_CLOSE marker, so we can not keep skipping after this entry.
+ deleteKey = true;
+ seq = -seq;
+ }
+
+ if (!canSkipWaitingSet.getUnchecked(key)) {
+ try {
+ manager.waitUntilCanBePushed(Bytes.toBytes(key), seq, actualPeerId);
+ } catch (Exception e) {
+ LOG.error("waitUntilCanBePushed fail", e);
+ stopper.stop("waitUntilCanBePushed fail");
+ }
+ canSkipWaitingSet.put(key, true);
+ }
+ if (deleteKey) {
+ canSkipWaitingSet.invalidate(key);
+ }
+ }
+
/**
* Read all the entries from the current log files and retain those that need to be replicated.
* Else, process the end of the current file.
* @param currentWALisBeingWrittenTo is the current WAL being written to
* @param entries resulting entries to be replicated
+ * @param lastPosition records the last sequence id of each region if the table has a
+ * serial replication scope
* @return true if we got nothing and went to the next file, false if we got entries
* @throws IOException
*/
protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
- List<WAL.Entry> entries) throws IOException {
+ List<WAL.Entry> entries, Map<String, Long> lastPosition) throws IOException {
long seenEntries = 0;
if (LOG.isTraceEnabled()) {
LOG.trace("Seeking in " + this.currentPath + " at position "
@@ -694,6 +772,27 @@ public class ReplicationSource extends Thread
metrics.incrLogEditsRead();
seenEntries++;
+ if (entry.hasSerialReplicationScope()) {
+ String key = Bytes.toString(entry.getKey().getEncodedRegionName());
+ lastPosition.put(key, entry.getKey().getSequenceId());
+ if (entry.getEdit().getCells().size() > 0) {
+ WALProtos.RegionEventDescriptor maybeEvent =
+ WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
+ if (maybeEvent != null && maybeEvent.getEventType()
+ == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
+ // In serial replication, if a region is moved to another RS and then moved back, we
+ // may read logs crossing two sections. We must break at REGION_CLOSE and push the
+ // first section on its own, otherwise we could miss the middle section that belongs
+ // to the other RS.
+ // Within a worker thread, once we can push the first log of a region we can push all
+ // logs of that region without waiting for a close marker, because the next time we
+ // read logs for this region they must start a new, non-adjacent section. Mark the
+ // position negative to record the close.
+ lastPosition.put(key, -entry.getKey().getSequenceId());
+ break;
+ }
+ }
+ }
+
// don't replicate if the log entries have already been consumed by the cluster
if (replicationEndpoint.canReplicateToSameCluster()
|| !entry.getKey().getClusterIds().contains(peerClusterId)) {
@@ -723,6 +822,7 @@ public class ReplicationSource extends Thread
|| entries.size() >= replicationQueueNbCapacity) {
break;
}
+
try {
entry = this.repLogReader.readNextAndSetPosition();
} catch (IOException ie) {
@@ -995,7 +1095,8 @@ public class ReplicationSource extends Thread
* @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
* written to when this method was called
*/
- protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
+ protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries,
+ Map<String, Long> lastPositionsForSerialScope) {
int sleepMultiplier = 0;
if (entries.isEmpty()) {
LOG.warn("Was given 0 edits to ship");
@@ -1046,6 +1147,16 @@ public class ReplicationSource extends Thread
for (int i = 0; i < size; i++) {
cleanUpHFileRefs(entries.get(i).getEdit());
}
+
+ // Save positions to meta table before zk.
+ try {
+ MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
+ lastPositionsForSerialScope);
+ } catch (IOException e) {
+ LOG.error("updateReplicationPositions fail", e);
+ stopper.stop("updateReplicationPositions fail");
+ }
+
//Log and clean up WAL logs
manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),
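Two conventions in this file are easy to miss: lastPositionsForSerialScope stores a
negated sequence id to flag that a REGION_CLOSE marker was read, and canSkipWaitingSet is
a Guava LoadingCache used as an expiring per-region boolean set. A standalone sketch of
both patterns together (hypothetical region name, illustration only, not part of the
patch):

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    public class SerialPushSketch {
      // Same pattern as canSkipWaitingSet: unknown keys load as false, and entries age
      // out a day after the last access so idle regions do not accumulate.
      private static final LoadingCache<String, Boolean> CAN_SKIP =
          CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.DAYS)
              .build(new CacheLoader<String, Boolean>() {
                @Override
                public Boolean load(String key) {
                  return false;
                }
              });

      public static void main(String[] args) {
        Map<String, Long> lastPosition = new HashMap<>();
        String region = "abcd1234"; // hypothetical encoded region name

        lastPosition.put(region, 17L);  // ordinary entry at seq 17
        lastPosition.put(region, -18L); // REGION_CLOSE at seq 18 is stored negated

        long stored = lastPosition.get(region);
        boolean sawClose = stored <= 0; // mirrors waitingUntilCanPush()
        long seq = sawClose ? -stored : stored;

        if (!CAN_SKIP.getUnchecked(region)) {
          // The real code blocks in manager.waitUntilCanBePushed(...) here.
          CAN_SKIP.put(region, true); // later entries of this section skip the wait
        }
        if (sawClose) {
          CAN_SKIP.invalidate(region); // the next section must wait again
        }
        System.out.println("seq=" + seq + " sawClose=" + sawClose);
      }
    }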
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 3cb7a84..a6f1891 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -48,10 +49,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@@ -64,6 +68,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
/**
@@ -118,6 +123,8 @@ public class ReplicationSourceManager implements ReplicationListener {
private final Random rand;
private final boolean replicationForBulkLoadDataEnabled;
+ private Connection connection;
+ private long replicationWaitTime;
/**
* Creates a replication manager and sets the watch on all the other registered region servers
@@ -134,7 +141,7 @@ public class ReplicationSourceManager implements ReplicationListener {
public ReplicationSourceManager(final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
- final Path oldLogDir, final UUID clusterId) {
+ final Path oldLogDir, final UUID clusterId) throws IOException {
//CopyOnWriteArrayList is thread-safe.
//Generally, reading is more than modifying.
this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
@@ -171,6 +178,9 @@ public class ReplicationSourceManager implements ReplicationListener {
replicationForBulkLoadDataEnabled =
conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+ this.replicationWaitTime = conf.getLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY,
+ HConstants.REPLICATION_SERIALLY_WAITING_DEFAULT);
+ connection = ConnectionFactory.createConnection(conf);
}
/**
@@ -782,6 +792,10 @@ public class ReplicationSourceManager implements ReplicationListener {
return this.fs;
}
+ public Connection getConnection() {
+ return this.connection;
+ }
+
/**
* Get the ReplicationPeers used by this ReplicationSourceManager
* @return the ReplicationPeers used by this ReplicationSourceManager
@@ -814,4 +828,75 @@ public class ReplicationSourceManager implements ReplicationListener {
public void cleanUpHFileRefs(String peerId, List<String> files) {
this.replicationQueues.removeHFileRefs(peerId, files);
}
+
+ /**
+ * Block until the given entry can be pushed to the peer.
+ * With serial replication enabled, an entry can not be pushed until all entries of its
+ * region with smaller sequence numbers have been pushed.
+ * For each ReplicationSource we only need to check the first entry of each region: once
+ * it can be pushed, everything after it in this ReplicationSource can be pushed too.
+ */
+ void waitUntilCanBePushed(byte[] encodedName, long seq, String peerId)
+ throws IOException, InterruptedException {
+
+ /**
+ * There are barriers for this region and positions for this peer. N barriers form N
+ * intervals: (b1,b2) (b2,b3) ... (bn,max). Generally there are no logs whose seq id is
+ * not greater than the first barrier, and the last interval starts from the last
+ * barrier.
+ *
+ * There are several conditions under which we can push now; otherwise we must block:
+ * 1) "Serial replication" is not enabled, so we can push all logs just as before. That
+ * case should never reach this method.
+ * 2) There are no barriers for this region, or the seq id is not greater than the first
+ * barrier. This mainly happens right after altering REPLICATION_SCOPE to serial,
+ * since we can not guarantee the order of logs written before the alter.
+ * 3) This entry lies in the first interval of the barriers. We can push it because it
+ * is the start of the region. Split/merged regions are also fine, because the first
+ * section of a daughter region is in the same region as its parents, and ordering
+ * within one RS is guaranteed.
+ * 4) The entry's seq id and the saved position are in the same section, or the position
+ * is the last number of the previous section; when a region is opened we write a
+ * barrier whose number is the last log's id + 1.
+ * 5) The log's seq is not greater than the position in meta, i.e. we are retrying. This
+ * can happen when a RS crashes after saving the replication position in meta but
+ * before saving the ZK offset.
+ */
+ List<Long> barriers = MetaTableAccessor.getReplicationBarriers(connection, encodedName);
+ if (barriers.isEmpty() || seq <= barriers.get(0)) {
+ // Case 2
+ return;
+ }
+ int interval = Collections.binarySearch(barriers, seq);
+ if (interval < 0) {
+ interval = -interval - 1;// get the insert position if negative
+ }
+ if (interval == 1) {
+ // Case 3
+ return;
+ }
+
+ while (true) {
+ long pos =
+ MetaTableAccessor.getReplicationPositionForOnePeer(connection, encodedName, peerId);
+ if (seq <= pos) {
+ // Case 5
+ return;
+ }
+ if (pos >= 0) {
+ // Case 4
+ int posInterval = Collections.binarySearch(barriers, pos);
+ if (posInterval < 0) {
+ posInterval = -posInterval - 1;// get the insert position if negative
+ }
+ if (posInterval == interval || pos == barriers.get(interval - 1) - 1) {
+ return;
+ }
+ }
+
+ LOG.info(Bytes.toString(encodedName) + " can not start pushing to peer " + peerId
+ + " because previous log has not been pushed: sequence=" + seq + " pos=" + pos
+ + " barriers=" + Arrays.toString(barriers.toArray()));
+ Thread.sleep(replicationWaitTime);
+ }
+ }
+
}
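The interval arithmetic in waitUntilCanBePushed carries the whole ordering guarantee, so
a worked example helps. With hypothetical barriers (10, 20, 30) the intervals are
(10,20), (20,30) and (30,max); a self-contained sketch (illustration only, not part of
the patch):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class BarrierIntervalSketch {
      // Returns the interval of seq within the sorted barriers, as in the patch.
      static int intervalOf(List<Long> barriers, long seq) {
        int interval = Collections.binarySearch(barriers, seq);
        if (interval < 0) {
          interval = -interval - 1; // insertion point for a value between barriers
        }
        return interval;
      }

      public static void main(String[] args) {
        List<Long> barriers = Arrays.asList(10L, 20L, 30L);
        System.out.println(intervalOf(barriers, 5));  // 0: below the first barrier, case 2
        System.out.println(intervalOf(barriers, 15)); // 1: first interval, push freely, case 3
        System.out.println(intervalOf(barriers, 25)); // 2: blocked until the peer's position
                                                      //    reaches interval 2 or equals
                                                      //    20 - 1, case 4
      }
    }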
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
index 1c59a44..81dadd9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
@@ -144,6 +144,30 @@ public class FSTableDescriptors implements TableDescriptors {
// Enable cache of data blocks in L1 if more than one caching tier deployed:
// e.g. if using CombinedBlockCache (BucketCache).
.setCacheDataInL1(true),
+ new HColumnDescriptor(HConstants.REPLICATION_BARRIER_FAMILY)
+ .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
+ HConstants.DEFAULT_HBASE_META_VERSIONS))
+ .setInMemory(true)
+ .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
+ HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
+ .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
+ // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
+ .setBloomFilterType(BloomType.NONE)
+ // Enable cache of data blocks in L1 if more than one caching tier deployed:
+ // e.g. if using CombinedBlockCache (BucketCache).
+ .setCacheDataInL1(true),
+ new HColumnDescriptor(HConstants.REPLICATION_POSITION_FAMILY)
+ .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
+ HConstants.DEFAULT_HBASE_META_VERSIONS))
+ .setInMemory(true)
+ .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
+ HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
+ .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
+ // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
+ .setBloomFilterType(BloomType.NONE)
+ // Enable cache of data blocks in L1 if more than one caching tier deployed:
+ // e.g. if using CombinedBlockCache (BucketCache).
+ .setCacheDataInL1(true),
new HColumnDescriptor(HConstants.TABLE_FAMILY)
// Ten is arbitrary number. Keep versions to help debugging.
.setMaxVersions(10)
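For orientation, the layout behind these two new families can be pieced together from the
accessor calls elsewhere in this patch (the MetaTableAccessor changes are not in this
excerpt, so treat the exact cell layout as inferred rather than authoritative): each
region row in hbase:meta gains roughly

    <encoded region name>, REPLICATION_BARRIER_FAMILY:<barrier seq id> -> one cell per barrier
    <encoded region name>, REPLICATION_POSITION_FAMILY:<peer id>       -> last seq id pushed

which is exactly what ReplicationMetaCleaner above deletes, either a whole family at a
time or barrier by barrier via its qualifier.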
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index af63b0b..79321b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -22,8 +22,11 @@ package org.apache.hadoop.hbase.wal;
import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Map;
import java.util.Set;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -35,6 +38,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
/**
* A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides
@@ -282,6 +286,18 @@ public interface WAL {
key.setCompressionContext(compressionContext);
}
+ public boolean hasSerialReplicationScope() {
+ if (getKey().getReplicationScopes() == null || getKey().getReplicationScopes().isEmpty()) {
+ return false;
+ }
+ for (Map.Entry<byte[], Integer> e : getKey().getReplicationScopes().entrySet()) {
+ if (e.getValue() == HConstants.REPLICATION_SCOPE_SERIAL) {
+ return true;
+ }
+ }
+ return false;
+ }
+
@Override
public String toString() {
return this.key + "=" + this.edit;
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
index 8b84452..d750faf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
@@ -452,7 +452,7 @@ public class TestMetaTableAccessor {
List<HRegionInfo> regionInfos = Lists.newArrayList(parent);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
- MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
+ MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3, false);
assertEmptyMetaLocation(meta, splitA.getRegionName(), 1);
assertEmptyMetaLocation(meta, splitA.getRegionName(), 2);
@@ -481,7 +481,7 @@ public class TestMetaTableAccessor {
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3,
- HConstants.LATEST_TIMESTAMP);
+ HConstants.LATEST_TIMESTAMP, false);
assertEmptyMetaLocation(meta, merged.getRegionName(), 1);
assertEmptyMetaLocation(meta, merged.getRegionName(), 2);
@@ -609,7 +609,7 @@ public class TestMetaTableAccessor {
// now merge the regions, effectively deleting the rows for region a and b.
MetaTableAccessor.mergeRegions(connection, mergedRegionInfo,
- regionInfoA, regionInfoB, sn, 1, masterSystemTime);
+ regionInfoA, regionInfoB, sn, 1, masterSystemTime, false);
result = meta.get(get);
serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
@@ -692,7 +692,7 @@ public class TestMetaTableAccessor {
}
SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler();
long prevCalls = scheduler.numPriorityCalls;
- MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1);
+ MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1,
+ false);
assertTrue(prevCalls < scheduler.numPriorityCalls);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5cadcd59/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
index 7d3d2e9..c15ccf4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
@@ -1211,7 +1211,7 @@ public class TestAssignmentManagerOnCluster {
public void testUpdatesRemoteMeta() throws Exception {
conf.setInt("hbase.regionstatestore.meta.connection", 3);
final RegionStateStore rss =
- new RegionStateStore(new MyRegionServer(conf, new ZkCoordinatedStateManager()));
+ new RegionStateStore(new MyMaster(conf, new ZkCoordinatedStateManager()));
rss.start();
// Create 10 threads and make each do 10 puts related to region state update
Thread[] th = new Thread[10];