You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2017/11/15 23:19:00 UTC
[1/2] hbase git commit: HBASE-19009 implement modifyTable and
enable/disableTableReplication for AsyncAdmin
Repository: hbase
Updated Branches:
refs/heads/master d89682ea9 -> 600fdee84
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
index 2de61cb..8f09479 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
@@ -30,14 +30,14 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -114,7 +114,7 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
try {
byte[] data = peerConfigTracker.getData(false);
if (data != null) {
- this.peerConfig = ReplicationSerDeHelper.parsePeerFrom(data);
+ this.peerConfig = ReplicationPeerConfigUtil.parsePeerFrom(data);
}
} catch (DeserializationException e) {
LOG.error("", e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 0f39b2a..cc84c1d 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -35,8 +35,7 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
@@ -46,6 +45,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
/**
@@ -131,7 +131,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
List<ZKUtilOp> listOfOps = new ArrayList<>(2);
ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id),
- ReplicationSerDeHelper.toByteArray(peerConfig));
+ ReplicationPeerConfigUtil.toByteArray(peerConfig));
// b/w PeerWatcher and ReplicationZookeeper#add method to create the
// peer-state znode. This happens while adding a peer
// The peer state data is set as "ENABLED" by default.
@@ -206,9 +206,9 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
rpc.setTableCFsMap(tableCFs);
ZKUtil.setData(this.zookeeper, getPeerNode(id),
- ReplicationSerDeHelper.toByteArray(rpc));
+ ReplicationPeerConfigUtil.toByteArray(rpc));
LOG.info("Peer tableCFs with id= " + id + " is now " +
- ReplicationSerDeHelper.convertToString(tableCFs));
+ ReplicationPeerConfigUtil.convertToString(tableCFs));
} catch (KeeperException e) {
throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
}
@@ -303,7 +303,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
try {
- return ReplicationSerDeHelper.parsePeerFrom(data);
+ return ReplicationPeerConfigUtil.parsePeerFrom(data);
} catch (DeserializationException e) {
LOG.warn("Failed to parse cluster key from peerId=" + peerId
+ ", specifically the content from the following znode: " + znode);
@@ -372,7 +372,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
try {
ZKUtil.setData(this.zookeeper, getPeerNode(id),
- ReplicationSerDeHelper.toByteArray(existingConfig));
+ ReplicationPeerConfigUtil.toByteArray(existingConfig));
}
catch(KeeperException ke){
throw new ReplicationException("There was a problem trying to save changes to the " +
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 3c751f7..2e3df2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
@@ -1809,7 +1809,7 @@ public class MasterRpcServices extends RSRpcServices
AddReplicationPeerRequest request) throws ServiceException {
try {
master.addReplicationPeer(request.getPeerId(),
- ReplicationSerDeHelper.convert(request.getPeerConfig()));
+ ReplicationPeerConfigUtil.convert(request.getPeerConfig()));
return AddReplicationPeerResponse.newBuilder().build();
} catch (ReplicationException | IOException e) {
throw new ServiceException(e);
@@ -1858,7 +1858,7 @@ public class MasterRpcServices extends RSRpcServices
String peerId = request.getPeerId();
ReplicationPeerConfig peerConfig = master.getReplicationPeerConfig(peerId);
response.setPeerId(peerId);
- response.setPeerConfig(ReplicationSerDeHelper.convert(peerConfig));
+ response.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
} catch (ReplicationException | IOException e) {
throw new ServiceException(e);
}
@@ -1870,7 +1870,7 @@ public class MasterRpcServices extends RSRpcServices
UpdateReplicationPeerConfigRequest request) throws ServiceException {
try {
master.updateReplicationPeerConfig(request.getPeerId(),
- ReplicationSerDeHelper.convert(request.getPeerConfig()));
+ ReplicationPeerConfigUtil.convert(request.getPeerConfig()));
return UpdateReplicationPeerConfigResponse.newBuilder().build();
} catch (ReplicationException | IOException e) {
throw new ServiceException(e);
@@ -1885,7 +1885,7 @@ public class MasterRpcServices extends RSRpcServices
List<ReplicationPeerDescription> peers = master
.listReplicationPeers(request.hasRegex() ? request.getRegex() : null);
for (ReplicationPeerDescription peer : peers) {
- response.addPeerDesc(ReplicationSerDeHelper.toProtoReplicationPeerDescription(peer));
+ response.addPeerDesc(ReplicationPeerConfigUtil.toProtoReplicationPeerDescription(peer));
}
} catch (ReplicationException | IOException e) {
throw new ServiceException(e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java
index 0585c97..d094d1c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java
@@ -23,15 +23,15 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
@@ -79,12 +79,12 @@ public class TableCFsUpdater extends ReplicationStateZKBase {
// we copy TableCFs node into PeerNode
LOG.info("copy tableCFs into peerNode:" + peerId);
ReplicationProtos.TableCF[] tableCFs =
- ReplicationSerDeHelper.parseTableCFs(
+ ReplicationPeerConfigUtil.parseTableCFs(
ZKUtil.getData(this.zookeeper, tableCFsNode));
if (tableCFs != null && tableCFs.length > 0) {
- rpc.setTableCFsMap(ReplicationSerDeHelper.convert2Map(tableCFs));
+ rpc.setTableCFsMap(ReplicationPeerConfigUtil.convert2Map(tableCFs));
ZKUtil.setData(this.zookeeper, peerNode,
- ReplicationSerDeHelper.toByteArray(rpc));
+ ReplicationPeerConfigUtil.toByteArray(rpc));
}
} else {
LOG.info("No tableCFs in peerNode:" + peerId);
@@ -113,7 +113,7 @@ public class TableCFsUpdater extends ReplicationStateZKBase {
return null;
}
try {
- return ReplicationSerDeHelper.parsePeerFrom(data);
+ return ReplicationPeerConfigUtil.parsePeerFrom(data);
} catch (DeserializationException e) {
LOG.warn("Failed to parse cluster key from peer=" + peerNode);
return null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
index 3e577bc..e489078 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
@@ -41,10 +41,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.BeforeClass;
-import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
new file mode 100644
index 0000000..bf60053
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Class to test asynchronous replication admin operations when more than 1 cluster
+ */
+@RunWith(Parameterized.class)
+@Category({LargeTests.class, ClientTests.class})
+public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase {
+
+ private final static String ID_SECOND = "2";
+
+ private static HBaseTestingUtility TEST_UTIL2;
+ private static Configuration conf2;
+ private static AsyncAdmin admin2;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+ TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
+ TEST_UTIL.startMiniCluster();
+ ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+
+ conf2 = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+ conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+ TEST_UTIL2 = new HBaseTestingUtility(conf2);
+ TEST_UTIL2.startMiniCluster();
+ admin2 =
+ ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get().getAdmin();
+
+ ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+ rpc.setClusterKey(TEST_UTIL2.getClusterKey());
+ ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ Pattern pattern = Pattern.compile(tableName.getNameAsString() + ".*");
+ cleanupTables(admin, pattern);
+ cleanupTables(admin2, pattern);
+ }
+
+ private void cleanupTables(AsyncAdmin admin, Pattern pattern) {
+ admin.listTableNames(pattern, false).whenCompleteAsync((tables, err) -> {
+ if (tables != null) {
+ tables.forEach(table -> {
+ try {
+ admin.disableTable(table).join();
+ } catch (Exception e) {
+ LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
+ }
+ admin.deleteTable(table).join();
+ });
+ }
+ }, ForkJoinPool.commonPool()).join();
+ }
+
+ private void createTableWithDefaultConf(AsyncAdmin admin, TableName tableName) {
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+ builder.addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
+ admin.createTable(builder.build()).join();
+ }
+
+ @Test
+ public void testEnableAndDisableTableReplication() throws Exception {
+ // default replication scope is local
+ createTableWithDefaultConf(tableName);
+ admin.enableTableReplication(tableName).join();
+ TableDescriptor tableDesc = admin.getTableDescriptor(tableName).get();
+ for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
+ assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
+ }
+
+ admin.disableTableReplication(tableName).join();
+ tableDesc = admin.getTableDescriptor(tableName).get();
+ for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
+ assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
+ }
+ }
+
+ @Test
+ public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception {
+ // Only create table in source cluster
+ createTableWithDefaultConf(tableName);
+ assertFalse(admin2.tableExists(tableName).get());
+ admin.enableTableReplication(tableName).join();
+ assertTrue(admin2.tableExists(tableName).get());
+ }
+
+ @Test
+ public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception {
+ createTableWithDefaultConf(admin, tableName);
+ createTableWithDefaultConf(admin2, tableName);
+ TableDescriptorBuilder builder =
+ TableDescriptorBuilder.newBuilder(admin.getTableDescriptor(tableName).get());
+ builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("newFamily"))
+ .build());
+ admin2.disableTable(tableName).join();
+ admin2.modifyTable(builder.build()).join();
+ admin2.enableTable(tableName).join();
+
+ try {
+ admin.enableTableReplication(tableName).join();
+ fail("Exception should be thrown if table descriptors in the clusters are not same.");
+ } catch (Exception ignored) {
+ // ok
+ }
+
+ admin.disableTable(tableName).join();
+ admin.modifyTable(builder.build()).join();
+ admin.enableTable(tableName).join();
+ admin.enableTableReplication(tableName).join();
+ TableDescriptor tableDesc = admin.getTableDescriptor(tableName).get();
+ for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
+ assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
+ }
+ }
+
+ @Test
+ public void testDisableReplicationForNonExistingTable() throws Exception {
+ try {
+ admin.disableTableReplication(tableName).join();
+ } catch (CompletionException e) {
+ assertTrue(e.getCause() instanceof TableNotFoundException);
+ }
+ }
+
+ @Test
+ public void testEnableReplicationForNonExistingTable() throws Exception {
+ try {
+ admin.enableTableReplication(tableName).join();
+ } catch (CompletionException e) {
+ assertTrue(e.getCause() instanceof TableNotFoundException);
+ }
+ }
+
+ @Test
+ public void testDisableReplicationWhenTableNameAsNull() throws Exception {
+ try {
+ admin.disableTableReplication(null).join();
+ } catch (CompletionException e) {
+ assertTrue(e.getCause() instanceof IllegalArgumentException);
+ }
+ }
+
+ @Test
+ public void testEnableReplicationWhenTableNameAsNull() throws Exception {
+ try {
+ admin.enableTableReplication(null).join();
+ } catch (CompletionException e) {
+ assertTrue(e.getCause() instanceof IllegalArgumentException);
+ }
+ }
+
+ /*
+ * Test enable table replication should create table only in user explicit specified table-cfs.
+ * HBASE-14717
+ */
+ @Test
+ public void testEnableReplicationForExplicitSetTableCfs() throws Exception {
+ TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "2");
+ // Only create table in source cluster
+ createTableWithDefaultConf(tableName);
+ createTableWithDefaultConf(tableName2);
+ assertFalse("Table should not exists in the peer cluster",
+ admin2.tableExists(tableName).get());
+ assertFalse("Table should not exists in the peer cluster",
+ admin2.tableExists(tableName2).get());
+
+ Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>();
+ tableCfs.put(tableName, null);
+ ReplicationPeerConfig rpc = admin.getReplicationPeerConfig(ID_SECOND).get();
+ rpc.setTableCFsMap(tableCfs);
+ try {
+ // Only add tableName to replication peer config
+ admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
+ admin.enableTableReplication(tableName2).join();
+ assertFalse("Table should not be created if user has set table cfs explicitly for the "
+ + "peer and this is not part of that collection", admin2.tableExists(tableName2).get());
+
+ // Add tableName2 to replication peer config, too
+ tableCfs.put(tableName2, null);
+ rpc.setTableCFsMap(tableCfs);
+ admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
+ admin.enableTableReplication(tableName2).join();
+ assertTrue(
+ "Table should be created if user has explicitly added table into table cfs collection",
+ admin2.tableExists(tableName2).get());
+ } finally {
+ rpc.setTableCFsMap(null);
+ admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index a23b76a..62951ef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -241,7 +241,7 @@ public class TestReplicationAdmin {
tableCFs.put(tableName1, null);
admin.appendPeerTableCFs(ID_ONE, tableCFs);
Map<TableName, List<String>> result =
- ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
+ ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
assertEquals(1, result.size());
assertEquals(true, result.containsKey(tableName1));
assertNull(result.get(tableName1));
@@ -250,7 +250,7 @@ public class TestReplicationAdmin {
tableCFs.clear();
tableCFs.put(tableName2, null);
admin.appendPeerTableCFs(ID_ONE, tableCFs);
- result = ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
+ result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
assertEquals(2, result.size());
assertTrue("Should contain t1", result.containsKey(tableName1));
assertTrue("Should contain t2", result.containsKey(tableName2));
@@ -262,7 +262,7 @@ public class TestReplicationAdmin {
tableCFs.put(tableName3, new ArrayList<>());
tableCFs.get(tableName3).add("f1");
admin.appendPeerTableCFs(ID_ONE, tableCFs);
- result = ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
+ result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
assertEquals(3, result.size());
assertTrue("Should contain t1", result.containsKey(tableName1));
assertTrue("Should contain t2", result.containsKey(tableName2));
@@ -277,7 +277,7 @@ public class TestReplicationAdmin {
tableCFs.get(tableName4).add("f1");
tableCFs.get(tableName4).add("f2");
admin.appendPeerTableCFs(ID_ONE, tableCFs);
- result = ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
+ result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
assertEquals(4, result.size());
assertTrue("Should contain t1", result.containsKey(tableName1));
assertTrue("Should contain t2", result.containsKey(tableName2));
@@ -299,7 +299,7 @@ public class TestReplicationAdmin {
tableCFs.put(tableName5, new ArrayList<>());
tableCFs.get(tableName5).add("f1");
admin.appendPeerTableCFs(ID_ONE, tableCFs);
- result = ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
+ result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
assertEquals(5, result.size());
assertTrue("Should contain t5", result.containsKey(tableName5));
// null means replication all cfs of tab5
@@ -313,7 +313,7 @@ public class TestReplicationAdmin {
tableCFs.clear();
tableCFs.put(tableName6, new ArrayList<>());
admin.appendPeerTableCFs(ID_ONE, tableCFs);
- result = ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
+ result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
assertEquals(6, result.size());
assertTrue("Should contain t6", result.containsKey(tableName6));
// null means replication all cfs of tab6
@@ -354,7 +354,7 @@ public class TestReplicationAdmin {
} catch (ReplicationException e) {
}
Map<TableName, List<String>> result =
- ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
+ ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
assertEquals(2, result.size());
assertTrue("Should contain t1", result.containsKey(tableName1));
assertTrue("Should contain t2", result.containsKey(tableName2));
@@ -373,7 +373,7 @@ public class TestReplicationAdmin {
tableCFs.clear();
tableCFs.put(tableName1, null);
admin.removePeerTableCFs(ID_ONE, tableCFs);
- result = ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
+ result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
assertEquals(1, result.size());
assertEquals(1, result.get(tableName2).size());
assertEquals("cf1", result.get(tableName2).get(0));
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 3e499b2..6b7d36b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -58,7 +58,7 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
@@ -528,7 +528,7 @@ public class TestMasterReplication {
.getAdmin()) {
admin.addReplicationPeer(id,
new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey())
- .setTableCFsMap(ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs)));
+ .setTableCFsMap(ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs)));
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
index abf2db3..6572404 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
@@ -187,13 +187,13 @@ public class TestPerTableCFReplication {
Map<TableName, List<String>> tabCFsMap = null;
// 1. null or empty string, result should be null
- tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(null);
+ tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(null);
assertEquals(null, tabCFsMap);
- tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig("");
+ tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig("");
assertEquals(null, tabCFsMap);
- tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(" ");
+ tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(" ");
assertEquals(null, tabCFsMap);
final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
@@ -201,20 +201,20 @@ public class TestPerTableCFReplication {
final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");
// 2. single table: "tableName1" / "tableName2:cf1" / "tableName3:cf1,cf3"
- tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(tableName1.getNameAsString());
+ tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1.getNameAsString());
assertEquals(1, tabCFsMap.size()); // only one table
assertTrue(tabCFsMap.containsKey(tableName1)); // its table name is "tableName1"
assertFalse(tabCFsMap.containsKey(tableName2)); // not other table
assertEquals(null, tabCFsMap.get(tableName1)); // null cf-list,
- tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(tableName2 + ":cf1");
+ tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName2 + ":cf1");
assertEquals(1, tabCFsMap.size()); // only one table
assertTrue(tabCFsMap.containsKey(tableName2)); // its table name is "tableName2"
assertFalse(tabCFsMap.containsKey(tableName1)); // not other table
assertEquals(1, tabCFsMap.get(tableName2).size()); // cf-list contains only 1 cf
assertEquals("cf1", tabCFsMap.get(tableName2).get(0));// the only cf is "cf1"
- tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(tableName3 + " : cf1 , cf3");
+ tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName3 + " : cf1 , cf3");
assertEquals(1, tabCFsMap.size()); // only one table
assertTrue(tabCFsMap.containsKey(tableName3)); // its table name is "tableName2"
assertFalse(tabCFsMap.containsKey(tableName1)); // not other table
@@ -223,7 +223,7 @@ public class TestPerTableCFReplication {
assertTrue(tabCFsMap.get(tableName3).contains("cf3"));// contains "cf3"
// 3. multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
- tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(tableName1 + " ; " + tableName2
+ tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1 + " ; " + tableName2
+ ":cf1 ; " + tableName3 + ":cf1,cf3");
// 3.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
assertEquals(3, tabCFsMap.size());
@@ -242,7 +242,7 @@ public class TestPerTableCFReplication {
// 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated
// still use the example of multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
- tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(
+ tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
tableName1 + " ; ; " + tableName2 + ":cf1 ; " + tableName3 + ":cf1,,cf3 ;");
// 4.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
assertEquals(3, tabCFsMap.size());
@@ -261,7 +261,7 @@ public class TestPerTableCFReplication {
// 5. invalid format "tableName1:tt:cf1 ; tableName2::cf1 ; tableName3:cf1,cf3"
// "tableName1:tt:cf1" and "tableName2::cf1" are invalid and will be ignored totally
- tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(
+ tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
tableName1 + ":tt:cf1 ; " + tableName2 + "::cf1 ; " + tableName3 + ":cf1,cf3");
// 5.1 no "tableName1" and "tableName2", only "tableName3"
assertEquals(1, tabCFsMap.size()); // only one table
@@ -281,10 +281,10 @@ public class TestPerTableCFReplication {
Map<TableName, List<String>> tabCFsMap = null;
// 1. null or empty string, result should be null
- assertNull(ReplicationSerDeHelper.convert(tabCFsMap));
+ assertNull(ReplicationPeerConfigUtil.convert(tabCFsMap));
tabCFsMap = new HashMap<>();
- tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
+ tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
assertEquals(0, tableCFs.length);
final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
@@ -294,7 +294,7 @@ public class TestPerTableCFReplication {
// 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
tabCFsMap.clear();
tabCFsMap.put(tableName1, null);
- tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
+ tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
assertEquals(1, tableCFs.length); // only one table
assertEquals(tableName1.toString(),
tableCFs[0].getTableName().getQualifier().toStringUtf8());
@@ -303,7 +303,7 @@ public class TestPerTableCFReplication {
tabCFsMap.clear();
tabCFsMap.put(tableName2, new ArrayList<>());
tabCFsMap.get(tableName2).add("cf1");
- tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
+ tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
assertEquals(1, tableCFs.length); // only one table
assertEquals(tableName2.toString(),
tableCFs[0].getTableName().getQualifier().toStringUtf8());
@@ -314,7 +314,7 @@ public class TestPerTableCFReplication {
tabCFsMap.put(tableName3, new ArrayList<>());
tabCFsMap.get(tableName3).add("cf1");
tabCFsMap.get(tableName3).add("cf3");
- tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
+ tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
assertEquals(1, tableCFs.length);
assertEquals(tableName3.toString(),
tableCFs[0].getTableName().getQualifier().toStringUtf8());
@@ -330,28 +330,28 @@ public class TestPerTableCFReplication {
tabCFsMap.get(tableName3).add("cf1");
tabCFsMap.get(tableName3).add("cf3");
- tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
+ tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
assertEquals(3, tableCFs.length);
- assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, tableName1.toString()));
- assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, tableName2.toString()));
- assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, tableName3.toString()));
+ assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()));
+ assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString()));
+ assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString()));
assertEquals(0,
- ReplicationSerDeHelper.getTableCF(tableCFs, tableName1.toString()).getFamiliesCount());
+ ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()).getFamiliesCount());
- assertEquals(1,
- ReplicationSerDeHelper.getTableCF(tableCFs, tableName2.toString()).getFamiliesCount());
- assertEquals("cf1",
- ReplicationSerDeHelper.getTableCF(tableCFs, tableName2.toString()).getFamilies(0).toStringUtf8());
+ assertEquals(1, ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString())
+ .getFamiliesCount());
+ assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString())
+ .getFamilies(0).toStringUtf8());
- assertEquals(2,
- ReplicationSerDeHelper.getTableCF(tableCFs, tableName3.toString()).getFamiliesCount());
- assertEquals("cf1",
- ReplicationSerDeHelper.getTableCF(tableCFs, tableName3.toString()).getFamilies(0).toStringUtf8());
- assertEquals("cf3",
- ReplicationSerDeHelper.getTableCF(tableCFs, tableName3.toString()).getFamilies(1).toStringUtf8());
+ assertEquals(2, ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
+ .getFamiliesCount());
+ assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
+ .getFamilies(0).toStringUtf8());
+ assertEquals("cf3", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
+ .getFamilies(1).toStringUtf8());
- tabCFsMap = ReplicationSerDeHelper.convert2Map(tableCFs);
+ tabCFsMap = ReplicationPeerConfigUtil.convert2Map(tableCFs);
assertEquals(3, tabCFsMap.size());
assertTrue(tabCFsMap.containsKey(tableName1));
assertTrue(tabCFsMap.containsKey(tableName2));
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
index 8c604f4..1a02317 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -99,14 +99,15 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(zkw.getQuorum());
String peerNode = getPeerNode(peerId);
- ZKUtil.createWithParents(zkw, peerNode, ReplicationSerDeHelper.toByteArray(rpc));
+ ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc));
String tableCFs = tableName1 + ":cf1,cf2;" + tableName2 + ":cf3;" + tableName3;
String tableCFsNode = getTableCFsNode(peerId);
LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs));
- ReplicationPeerConfig actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
+ ReplicationPeerConfig actualRpc =
+ ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
String actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
@@ -117,14 +118,14 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
rpc = new ReplicationPeerConfig();
rpc.setClusterKey(zkw.getQuorum());
peerNode = getPeerNode(peerId);
- ZKUtil.createWithParents(zkw, peerNode, ReplicationSerDeHelper.toByteArray(rpc));
+ ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc));
tableCFs = tableName1 + ":cf1,cf3;" + tableName2 + ":cf2";
tableCFsNode = getTableCFsNode(peerId);
LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs));
- actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
+ actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
@@ -135,14 +136,14 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
rpc = new ReplicationPeerConfig();
rpc.setClusterKey(zkw.getQuorum());
peerNode = getPeerNode(peerId);
- ZKUtil.createWithParents(zkw, peerNode, ReplicationSerDeHelper.toByteArray(rpc));
+ ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc));
tableCFs = "";
tableCFsNode = getTableCFsNode(peerId);
LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs));
- actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
+ actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
@@ -153,10 +154,10 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
rpc = new ReplicationPeerConfig();
rpc.setClusterKey(zkw.getQuorum());
peerNode = getPeerNode(peerId);
- ZKUtil.createWithParents(zkw, peerNode, ReplicationSerDeHelper.toByteArray(rpc));
+ ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc));
tableCFsNode = getTableCFsNode(peerId);
- actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
+ actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
@@ -167,7 +168,7 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
peerId = "1";
peerNode = getPeerNode(peerId);
- actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
+ actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
Map<TableName, List<String>> tableNameListMap = actualRpc.getTableCFsMap();
assertEquals(3, tableNameListMap.size());
@@ -184,7 +185,7 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
peerId = "2";
peerNode = getPeerNode(peerId);
- actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
+ actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
tableNameListMap = actualRpc.getTableCFsMap();
assertEquals(2, tableNameListMap.size());
@@ -198,14 +199,14 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
peerId = "3";
peerNode = getPeerNode(peerId);
- actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
+ actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
tableNameListMap = actualRpc.getTableCFsMap();
assertNull(tableNameListMap);
peerId = "4";
peerNode = getPeerNode(peerId);
- actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
+ actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
tableNameListMap = actualRpc.getTableCFsMap();
assertNull(tableNameListMap);
[2/2] hbase git commit: HBASE-19009 implement modifyTable and
enable/disableTableReplication for AsyncAdmin
Posted by zg...@apache.org.
HBASE-19009 implement modifyTable and enable/disableTableReplication for AsyncAdmin
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/600fdee8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/600fdee8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/600fdee8
Branch: refs/heads/master
Commit: 600fdee8449aa1de80c8a78d3bb5e8551d3a0261
Parents: d89682e
Author: Guanghao Zhang <zg...@apache.org>
Authored: Sun Nov 12 20:16:20 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Thu Nov 16 07:07:20 2017 +0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/client/AsyncAdmin.java | 18 +
.../hadoop/hbase/client/AsyncHBaseAdmin.java | 17 +-
.../hbase/client/ColumnFamilyDescriptor.java | 27 ++
.../apache/hadoop/hbase/client/HBaseAdmin.java | 220 ++-------
.../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 313 ++++++++++++-
.../hadoop/hbase/client/TableDescriptor.java | 51 +-
.../hbase/client/TableDescriptorBuilder.java | 21 +-
.../client/replication/ReplicationAdmin.java | 8 +-
.../replication/ReplicationPeerConfigUtil.java | 468 +++++++++++++++++++
.../replication/ReplicationSerDeHelper.java | 437 -----------------
.../replication/ReplicationPeerConfig.java | 20 +
.../hbase/shaded/protobuf/RequestConverter.java | 6 +-
.../replication/ReplicationPeerZKImpl.java | 6 +-
.../replication/ReplicationPeersZKImpl.java | 14 +-
.../hadoop/hbase/master/MasterRpcServices.java | 10 +-
.../replication/master/TableCFsUpdater.java | 14 +-
.../client/TestAsyncReplicationAdminApi.java | 2 -
...estAsyncReplicationAdminApiWithClusters.java | 242 ++++++++++
.../replication/TestReplicationAdmin.java | 16 +-
.../replication/TestMasterReplication.java | 4 +-
.../replication/TestPerTableCFReplication.java | 62 +--
.../replication/master/TestTableCFsUpdater.java | 27 +-
22 files changed, 1261 insertions(+), 742 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index f251a8f..722e8b5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -141,6 +141,12 @@ public interface AsyncAdmin {
*/
CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys);
+ /*
+ * Modify an existing table, more IRB friendly version.
+ * @param desc modified description of the table
+ */
+ CompletableFuture<Void> modifyTable(TableDescriptor desc);
+
/**
* Deletes a table.
* @param tableName name of table to delete
@@ -553,6 +559,18 @@ public interface AsyncAdmin {
CompletableFuture<List<TableCFs>> listReplicatedTableCFs();
/**
+ * Enable a table's replication switch.
+ * @param tableName name of the table
+ */
+ CompletableFuture<Void> enableTableReplication(TableName tableName);
+
+ /**
+ * Disable a table's replication switch.
+ * @param tableName name of the table
+ */
+ CompletableFuture<Void> disableTableReplication(TableName tableName);
+
+ /**
* Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be
* taken. If the table is disabled, an offline snapshot is taken. Snapshots are considered unique
* based on <b>the name of the snapshot</b>. Attempts to take a snapshot with the same name (even
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 250a38c..5a20291 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -128,6 +128,11 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
}
@Override
+ public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
+ return wrap(rawAdmin.modifyTable(desc));
+ }
+
+ @Override
public CompletableFuture<Void> deleteTable(TableName tableName) {
return wrap(rawAdmin.deleteTable(tableName));
}
@@ -420,6 +425,16 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
}
@Override
+ public CompletableFuture<Void> enableTableReplication(TableName tableName) {
+ return wrap(rawAdmin.enableTableReplication(tableName));
+ }
+
+ @Override
+ public CompletableFuture<Void> disableTableReplication(TableName tableName) {
+ return wrap(rawAdmin.disableTableReplication(tableName));
+ }
+
+ @Override
public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
return wrap(rawAdmin.snapshot(snapshot));
}
@@ -709,4 +724,4 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
return wrap(rawAdmin.clearDeadServers(servers));
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java
index c232271..03f4582 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.hbase.client;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.Map;
+
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.yetus.audience.InterfaceAudience;
@@ -54,6 +56,31 @@ public interface ColumnFamilyDescriptor {
return lhs.getConfiguration().hashCode() - rhs.getConfiguration().hashCode();
};
+ static final Bytes REPLICATION_SCOPE_BYTES = new Bytes(
+ Bytes.toBytes(ColumnFamilyDescriptorBuilder.REPLICATION_SCOPE));
+
+ @InterfaceAudience.Private
+ static final Comparator<ColumnFamilyDescriptor> COMPARATOR_IGNORE_REPLICATION = (
+ ColumnFamilyDescriptor lcf, ColumnFamilyDescriptor rcf) -> {
+ int result = Bytes.compareTo(lcf.getName(), rcf.getName());
+ if (result != 0) {
+ return result;
+ }
+ // ColumnFamilyDescriptor.getValues is a immutable map, so copy it and remove
+ // REPLICATION_SCOPE_BYTES
+ Map<Bytes, Bytes> lValues = new HashMap<>();
+ lValues.putAll(lcf.getValues());
+ lValues.remove(REPLICATION_SCOPE_BYTES);
+ Map<Bytes, Bytes> rValues = new HashMap<>();
+ rValues.putAll(rcf.getValues());
+ rValues.remove(REPLICATION_SCOPE_BYTES);
+ result = lValues.hashCode() - rValues.hashCode();
+ if (result != 0) {
+ return result;
+ }
+ return lcf.getConfiguration().hashCode() - rcf.getConfiguration().hashCode();
+ };
+
/**
* @return The storefile/hfile blocksize for this column family.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 80f9d16..e153381 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -30,7 +30,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -45,6 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -54,10 +54,8 @@ import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.ClusterStatus.Option;
-import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -76,7 +74,7 @@ import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.client.security.SecurityCapability;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
@@ -3893,7 +3891,7 @@ public class HBaseAdmin implements Admin {
protected ReplicationPeerConfig rpcCall() throws Exception {
GetReplicationPeerConfigResponse response = master.getReplicationPeerConfig(
getRpcController(), RequestConverter.buildGetReplicationPeerConfigRequest(peerId));
- return ReplicationSerDeHelper.convert(response.getPeerConfig());
+ return ReplicationPeerConfigUtil.convert(response.getPeerConfig());
}
});
}
@@ -3919,7 +3917,7 @@ public class HBaseAdmin implements Admin {
throw new ReplicationException("tableCfs is null");
}
ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
- ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
+ ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
updateReplicationPeerConfig(id, peerConfig);
}
@@ -3931,7 +3929,7 @@ public class HBaseAdmin implements Admin {
throw new ReplicationException("tableCfs is null");
}
ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
- ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
+ ReplicationPeerConfigUtil.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
updateReplicationPeerConfig(id, peerConfig);
}
@@ -3957,7 +3955,7 @@ public class HBaseAdmin implements Admin {
.getPeerDescList();
List<ReplicationPeerDescription> result = new ArrayList<>(peersList.size());
for (ReplicationProtos.ReplicationPeerDescription peer : peersList) {
- result.add(ReplicationSerDeHelper.toReplicationPeerDescription(peer));
+ result.add(ReplicationPeerConfigUtil.toReplicationPeerDescription(peer));
}
return result;
}
@@ -4010,19 +4008,18 @@ public class HBaseAdmin implements Admin {
@Override
public List<TableCFs> listReplicatedTableCFs() throws IOException {
List<TableCFs> replicatedTableCFs = new ArrayList<>();
- HTableDescriptor[] tables = listTables();
- for (HTableDescriptor table : tables) {
- HColumnDescriptor[] columns = table.getColumnFamilies();
+ List<TableDescriptor> tables = listTableDescriptors();
+ tables.forEach(table -> {
Map<String, Integer> cfs = new HashMap<>();
- for (HColumnDescriptor column : columns) {
- if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
- cfs.put(column.getNameAsString(), column.getScope());
- }
- }
+ Stream.of(table.getColumnFamilies())
+ .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
+ .forEach(column -> {
+ cfs.put(column.getNameAsString(), column.getScope());
+ });
if (!cfs.isEmpty()) {
replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
}
- }
+ });
return replicatedTableCFs;
}
@@ -4046,84 +4043,13 @@ public class HBaseAdmin implements Admin {
throw new IllegalArgumentException("Table name is null");
}
if (!tableExists(tableName)) {
- throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString()
+ throw new TableNotFoundException("Table '" + tableName.getNameAsString()
+ "' does not exists.");
}
setTableRep(tableName, false);
}
/**
- * Copies the REPLICATION_SCOPE of table descriptor passed as an argument. Before copy, the method
- * ensures that the name of table and column-families should match.
- * @param peerHtd descriptor on peer cluster
- * @param localHtd - The HTableDescriptor of table from source cluster.
- * @return true If the name of table and column families match and REPLICATION_SCOPE copied
- * successfully. false If there is any mismatch in the names.
- */
- private boolean copyReplicationScope(final HTableDescriptor peerHtd,
- final HTableDescriptor localHtd) {
- // Copy the REPLICATION_SCOPE only when table names and the names of
- // Column-Families are same.
- int result = peerHtd.getTableName().compareTo(localHtd.getTableName());
-
- if (result == 0) {
- Iterator<HColumnDescriptor> remoteHCDIter = peerHtd.getFamilies().iterator();
- Iterator<HColumnDescriptor> localHCDIter = localHtd.getFamilies().iterator();
-
- while (remoteHCDIter.hasNext() && localHCDIter.hasNext()) {
- HColumnDescriptor remoteHCD = remoteHCDIter.next();
- HColumnDescriptor localHCD = localHCDIter.next();
-
- String remoteHCDName = remoteHCD.getNameAsString();
- String localHCDName = localHCD.getNameAsString();
-
- if (remoteHCDName.equals(localHCDName)) {
- remoteHCD.setScope(localHCD.getScope());
- } else {
- result = -1;
- break;
- }
- }
- if (remoteHCDIter.hasNext() || localHCDIter.hasNext()) {
- return false;
- }
- }
-
- return result == 0;
- }
-
- /**
- * Compare the contents of the descriptor with another one passed as a parameter for replication
- * purpose. The REPLICATION_SCOPE field is ignored during comparison.
- * @param peerHtd descriptor on peer cluster
- * @param localHtd descriptor on source cluster which needs to be replicated.
- * @return true if the contents of the two descriptors match (ignoring just REPLICATION_SCOPE).
- * @see java.lang.Object#equals(java.lang.Object)
- */
- private boolean compareForReplication(HTableDescriptor peerHtd, HTableDescriptor localHtd) {
- if (peerHtd == localHtd) {
- return true;
- }
- if (peerHtd == null) {
- return false;
- }
- boolean result = false;
-
- // Create a copy of peer HTD as we need to change its replication
- // scope to match with the local HTD.
- HTableDescriptor peerHtdCopy = new HTableDescriptor(peerHtd);
-
- result = copyReplicationScope(peerHtdCopy, localHtd);
-
- // If copy was successful, compare the two tables now.
- if (result) {
- result = (peerHtdCopy.compareTo(localHtd) == 0);
- }
-
- return result;
- }
-
- /**
* Connect to peer and check the table descriptor on peer:
* <ol>
* <li>Create the same table on peer when not exist.</li>
@@ -4143,21 +4069,23 @@ public class HBaseAdmin implements Admin {
}
for (ReplicationPeerDescription peerDesc : peers) {
- if (needToReplicate(tableName, peerDesc)) {
- Configuration peerConf = getPeerClusterConfiguration(peerDesc);
+ if (peerDesc.getPeerConfig().needToReplicate(tableName)) {
+ Configuration peerConf =
+ ReplicationPeerConfigUtil.getPeerClusterConfiguration(this.conf, peerDesc);
try (Connection conn = ConnectionFactory.createConnection(peerConf);
Admin repHBaseAdmin = conn.getAdmin()) {
- HTableDescriptor localHtd = getTableDescriptor(tableName);
- HTableDescriptor peerHtd = null;
+ TableDescriptor tableDesc = getDescriptor(tableName);
+ TableDescriptor peerTableDesc = null;
if (!repHBaseAdmin.tableExists(tableName)) {
- repHBaseAdmin.createTable(localHtd, splits);
+ repHBaseAdmin.createTable(tableDesc, splits);
} else {
- peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
- if (peerHtd == null) {
+ peerTableDesc = repHBaseAdmin.getDescriptor(tableName);
+ if (peerTableDesc == null) {
throw new IllegalArgumentException("Failed to get table descriptor for table "
+ tableName.getNameAsString() + " from peer cluster " + peerDesc.getPeerId());
}
- if (!compareForReplication(peerHtd, localHtd)) {
+ if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc,
+ tableDesc) != 0) {
throw new IllegalArgumentException("Table " + tableName.getNameAsString()
+ " exists in peer cluster " + peerDesc.getPeerId()
+ ", but the table descriptors are not same when compared with source cluster."
@@ -4170,108 +4098,20 @@ public class HBaseAdmin implements Admin {
}
/**
- * Decide whether the table need replicate to the peer cluster according to the peer config
- * @param table name of the table
- * @param peer config for the peer
- * @return true if the table need replicate to the peer cluster
- */
- private boolean needToReplicate(TableName table, ReplicationPeerDescription peer) {
- ReplicationPeerConfig peerConfig = peer.getPeerConfig();
- Set<String> namespaces = peerConfig.getNamespaces();
- Map<TableName, List<String>> tableCFsMap = peerConfig.getTableCFsMap();
- // If null means user has explicitly not configured any namespaces and table CFs
- // so all the tables data are applicable for replication
- if (namespaces == null && tableCFsMap == null) {
- return true;
- }
- if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) {
- return true;
- }
- if (tableCFsMap != null && tableCFsMap.containsKey(table)) {
- return true;
- }
- LOG.debug("Table " + table.getNameAsString()
- + " doesn't need replicate to peer cluster, peerId=" + peer.getPeerId() + ", clusterKey="
- + peerConfig.getClusterKey());
- return false;
- }
-
- /**
* Set the table's replication switch if the table's replication switch is already not set.
* @param tableName name of the table
* @param enableRep is replication switch enable or disable
* @throws IOException if a remote or network exception occurs
*/
private void setTableRep(final TableName tableName, boolean enableRep) throws IOException {
- HTableDescriptor htd = new HTableDescriptor(getTableDescriptor(tableName));
- ReplicationState currentReplicationState = getTableReplicationState(htd);
- if (enableRep && currentReplicationState != ReplicationState.ENABLED
- || !enableRep && currentReplicationState != ReplicationState.DISABLED) {
- for (HColumnDescriptor hcd : htd.getFamilies()) {
- hcd.setScope(enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL
- : HConstants.REPLICATION_SCOPE_LOCAL);
- }
- modifyTable(tableName, htd);
+ TableDescriptor tableDesc = getDescriptor(tableName);
+ if (!tableDesc.matchReplicationScope(enableRep)) {
+ int scope =
+ enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
+ modifyTable(TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build());
}
}
- /**
- * This enum indicates the current state of the replication for a given table.
- */
- private enum ReplicationState {
- ENABLED, // all column families enabled
- MIXED, // some column families enabled, some disabled
- DISABLED // all column families disabled
- }
-
- /**
- * @param htd table descriptor details for the table to check
- * @return ReplicationState the current state of the table.
- */
- private ReplicationState getTableReplicationState(HTableDescriptor htd) {
- boolean hasEnabled = false;
- boolean hasDisabled = false;
-
- for (HColumnDescriptor hcd : htd.getFamilies()) {
- if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
- && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
- hasDisabled = true;
- } else {
- hasEnabled = true;
- }
- }
-
- if (hasEnabled && hasDisabled) return ReplicationState.MIXED;
- if (hasEnabled) return ReplicationState.ENABLED;
- return ReplicationState.DISABLED;
- }
-
- /**
- * Returns the configuration needed to talk to the remote slave cluster.
- * @param peer the description of replication peer
- * @return the configuration for the peer cluster, null if it was unable to get the configuration
- * @throws IOException
- */
- private Configuration getPeerClusterConfiguration(ReplicationPeerDescription peer)
- throws IOException {
- ReplicationPeerConfig peerConfig = peer.getPeerConfig();
- Configuration otherConf;
- try {
- otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
- } catch (IOException e) {
- throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
- }
-
- if (!peerConfig.getConfiguration().isEmpty()) {
- CompoundConfiguration compound = new CompoundConfiguration();
- compound.add(otherConf);
- compound.addStringMap(peerConfig.getConfiguration());
- return compound;
- }
-
- return otherConf;
- }
-
@Override
public void clearCompactionQueues(final ServerName sn, final Set<String> queues)
throws IOException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index d77cd15..bcf581b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -42,6 +42,7 @@ import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.ClusterStatus.Option;
@@ -64,7 +65,7 @@ import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterReques
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable;
import org.apache.hadoop.hbase.client.Scan.ReadType;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.client.security.SecurityCapability;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -188,6 +189,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColu
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
@@ -506,6 +509,14 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
}
@Override
+ public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
+ return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(
+ RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
+ ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
+ (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
+ }
+
+ @Override
public CompletableFuture<Void> deleteTable(TableName tableName) {
return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter
.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
@@ -1515,7 +1526,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
.<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), (
s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
- (resp) -> ReplicationSerDeHelper.convert(resp.getPeerConfig()))).call();
+ (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call();
}
@Override
@@ -1541,7 +1552,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
CompletableFuture<Void> future = new CompletableFuture<Void>();
getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
if (!completeExceptionally(future, error)) {
- ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
+ ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
if (!completeExceptionally(future, error)) {
future.complete(result);
@@ -1560,21 +1571,23 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
}
CompletableFuture<Void> future = new CompletableFuture<Void>();
- getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
- if (!completeExceptionally(future, error)) {
- try {
- ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
- } catch (ReplicationException e) {
- future.completeExceptionally(e);
- return;
- }
- updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
- if (!completeExceptionally(future, error)) {
- future.complete(result);
+ getReplicationPeerConfig(id).whenComplete(
+ (peerConfig, error) -> {
+ if (!completeExceptionally(future, error)) {
+ try {
+ ReplicationPeerConfigUtil.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig,
+ id);
+ } catch (ReplicationException e) {
+ future.completeExceptionally(e);
+ return;
}
- });
- }
- });
+ updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
+ if (!completeExceptionally(future, error)) {
+ future.complete(result);
+ }
+ });
+ }
+ });
return future;
}
@@ -1602,7 +1615,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
request,
(s, c, req, done) -> s.listReplicationPeers(c, req, done),
(resp) -> resp.getPeerDescList().stream()
- .map(ReplicationSerDeHelper::toReplicationPeerDescription)
+ .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
.collect(Collectors.toList()))).call();
}
@@ -2168,9 +2181,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
returnedFuture.completeExceptionally(err);
return;
}
- LOG.info("location is " + location);
if (!location.isPresent() || location.get().getRegion() == null) {
- LOG.info("unknown location is " + location);
returnedFuture.completeExceptionally(new UnknownRegionException(
"Invalid region name or encoded region name: "
+ Bytes.toStringBinary(regionNameOrEncodedRegionName)));
@@ -2323,6 +2334,18 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
+ private class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
+
+ ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
+ super(tableName);
+ }
+
+ @Override
+ String getOperationType() {
+ return "ENABLE";
+ }
+ }
+
private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
DeleteTableProcedureBiConsumer(TableName tableName) {
@@ -3031,4 +3054,254 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
.pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt);
}
+
+ @Override
+ public CompletableFuture<Void> enableTableReplication(TableName tableName) {
+ if (tableName == null) {
+ return failedFuture(new IllegalArgumentException("Table name is null"));
+ }
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ tableExists(tableName).whenComplete(
+ (exist, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ if (!exist) {
+ future.completeExceptionally(new TableNotFoundException("Table '"
+ + tableName.getNameAsString() + "' does not exists."));
+ return;
+ }
+ getTableSplits(tableName).whenComplete((splits, err1) -> {
+ if (err1 != null) {
+ future.completeExceptionally(err1);
+ } else {
+ checkAndSyncTableToPeerClusters(tableName, splits).whenComplete((result, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ setTableReplication(tableName, true).whenComplete((result3, err3) -> {
+ if (err3 != null) {
+ future.completeExceptionally(err3);
+ } else {
+ future.complete(result3);
+ }
+ });
+ }
+ });
+ }
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> disableTableReplication(TableName tableName) {
+ if (tableName == null) {
+ return failedFuture(new IllegalArgumentException("Table name is null"));
+ }
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ tableExists(tableName).whenComplete(
+ (exist, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ if (!exist) {
+ future.completeExceptionally(new TableNotFoundException("Table '"
+ + tableName.getNameAsString() + "' does not exists."));
+ return;
+ }
+ setTableReplication(tableName, false).whenComplete((result, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(result);
+ }
+ });
+ });
+ return future;
+ }
+
+ private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
+ CompletableFuture<byte[][]> future = new CompletableFuture<>();
+ getTableRegions(tableName).whenComplete((regions, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ return;
+ }
+ if (regions.size() == 1) {
+ future.complete(null);
+ } else {
+ byte[][] splits = new byte[regions.size() - 1][];
+ for (int i = 1; i < regions.size(); i++) {
+ splits[i - 1] = regions.get(i).getStartKey();
+ }
+ future.complete(splits);
+ }
+ });
+ return future;
+ }
+
+ /**
+ * Connect to peer and check the table descriptor on peer:
+ * <ol>
+ * <li>Create the same table on peer when not exist.</li>
+ * <li>Throw an exception if the table already has replication enabled on any of the column
+ * families.</li>
+ * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
+ * </ol>
+ * @param tableName name of the table to sync to the peer
+ * @param splits table split keys
+ */
+ private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
+ byte[][] splits) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ listReplicationPeers().whenComplete(
+ (peers, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ if (peers == null || peers.size() <= 0) {
+ future.completeExceptionally(new IllegalArgumentException(
+ "Found no peer cluster for replication."));
+ return;
+ }
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
+ .forEach(peer -> {
+ futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
+ });
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
+ .whenComplete((result, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(result);
+ }
+ });
+ });
+ return future;
+ }
+
+ private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
+ ReplicationPeerDescription peer) {
+ Configuration peerConf = null;
+ try {
+ peerConf =
+ ReplicationPeerConfigUtil
+ .getPeerClusterConfiguration(connection.getConfiguration(), peer);
+ } catch (IOException e) {
+ return failedFuture(e);
+ }
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ ConnectionFactory.createAsyncConnection(peerConf).whenComplete(
+ (conn, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ getTableDescriptor(tableName).whenComplete(
+ (tableDesc, err1) -> {
+ if (err1 != null) {
+ future.completeExceptionally(err1);
+ return;
+ }
+ AsyncAdmin peerAdmin = conn.getAdmin();
+ peerAdmin.tableExists(tableName).whenComplete(
+ (exist, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ return;
+ }
+ if (!exist) {
+ CompletableFuture<Void> createTableFuture = null;
+ if (splits == null) {
+ createTableFuture = peerAdmin.createTable(tableDesc);
+ } else {
+ createTableFuture = peerAdmin.createTable(tableDesc, splits);
+ }
+ createTableFuture.whenComplete(
+ (result, err3) -> {
+ if (err3 != null) {
+ future.completeExceptionally(err3);
+ } else {
+ future.complete(result);
+ }
+ });
+ } else {
+ compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin).whenComplete(
+ (result, err4) -> {
+ if (err4 != null) {
+ future.completeExceptionally(err4);
+ } else {
+ future.complete(result);
+ }
+ });
+ }
+ });
+ });
+ });
+ return future;
+ }
+
+ private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
+ TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ peerAdmin.getTableDescriptor(tableName).whenComplete(
+ (peerTableDesc, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ if (peerTableDesc == null) {
+ future.completeExceptionally(new IllegalArgumentException(
+ "Failed to get table descriptor for table " + tableName.getNameAsString()
+ + " from peer cluster " + peer.getPeerId()));
+ return;
+ }
+ if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
+ future.completeExceptionally(new IllegalArgumentException("Table "
+ + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId()
+ + ", but the table descriptors are not same when compared with source cluster."
+ + " Thus can not enable the table's replication switch."));
+ return;
+ }
+ future.complete(null);
+ });
+ return future;
+ }
+
+ /**
+ * Set the table's replication switch if the table's replication switch is already not set.
+ * @param tableName name of the table
+ * @param enableRep is replication switch enable or disable
+ */
+ private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ getTableDescriptor(tableName).whenComplete(
+ (tableDesc, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ if (!tableDesc.matchReplicationScope(enableRep)) {
+ int scope =
+ enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
+ TableDescriptor newTableDesc =
+ TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
+ modifyTable(newTableDesc).whenComplete((result, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(result);
+ }
+ });
+ } else {
+ future.complete(null);
+ }
+ });
+ return future;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
index 4e2deed..f485c4e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
@@ -24,10 +24,11 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
-
+import org.apache.yetus.audience.InterfaceAudience;
/**
* TableDescriptor contains the details about an HBase table such as the descriptors of
@@ -39,8 +40,15 @@ import org.apache.hadoop.hbase.util.Bytes;
public interface TableDescriptor {
@InterfaceAudience.Private
- static final Comparator<TableDescriptor> COMPARATOR
- = (TableDescriptor lhs, TableDescriptor rhs) -> {
+ Comparator<TableDescriptor> COMPARATOR = getComparator(ColumnFamilyDescriptor.COMPARATOR);
+
+ @InterfaceAudience.Private
+ Comparator<TableDescriptor> COMPARATOR_IGNORE_REPLICATION =
+ getComparator(ColumnFamilyDescriptor.COMPARATOR_IGNORE_REPLICATION);
+
+ static Comparator<TableDescriptor>
+ getComparator(Comparator<ColumnFamilyDescriptor> cfComparator) {
+ return (TableDescriptor lhs, TableDescriptor rhs) -> {
int result = lhs.getTableName().compareTo(rhs.getTableName());
if (result != 0) {
return result;
@@ -52,16 +60,17 @@ public interface TableDescriptor {
return result;
}
- for (Iterator<ColumnFamilyDescriptor> it = lhsFamilies.iterator(),
- it2 = rhsFamilies.iterator(); it.hasNext();) {
- result = ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next());
+ for (Iterator<ColumnFamilyDescriptor> it = lhsFamilies.iterator(), it2 =
+ rhsFamilies.iterator(); it.hasNext();) {
+ result = cfComparator.compare(it.next(), it2.next());
if (result != 0) {
return result;
}
}
// punt on comparison for ordering, just calculate difference
return Integer.compare(lhs.getValues().hashCode(), rhs.getValues().hashCode());
- };
+ };
+ }
/**
* Returns the count of the column families of the table.
@@ -266,4 +275,30 @@ public interface TableDescriptor {
*/
boolean isReadOnly();
+ /**
+ * Check if the table's cfs' replication scope matched with the replication state
+ * @param enabled replication state
+ * @return true if matched, otherwise false
+ */
+ default boolean matchReplicationScope(boolean enabled) {
+ boolean hasEnabled = false;
+ boolean hasDisabled = false;
+
+ for (ColumnFamilyDescriptor cf : getColumnFamilies()) {
+ if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
+ && cf.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
+ hasDisabled = true;
+ } else {
+ hasEnabled = true;
+ }
+ }
+
+ if (hasEnabled && hasDisabled) {
+ return false;
+ }
+ if (hasEnabled) {
+ return enabled;
+ }
+ return !enabled;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
index 7bde1c1..ef59311 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
@@ -33,18 +33,19 @@ import java.util.TreeSet;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.stream.Stream;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* @since 2.0.0
@@ -409,6 +410,24 @@ public class TableDescriptorBuilder {
return this;
}
+ /**
+ * Sets replication scope all & only the columns already in the builder. Columns added later won't
+ * be backfilled with replication scope.
+ * @param scope replication scope
+ * @return a TableDescriptorBuilder
+ */
+ public TableDescriptorBuilder setReplicationScope(int scope) {
+ Map<byte[], ColumnFamilyDescriptor> newFamilies = new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
+ newFamilies.putAll(desc.families);
+ newFamilies
+ .forEach((cf, cfDesc) -> {
+ desc.removeColumnFamily(cf);
+ desc.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cfDesc).setScope(scope)
+ .build());
+ });
+ return this;
+ }
+
public TableDescriptor build() {
return new ModifyableTableDescriptor(desc);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 39f2045..5a5913c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -141,7 +141,7 @@ public class ReplicationAdmin implements Closeable {
* */
@Deprecated
public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
- return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig);
+ return ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCFsConfig);
}
/**
@@ -228,7 +228,7 @@ public class ReplicationAdmin implements Closeable {
@Deprecated
public String getPeerTableCFs(String id) throws IOException {
ReplicationPeerConfig peerConfig = admin.getReplicationPeerConfig(id);
- return ReplicationSerDeHelper.convertToString(peerConfig.getTableCFsMap());
+ return ReplicationPeerConfigUtil.convertToString(peerConfig.getTableCFsMap());
}
/**
@@ -243,7 +243,7 @@ public class ReplicationAdmin implements Closeable {
@Deprecated
public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException,
IOException {
- appendPeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs));
+ appendPeerTableCFs(id, ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs));
}
/**
@@ -300,7 +300,7 @@ public class ReplicationAdmin implements Closeable {
@Deprecated
public void removePeerTableCFs(String id, String tableCf) throws ReplicationException,
IOException {
- removePeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCf));
+ removePeerTableCFs(id, ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCf));
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
new file mode 100644
index 0000000..be468ae
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
@@ -0,0 +1,468 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.replication;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CompoundConfiguration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Set;
+
+/**
+ * Helper for TableCFs Operations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public final class ReplicationPeerConfigUtil {
+
+ private static final Log LOG = LogFactory.getLog(ReplicationPeerConfigUtil.class);
+
+ private ReplicationPeerConfigUtil() {}
+
+ public static String convertToString(Set<String> namespaces) {
+ if (namespaces == null) {
+ return null;
+ }
+ return StringUtils.join(namespaces, ';');
+ }
+
+ /** convert map to TableCFs Object */
+ public static ReplicationProtos.TableCF[] convert(
+ Map<TableName, ? extends Collection<String>> tableCfs) {
+ if (tableCfs == null) {
+ return null;
+ }
+ List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tableCfs.entrySet().size());
+ ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
+ for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
+ tableCFBuilder.clear();
+ tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey()));
+ Collection<String> v = entry.getValue();
+ if (v != null && !v.isEmpty()) {
+ for (String value : entry.getValue()) {
+ tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value));
+ }
+ }
+ tableCFList.add(tableCFBuilder.build());
+ }
+ return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
+ }
+
+ public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) {
+ if (tableCfs == null) {
+ return null;
+ }
+ return convert(convert(tableCfs));
+ }
+
+ /**
+ * Convert string to TableCFs Object.
+ * This is only for read TableCFs information from TableCF node.
+ * Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3.
+ * */
+ public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) {
+ if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
+ return null;
+ }
+
+ ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
+ String[] tables = tableCFsConfig.split(";");
+ List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tables.length);
+
+ for (String tab : tables) {
+ // 1 ignore empty table config
+ tab = tab.trim();
+ if (tab.length() == 0) {
+ continue;
+ }
+ // 2 split to "table" and "cf1,cf2"
+ // for each table: "table#cf1,cf2" or "table"
+ String[] pair = tab.split(":");
+ String tabName = pair[0].trim();
+ if (pair.length > 2 || tabName.length() == 0) {
+ LOG.info("incorrect format:" + tableCFsConfig);
+ continue;
+ }
+
+ tableCFBuilder.clear();
+ // split namespace from tableName
+ String ns = "default";
+ String tName = tabName;
+ String[] dbs = tabName.split("\\.");
+ if (dbs != null && dbs.length == 2) {
+ ns = dbs[0];
+ tName = dbs[1];
+ }
+ tableCFBuilder.setTableName(
+ ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName)));
+
+ // 3 parse "cf1,cf2" part to List<cf>
+ if (pair.length == 2) {
+ String[] cfsList = pair[1].split(",");
+ for (String cf : cfsList) {
+ String cfName = cf.trim();
+ if (cfName.length() > 0) {
+ tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName));
+ }
+ }
+ }
+ tableCFList.add(tableCFBuilder.build());
+ }
+ return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
+ }
+
+ /**
+ * Convert TableCFs Object to String.
+ * Output String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
+ * */
+ public static String convert(ReplicationProtos.TableCF[] tableCFs) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0, n = tableCFs.length; i < n; i++) {
+ ReplicationProtos.TableCF tableCF = tableCFs[i];
+ String namespace = tableCF.getTableName().getNamespace().toStringUtf8();
+ if (!Strings.isEmpty(namespace)) {
+ sb.append(namespace).append(".").
+ append(tableCF.getTableName().getQualifier().toStringUtf8())
+ .append(":");
+ } else {
+ sb.append(tableCF.getTableName().toString()).append(":");
+ }
+ for (int j = 0; j < tableCF.getFamiliesCount(); j++) {
+ sb.append(tableCF.getFamilies(j).toStringUtf8()).append(",");
+ }
+ sb.deleteCharAt(sb.length() - 1).append(";");
+ }
+ if (sb.length() > 0) {
+ sb.deleteCharAt(sb.length() - 1);
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Get TableCF in TableCFs, if not exist, return null.
+ * */
+ public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs,
+ String table) {
+ for (int i = 0, n = tableCFs.length; i < n; i++) {
+ ReplicationProtos.TableCF tableCF = tableCFs[i];
+ if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) {
+ return tableCF;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Parse bytes into TableCFs.
+ * It is used for backward compatibility.
+ * Old format bytes have no PB_MAGIC Header
+ * */
+ public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
+ if (bytes == null) {
+ return null;
+ }
+ return ReplicationPeerConfigUtil.convert(Bytes.toString(bytes));
+ }
+
+ /**
+ * Convert tableCFs string into Map.
+ * */
+ public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
+ ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig);
+ return convert2Map(tableCFs);
+ }
+
+ /**
+ * Convert tableCFs Object to Map.
+ * */
+ public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) {
+ if (tableCFs == null || tableCFs.length == 0) {
+ return null;
+ }
+ Map<TableName, List<String>> tableCFsMap = new HashMap<>();
+ for (int i = 0, n = tableCFs.length; i < n; i++) {
+ ReplicationProtos.TableCF tableCF = tableCFs[i];
+ List<String> families = new ArrayList<>();
+ for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
+ families.add(tableCF.getFamilies(j).toStringUtf8());
+ }
+ if (families.size() > 0) {
+ tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families);
+ } else {
+ tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null);
+ }
+ }
+
+ return tableCFsMap;
+ }
+
+ /**
+ * @param bytes Content of a peer znode.
+ * @return ClusterKey parsed from the passed bytes.
+ * @throws DeserializationException
+ */
+ public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
+ throws DeserializationException {
+ if (ProtobufUtil.isPBMagicPrefix(bytes)) {
+ int pblen = ProtobufUtil.lengthOfPBMagic();
+ ReplicationProtos.ReplicationPeer.Builder builder =
+ ReplicationProtos.ReplicationPeer.newBuilder();
+ ReplicationProtos.ReplicationPeer peer;
+ try {
+ ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
+ peer = builder.build();
+ } catch (IOException e) {
+ throw new DeserializationException(e);
+ }
+ return convert(peer);
+ } else {
+ if (bytes.length > 0) {
+ return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes));
+ }
+ return new ReplicationPeerConfig().setClusterKey("");
+ }
+ }
+
+ public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) {
+ ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
+ if (peer.hasClusterkey()) {
+ peerConfig.setClusterKey(peer.getClusterkey());
+ }
+ if (peer.hasReplicationEndpointImpl()) {
+ peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
+ }
+
+ for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
+ peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
+ }
+
+ for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
+ peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
+ }
+
+ Map<TableName, ? extends Collection<String>> tableCFsMap = convert2Map(
+ peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));
+ if (tableCFsMap != null) {
+ peerConfig.setTableCFsMap(tableCFsMap);
+ }
+ List<ByteString> namespacesList = peer.getNamespacesList();
+ if (namespacesList != null && namespacesList.size() != 0) {
+ Set<String> namespaces = new HashSet<>();
+ for (ByteString namespace : namespacesList) {
+ namespaces.add(namespace.toStringUtf8());
+ }
+ peerConfig.setNamespaces(namespaces);
+ }
+ if (peer.hasBandwidth()) {
+ peerConfig.setBandwidth(peer.getBandwidth());
+ }
+ return peerConfig;
+ }
+
+ public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
+ ReplicationProtos.ReplicationPeer.Builder builder =
+ ReplicationProtos.ReplicationPeer.newBuilder();
+ if (peerConfig.getClusterKey() != null) {
+ builder.setClusterkey(peerConfig.getClusterKey());
+ }
+ if (peerConfig.getReplicationEndpointImpl() != null) {
+ builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
+ }
+
+ for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
+ builder.addData(HBaseProtos.BytesBytesPair.newBuilder()
+ .setFirst(UnsafeByteOperations.unsafeWrap(entry.getKey()))
+ .setSecond(UnsafeByteOperations.unsafeWrap(entry.getValue()))
+ .build());
+ }
+
+ for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
+ builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder()
+ .setName(entry.getKey())
+ .setValue(entry.getValue())
+ .build());
+ }
+
+ ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
+ if (tableCFs != null) {
+ for (int i = 0; i < tableCFs.length; i++) {
+ builder.addTableCfs(tableCFs[i]);
+ }
+ }
+ Set<String> namespaces = peerConfig.getNamespaces();
+ if (namespaces != null) {
+ for (String namespace : namespaces) {
+ builder.addNamespaces(ByteString.copyFromUtf8(namespace));
+ }
+ }
+
+ builder.setBandwidth(peerConfig.getBandwidth());
+ return builder.build();
+ }
+
+ /**
+ * @param peerConfig
+ * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable
+ * for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
+ * /hbase/replication/peers/PEER_ID
+ */
+ public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
+ byte[] bytes = convert(peerConfig).toByteArray();
+ return ProtobufUtil.prependPBMagic(bytes);
+ }
+
+ public static ReplicationPeerDescription toReplicationPeerDescription(
+ ReplicationProtos.ReplicationPeerDescription desc) {
+ boolean enabled = ReplicationProtos.ReplicationState.State.ENABLED == desc.getState()
+ .getState();
+ ReplicationPeerConfig config = convert(desc.getConfig());
+ return new ReplicationPeerDescription(desc.getId(), enabled, config);
+ }
+
+ public static ReplicationProtos.ReplicationPeerDescription toProtoReplicationPeerDescription(
+ ReplicationPeerDescription desc) {
+ ReplicationProtos.ReplicationPeerDescription.Builder builder =
+ ReplicationProtos.ReplicationPeerDescription.newBuilder();
+ builder.setId(desc.getPeerId());
+ ReplicationProtos.ReplicationState.Builder stateBuilder = ReplicationProtos.ReplicationState
+ .newBuilder();
+ stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED
+ : ReplicationProtos.ReplicationState.State.DISABLED);
+ builder.setState(stateBuilder.build());
+ builder.setConfig(convert(desc.getPeerConfig()));
+ return builder.build();
+ }
+
+ public static void appendTableCFsToReplicationPeerConfig(
+ Map<TableName, ? extends Collection<String>> tableCfs, ReplicationPeerConfig peerConfig) {
+ Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
+ if (preTableCfs == null) {
+ peerConfig.setTableCFsMap(tableCfs);
+ } else {
+ for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
+ TableName table = entry.getKey();
+ Collection<String> appendCfs = entry.getValue();
+ if (preTableCfs.containsKey(table)) {
+ List<String> cfs = preTableCfs.get(table);
+ if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
+ preTableCfs.put(table, null);
+ } else {
+ Set<String> cfSet = new HashSet<String>(cfs);
+ cfSet.addAll(appendCfs);
+ preTableCfs.put(table, Lists.newArrayList(cfSet));
+ }
+ } else {
+ if (appendCfs == null || appendCfs.isEmpty()) {
+ preTableCfs.put(table, null);
+ } else {
+ preTableCfs.put(table, Lists.newArrayList(appendCfs));
+ }
+ }
+ }
+ }
+ }
+
+ public static void removeTableCFsFromReplicationPeerConfig(
+ Map<TableName, ? extends Collection<String>> tableCfs, ReplicationPeerConfig peerConfig,
+ String id) throws ReplicationException {
+ Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
+ if (preTableCfs == null) {
+ throw new ReplicationException("Table-Cfs for peer: " + id + " is null");
+ }
+ for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
+ TableName table = entry.getKey();
+ Collection<String> removeCfs = entry.getValue();
+ if (preTableCfs.containsKey(table)) {
+ List<String> cfs = preTableCfs.get(table);
+ if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
+ preTableCfs.remove(table);
+ } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
+ Set<String> cfSet = new HashSet<String>(cfs);
+ cfSet.removeAll(removeCfs);
+ if (cfSet.isEmpty()) {
+ preTableCfs.remove(table);
+ } else {
+ preTableCfs.put(table, Lists.newArrayList(cfSet));
+ }
+ } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
+ throw new ReplicationException("Cannot remove cf of table: " + table
+ + " which doesn't specify cfs from table-cfs config in peer: " + id);
+ } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
+ throw new ReplicationException("Cannot remove table: " + table
+ + " which has specified cfs from table-cfs config in peer: " + id);
+ }
+ } else {
+ throw new ReplicationException("No table: "
+ + table + " in table-cfs config of peer: " + id);
+ }
+ }
+ }
+
+ /**
+ * Returns the configuration needed to talk to the remote slave cluster.
+ * @param conf the base configuration
+ * @param peer the description of replication peer
+ * @return the configuration for the peer cluster, null if it was unable to get the configuration
+ * @throws IOException when create peer cluster configuration failed
+ */
+ public static Configuration getPeerClusterConfiguration(Configuration conf,
+ ReplicationPeerDescription peer) throws IOException {
+ ReplicationPeerConfig peerConfig = peer.getPeerConfig();
+ Configuration otherConf;
+ try {
+ otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey());
+ } catch (IOException e) {
+ throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
+ }
+
+ if (!peerConfig.getConfiguration().isEmpty()) {
+ CompoundConfiguration compound = new CompoundConfiguration();
+ compound.add(otherConf);
+ compound.addStringMap(peerConfig.getConfiguration());
+ return compound;
+ }
+
+ return otherConf;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
deleted file mode 100644
index 986a09f..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client.replication;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Strings;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.ArrayList;
-import java.util.Set;
-
-/**
- * Helper for TableCFs Operations.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Stable
-public final class ReplicationSerDeHelper {
-
- private static final Log LOG = LogFactory.getLog(ReplicationSerDeHelper.class);
-
- private ReplicationSerDeHelper() {}
-
- public static String convertToString(Set<String> namespaces) {
- if (namespaces == null) {
- return null;
- }
- return StringUtils.join(namespaces, ';');
- }
-
- /** convert map to TableCFs Object */
- public static ReplicationProtos.TableCF[] convert(
- Map<TableName, ? extends Collection<String>> tableCfs) {
- if (tableCfs == null) {
- return null;
- }
- List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tableCfs.entrySet().size());
- ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
- for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
- tableCFBuilder.clear();
- tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey()));
- Collection<String> v = entry.getValue();
- if (v != null && !v.isEmpty()) {
- for (String value : entry.getValue()) {
- tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value));
- }
- }
- tableCFList.add(tableCFBuilder.build());
- }
- return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
- }
-
- public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) {
- if (tableCfs == null) {
- return null;
- }
- return convert(convert(tableCfs));
- }
-
- /**
- * Convert string to TableCFs Object.
- * This is only for read TableCFs information from TableCF node.
- * Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3.
- * */
- public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) {
- if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
- return null;
- }
-
- ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
- String[] tables = tableCFsConfig.split(";");
- List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tables.length);
-
- for (String tab : tables) {
- // 1 ignore empty table config
- tab = tab.trim();
- if (tab.length() == 0) {
- continue;
- }
- // 2 split to "table" and "cf1,cf2"
- // for each table: "table#cf1,cf2" or "table"
- String[] pair = tab.split(":");
- String tabName = pair[0].trim();
- if (pair.length > 2 || tabName.length() == 0) {
- LOG.info("incorrect format:" + tableCFsConfig);
- continue;
- }
-
- tableCFBuilder.clear();
- // split namespace from tableName
- String ns = "default";
- String tName = tabName;
- String[] dbs = tabName.split("\\.");
- if (dbs != null && dbs.length == 2) {
- ns = dbs[0];
- tName = dbs[1];
- }
- tableCFBuilder.setTableName(
- ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName)));
-
- // 3 parse "cf1,cf2" part to List<cf>
- if (pair.length == 2) {
- String[] cfsList = pair[1].split(",");
- for (String cf : cfsList) {
- String cfName = cf.trim();
- if (cfName.length() > 0) {
- tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName));
- }
- }
- }
- tableCFList.add(tableCFBuilder.build());
- }
- return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
- }
-
- /**
- * Convert TableCFs Object to String.
- * Output String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
- * */
- public static String convert(ReplicationProtos.TableCF[] tableCFs) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0, n = tableCFs.length; i < n; i++) {
- ReplicationProtos.TableCF tableCF = tableCFs[i];
- String namespace = tableCF.getTableName().getNamespace().toStringUtf8();
- if (!Strings.isEmpty(namespace)) {
- sb.append(namespace).append(".").
- append(tableCF.getTableName().getQualifier().toStringUtf8())
- .append(":");
- } else {
- sb.append(tableCF.getTableName().toString()).append(":");
- }
- for (int j = 0; j < tableCF.getFamiliesCount(); j++) {
- sb.append(tableCF.getFamilies(j).toStringUtf8()).append(",");
- }
- sb.deleteCharAt(sb.length() - 1).append(";");
- }
- if (sb.length() > 0) {
- sb.deleteCharAt(sb.length() - 1);
- }
- return sb.toString();
- }
-
- /**
- * Get TableCF in TableCFs, if not exist, return null.
- * */
- public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs,
- String table) {
- for (int i = 0, n = tableCFs.length; i < n; i++) {
- ReplicationProtos.TableCF tableCF = tableCFs[i];
- if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) {
- return tableCF;
- }
- }
- return null;
- }
-
- /**
- * Parse bytes into TableCFs.
- * It is used for backward compatibility.
- * Old format bytes have no PB_MAGIC Header
- * */
- public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
- if (bytes == null) {
- return null;
- }
- return ReplicationSerDeHelper.convert(Bytes.toString(bytes));
- }
-
- /**
- * Convert tableCFs string into Map.
- * */
- public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
- ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig);
- return convert2Map(tableCFs);
- }
-
- /**
- * Convert tableCFs Object to Map.
- * */
- public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) {
- if (tableCFs == null || tableCFs.length == 0) {
- return null;
- }
- Map<TableName, List<String>> tableCFsMap = new HashMap<>();
- for (int i = 0, n = tableCFs.length; i < n; i++) {
- ReplicationProtos.TableCF tableCF = tableCFs[i];
- List<String> families = new ArrayList<>();
- for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
- families.add(tableCF.getFamilies(j).toStringUtf8());
- }
- if (families.size() > 0) {
- tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families);
- } else {
- tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null);
- }
- }
-
- return tableCFsMap;
- }
-
- /**
- * @param bytes Content of a peer znode.
- * @return ClusterKey parsed from the passed bytes.
- * @throws DeserializationException
- */
- public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
- throws DeserializationException {
- if (ProtobufUtil.isPBMagicPrefix(bytes)) {
- int pblen = ProtobufUtil.lengthOfPBMagic();
- ReplicationProtos.ReplicationPeer.Builder builder =
- ReplicationProtos.ReplicationPeer.newBuilder();
- ReplicationProtos.ReplicationPeer peer;
- try {
- ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
- peer = builder.build();
- } catch (IOException e) {
- throw new DeserializationException(e);
- }
- return convert(peer);
- } else {
- if (bytes.length > 0) {
- return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes));
- }
- return new ReplicationPeerConfig().setClusterKey("");
- }
- }
-
- public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) {
- ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
- if (peer.hasClusterkey()) {
- peerConfig.setClusterKey(peer.getClusterkey());
- }
- if (peer.hasReplicationEndpointImpl()) {
- peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
- }
-
- for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
- peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
- }
-
- for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
- peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
- }
-
- Map<TableName, ? extends Collection<String>> tableCFsMap = convert2Map(
- peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));
- if (tableCFsMap != null) {
- peerConfig.setTableCFsMap(tableCFsMap);
- }
- List<ByteString> namespacesList = peer.getNamespacesList();
- if (namespacesList != null && namespacesList.size() != 0) {
- Set<String> namespaces = new HashSet<>();
- for (ByteString namespace : namespacesList) {
- namespaces.add(namespace.toStringUtf8());
- }
- peerConfig.setNamespaces(namespaces);
- }
- if (peer.hasBandwidth()) {
- peerConfig.setBandwidth(peer.getBandwidth());
- }
- return peerConfig;
- }
-
- public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
- ReplicationProtos.ReplicationPeer.Builder builder = ReplicationProtos.ReplicationPeer.newBuilder();
- if (peerConfig.getClusterKey() != null) {
- builder.setClusterkey(peerConfig.getClusterKey());
- }
- if (peerConfig.getReplicationEndpointImpl() != null) {
- builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
- }
-
- for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
- builder.addData(HBaseProtos.BytesBytesPair.newBuilder()
- .setFirst(UnsafeByteOperations.unsafeWrap(entry.getKey()))
- .setSecond(UnsafeByteOperations.unsafeWrap(entry.getValue()))
- .build());
- }
-
- for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
- builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder()
- .setName(entry.getKey())
- .setValue(entry.getValue())
- .build());
- }
-
- ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
- if (tableCFs != null) {
- for (int i = 0; i < tableCFs.length; i++) {
- builder.addTableCfs(tableCFs[i]);
- }
- }
- Set<String> namespaces = peerConfig.getNamespaces();
- if (namespaces != null) {
- for (String namespace : namespaces) {
- builder.addNamespaces(ByteString.copyFromUtf8(namespace));
- }
- }
-
- builder.setBandwidth(peerConfig.getBandwidth());
- return builder.build();
- }
-
- /**
- * @param peerConfig
- * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable
- * for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
- * /hbase/replication/peers/PEER_ID
- */
- public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
- byte[] bytes = convert(peerConfig).toByteArray();
- return ProtobufUtil.prependPBMagic(bytes);
- }
-
- public static ReplicationPeerDescription toReplicationPeerDescription(
- ReplicationProtos.ReplicationPeerDescription desc) {
- boolean enabled = ReplicationProtos.ReplicationState.State.ENABLED == desc.getState()
- .getState();
- ReplicationPeerConfig config = convert(desc.getConfig());
- return new ReplicationPeerDescription(desc.getId(), enabled, config);
- }
-
- public static ReplicationProtos.ReplicationPeerDescription toProtoReplicationPeerDescription(
- ReplicationPeerDescription desc) {
- ReplicationProtos.ReplicationPeerDescription.Builder builder = ReplicationProtos.ReplicationPeerDescription
- .newBuilder();
- builder.setId(desc.getPeerId());
- ReplicationProtos.ReplicationState.Builder stateBuilder = ReplicationProtos.ReplicationState
- .newBuilder();
- stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED
- : ReplicationProtos.ReplicationState.State.DISABLED);
- builder.setState(stateBuilder.build());
- builder.setConfig(convert(desc.getPeerConfig()));
- return builder.build();
- }
-
- public static void appendTableCFsToReplicationPeerConfig(
- Map<TableName, ? extends Collection<String>> tableCfs, ReplicationPeerConfig peerConfig) {
- Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
- if (preTableCfs == null) {
- peerConfig.setTableCFsMap(tableCfs);
- } else {
- for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
- TableName table = entry.getKey();
- Collection<String> appendCfs = entry.getValue();
- if (preTableCfs.containsKey(table)) {
- List<String> cfs = preTableCfs.get(table);
- if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
- preTableCfs.put(table, null);
- } else {
- Set<String> cfSet = new HashSet<String>(cfs);
- cfSet.addAll(appendCfs);
- preTableCfs.put(table, Lists.newArrayList(cfSet));
- }
- } else {
- if (appendCfs == null || appendCfs.isEmpty()) {
- preTableCfs.put(table, null);
- } else {
- preTableCfs.put(table, Lists.newArrayList(appendCfs));
- }
- }
- }
- }
- }
-
- public static void removeTableCFsFromReplicationPeerConfig(
- Map<TableName, ? extends Collection<String>> tableCfs, ReplicationPeerConfig peerConfig,
- String id) throws ReplicationException {
- Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
- if (preTableCfs == null) {
- throw new ReplicationException("Table-Cfs for peer: " + id + " is null");
- }
- for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
- TableName table = entry.getKey();
- Collection<String> removeCfs = entry.getValue();
- if (preTableCfs.containsKey(table)) {
- List<String> cfs = preTableCfs.get(table);
- if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
- preTableCfs.remove(table);
- } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
- Set<String> cfSet = new HashSet<String>(cfs);
- cfSet.removeAll(removeCfs);
- if (cfSet.isEmpty()) {
- preTableCfs.remove(table);
- } else {
- preTableCfs.put(table, Lists.newArrayList(cfSet));
- }
- } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
- throw new ReplicationException("Cannot remove cf of table: " + table
- + " which doesn't specify cfs from table-cfs config in peer: " + id);
- } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
- throw new ReplicationException("Cannot remove table: " + table
- + " which has specified cfs from table-cfs config in peer: " + id);
- }
- } else {
- throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index bdd6e74..4d429c9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -123,4 +123,24 @@ public class ReplicationPeerConfig {
builder.append("bandwidth=").append(bandwidth);
return builder.toString();
}
+
+ /**
+ * Decide whether the table need replicate to the peer cluster
+ * @param table name of the table
+ * @return true if the table need replicate to the peer cluster
+ */
+ public boolean needToReplicate(TableName table) {
+ // If null means user has explicitly not configured any namespaces and table CFs
+ // so all the tables data are applicable for replication
+ if (namespaces == null && tableCFsMap == null) {
+ return true;
+ }
+ if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) {
+ return true;
+ }
+ if (tableCFsMap != null && tableCFsMap.containsKey(table)) {
+ return true;
+ }
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/600fdee8/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 4558deb..9eff114 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -1621,7 +1621,7 @@ public final class RequestConverter {
String peerId, ReplicationPeerConfig peerConfig) {
AddReplicationPeerRequest.Builder builder = AddReplicationPeerRequest.newBuilder();
builder.setPeerId(peerId);
- builder.setPeerConfig(ReplicationSerDeHelper.convert(peerConfig));
+ builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
return builder.build();
}
@@ -1658,7 +1658,7 @@ public final class RequestConverter {
UpdateReplicationPeerConfigRequest.Builder builder = UpdateReplicationPeerConfigRequest
.newBuilder();
builder.setPeerId(peerId);
- builder.setPeerConfig(ReplicationSerDeHelper.convert(peerConfig));
+ builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
return builder.build();
}