Posted to commits@hbase.apache.org by sy...@apache.org on 2015/12/15 18:43:19 UTC
[05/26] hbase git commit: HBASE-13153 Bulk Loaded HFile Replication (Ashish Singhi)
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index a8cffba..9ff4b2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -45,8 +45,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
@@ -225,8 +227,16 @@ public class ReplicationSourceManager implements ReplicationListener {
* old region server wal queues
*/
protected void init() throws IOException, ReplicationException {
+ boolean replicationForBulkLoadDataEnabled =
+ conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+ HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
for (String id : this.replicationPeers.getPeerIds()) {
addSource(id);
+ if (replicationForBulkLoadDataEnabled) {
+ // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
+ // when a peer was added before replication for bulk loaded data was enabled.
+ this.replicationQueues.addPeerToHFileRefs(id);
+ }
}
List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
if (currentReplicators == null || currentReplicators.size() == 0) {
@@ -733,4 +743,15 @@ public class ReplicationSourceManager implements ReplicationListener {
}
return stats.toString();
}
+
+ public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+ throws ReplicationException {
+ for (ReplicationSourceInterface source : this.sources) {
+ source.addHFileRefs(tableName, family, files);
+ }
+ }
+
+ public void cleanUpHFileRefs(String peerId, List<String> files) {
+ this.replicationQueues.removeHFileRefs(peerId, files);
+ }
}
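
The two methods added above are the manager-level hooks for tracking bulk loaded hfiles. A hedged usage sketch (the call sites are not part of this hunk and the variable names are illustrative only): after a successful bulk load the file names are registered with every replication source, and once a peer has shipped a file its reference is removed so the hfile can eventually be cleaned up.

    // Hypothetical call sites, not shown in this patch; names are illustrative.
    // Register freshly bulk loaded hfiles so sources queue them for peers
    // (addHFileRefs may throw ReplicationException, as declared above).
    manager.addHFileRefs(tableName, family, loadedHFileNames);
    // ... later, after the peer identified by peerId has shipped these files ...
    manager.cleanUpHFileRefs(peerId, loadedHFileNames);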
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SourceFSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SourceFSConfigurationProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SourceFSConfigurationProvider.java
new file mode 100644
index 0000000..8271115
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SourceFSConfigurationProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Interface that defines how a region server in the peer cluster gets the source cluster file
+ * system configuration. Users can plug in a custom implementation by setting its fully
+ * qualified class name as the value of the hbase.replication.source.fs.conf.provider property
+ * in the RegionServer configuration. The default is
+ * {@link DefaultSourceFSConfigurationProvider}.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface SourceFSConfigurationProvider {
+
+ /**
+ * Returns the source cluster file system configuration for the given source cluster replication
+ * ID.
+ * @param sinkConf sink cluster configuration
+ * @param replicationClusterId unique ID which identifies the source cluster
+ * @return source cluster file system configuration
+ * @throws IOException for invalid directory or for a bad disk.
+ */
+ public Configuration getConf(Configuration sinkConf, String replicationClusterId)
+ throws IOException;
+
+}
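
The javadoc above describes how a peer region server resolves the source cluster's file system settings. A minimal sketch of a custom provider, assuming a hypothetical layout in which each source cluster's site files sit under a local directory named after its replication cluster ID (the class name and the example.replication.conf.dir property below are illustrative, not part of this patch):

    package org.apache.hadoop.hbase.replication.regionserver;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class ExampleDirSourceFSConfigurationProvider implements SourceFSConfigurationProvider {
      @Override
      public Configuration getConf(Configuration sinkConf, String replicationClusterId)
          throws IOException {
        // Start from the sink configuration and overlay the source cluster's site files.
        Configuration sourceConf = new Configuration(sinkConf);
        String baseDir = sinkConf.get("example.replication.conf.dir", "/etc/hbase/replication");
        sourceConf.addResource(new Path(baseDir, replicationClusterId + "/core-site.xml"));
        sourceConf.addResource(new Path(baseDir, replicationClusterId + "/hdfs-site.xml"));
        return sourceConf;
      }
    }

Such a class would be activated on the peer's region servers by setting hbase.replication.source.fs.conf.provider to its fully qualified name, as the javadoc notes.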
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
index becc9f3..3541ade 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
@@ -217,7 +217,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
familyPaths.add(new Pair(el.getFamily().toByteArray(),el.getPath()));
}
-
+
Token userToken = null;
if (userProvider.isHadoopSecurityEnabled()) {
userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken()
@@ -375,6 +375,14 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {
Path p = new Path(srcPath);
Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
+
+ // In case of replication of bulk loaded files, hfiles are already copied into the staging directory
+ if (p.equals(stageP)) {
+ LOG.debug(p.getName()
+ + " is already available in the staging directory. Skipping copy or rename.");
+ return stageP.toString();
+ }
+
if (srcFs == null) {
srcFs = FileSystem.get(p.toUri(), conf);
}
@@ -414,6 +422,14 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
Path p = new Path(srcPath);
Path stageP = new Path(stagingDir,
new Path(Bytes.toString(family), p.getName()));
+
+ // In case of replication of bulk loaded files, hfiles are not renamed by the endpoint during
+ // the prepare stage, so there is no need to rename them here again
+ if (p.equals(stageP)) {
+ LOG.debug(p.getName() + " is already available in the source directory. Skipping rename.");
+ return;
+ }
+
LOG.debug("Moving " + stageP + " back to " + p);
if(!fs.rename(stageP, p))
throw new IOException("Failed to move HFile: " + stageP + " to " + p);
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
new file mode 100644
index 0000000..87db386
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
+import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+@Category({ MasterTests.class, SmallTests.class })
+public class TestReplicationHFileCleaner {
+ private static final Log LOG = LogFactory.getLog(TestReplicationHFileCleaner.class);
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static Server server;
+ private static ReplicationQueues rq;
+ private static ReplicationPeers rp;
+ private static final String peerId = "TestReplicationHFileCleaner";
+ private static Configuration conf = TEST_UTIL.getConfiguration();
+ static FileSystem fs = null;
+ Path root;
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniZKCluster();
+ server = new DummyServer();
+ conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+ Replication.decorateMasterConfiguration(conf);
+ rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server);
+ rp.init();
+
+ rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
+ rq.init(server.getServerName().toString());
+ fs = FileSystem.get(conf);
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniZKCluster();
+ }
+
+ @Before
+ public void setup() throws ReplicationException, IOException {
+ root = TEST_UTIL.getDataTestDirOnTestFS();
+ rp.addPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()), null);
+ }
+
+ @After
+ public void cleanup() throws ReplicationException {
+ try {
+ fs.delete(root, true);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete files recursively from path " + root);
+ }
+ rp.removePeer(peerId);
+ }
+
+ @Test
+ public void testIsFileDeletable() throws IOException, ReplicationException {
+ // 1. Create a file
+ Path file = new Path(root, "testIsFileDeletableWithNoHFileRefs");
+ fs.createNewFile(file);
+ // 2. Assert file is successfully created
+ assertTrue("Test file not created!", fs.exists(file));
+ ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
+ cleaner.setConf(conf);
+ // 3. Assert that file as is should be deletable
+ assertTrue("Cleaner should allow to delete this file as there is no hfile reference node "
+ + "for it in the queue.",
+ cleaner.isFileDeletable(fs.getFileStatus(file)));
+
+ List<String> files = new ArrayList<String>(1);
+ files.add(file.getName());
+ // 4. Add the file to hfile-refs queue
+ rq.addHFileRefs(peerId, files);
+ // 5. Assert file should not be deletable
+ assertFalse("Cleaner should not allow to delete this file as there is a hfile reference node "
+ + "for it in the queue.",
+ cleaner.isFileDeletable(fs.getFileStatus(file)));
+ }
+
+ @Test
+ public void testGetDeletableFiles() throws Exception {
+ // 1. Create two files and assert that they are successfully created
+ Path notDeletablefile = new Path(root, "testGetDeletableFiles_1");
+ fs.createNewFile(notDeletablefile);
+ assertTrue("Test file not created!", fs.exists(notDeletablefile));
+ Path deletablefile = new Path(root, "testGetDeletableFiles_2");
+ fs.createNewFile(deletablefile);
+ assertTrue("Test file not created!", fs.exists(deletablefile));
+
+ List<FileStatus> files = new ArrayList<FileStatus>(2);
+ FileStatus f = new FileStatus();
+ f.setPath(deletablefile);
+ files.add(f);
+ f = new FileStatus();
+ f.setPath(notDeletablefile);
+ files.add(f);
+
+ List<String> hfiles = new ArrayList<>(1);
+ hfiles.add(notDeletablefile.getName());
+ // 2. Add one file to hfile-refs queue
+ rq.addHFileRefs(peerId, hfiles);
+
+ ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
+ cleaner.setConf(conf);
+ Iterator<FileStatus> deletableFilesIterator = cleaner.getDeletableFiles(files).iterator();
+ List<FileStatus> deletableFiles = new ArrayList<FileStatus>(2);
+ while (deletableFilesIterator.hasNext()) {
+ deletableFiles.add(deletableFilesIterator.next());
+ }
+ // 5. Assert that only the file without an hfile reference node is returned as deletable
+ if (deletableFiles.size() > 1) {
+ fail("File " + notDeletablefile
+ + " should not be deletable as its hfile reference node is not added.");
+ }
+ assertTrue(deletableFiles.get(0).getPath().equals(deletablefile));
+ }
+
+ /*
+ * Test for HBASE-14621. This test does not directly assert anything. Without the fix, the
+ * test ends up in an infinite loop, so it will time out.
+ */
+ @Test(timeout = 15000)
+ public void testForDifferentHFileRefsZnodeVersion() throws Exception {
+ // 1. Create a file
+ Path file = new Path(root, "testForDifferentHFileRefsZnodeVersion");
+ fs.createNewFile(file);
+ // 2. Assert file is successfully created
+ assertTrue("Test file not created!", fs.exists(file));
+ ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
+ cleaner.setConf(conf);
+
+ ReplicationQueuesClient replicationQueuesClient = Mockito.mock(ReplicationQueuesClient.class);
+ // Return a different znode version for each call
+ Mockito.when(replicationQueuesClient.getHFileRefsNodeChangeVersion()).thenReturn(1, 2);
+
+ Class<? extends ReplicationHFileCleaner> cleanerClass = cleaner.getClass();
+ Field rqc = cleanerClass.getDeclaredField("rqc");
+ rqc.setAccessible(true);
+ rqc.set(cleaner, replicationQueuesClient);
+
+ cleaner.isFileDeletable(fs.getFileStatus(file));
+ }
+
+ static class DummyServer implements Server {
+
+ @Override
+ public Configuration getConfiguration() {
+ return TEST_UTIL.getConfiguration();
+ }
+
+ @Override
+ public ZooKeeperWatcher getZooKeeper() {
+ try {
+ return new ZooKeeperWatcher(getConfiguration(), "dummy server", this);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ @Override
+ public CoordinatedStateManager getCoordinatedStateManager() {
+ return null;
+ }
+
+ @Override
+ public ClusterConnection getConnection() {
+ return null;
+ }
+
+ @Override
+ public MetaTableLocator getMetaTableLocator() {
+ return null;
+ }
+
+ @Override
+ public ServerName getServerName() {
+ return ServerName.valueOf("regionserver,60020,000000");
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+ }
+
+ @Override
+ public boolean isAborted() {
+ return false;
+ }
+
+ @Override
+ public void stop(String why) {
+ }
+
+ @Override
+ public boolean isStopped() {
+ return false;
+ }
+
+ @Override
+ public ChoreService getChoreService() {
+ return null;
+ }
+ }
+}
\ No newline at end of file
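
The cleaner tests above hinge on a single rule: an hfile may only be deleted once no peer's hfile-refs queue still references its name. A conceptual sketch of that check, written against the queue-client calls exercised later in this patch (the return and exception types are assumed here, and this is not necessarily how ReplicationHFileCleaner itself is structured):

    // Conceptual sketch only: a file is still needed if any peer references its name.
    private boolean isStillReferenced(ReplicationQueuesClient rqc, FileStatus file)
        throws KeeperException {
      for (String peerId : rqc.getAllPeersFromHFileRefsQueue()) {
        List<String> refs = rqc.getReplicableHFiles(peerId);
        if (refs != null && refs.contains(file.getPath().getName())) {
          return true; // still pending replication to this peer
        }
      }
      return false;
    }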
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index f463f76..abe484e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -19,12 +19,14 @@
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
+import java.util.List;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
@@ -89,4 +91,10 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
public String getStats() {
return "";
}
+
+ @Override
+ public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+ throws ReplicationException {
+ return;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 455a790..e919c24 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -19,15 +19,21 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -35,7 +41,9 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@@ -48,12 +56,17 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
@@ -79,6 +92,7 @@ public class TestMasterReplication {
private static final TableName tableName = TableName.valueOf("test");
private static final byte[] famName = Bytes.toBytes("f");
+ private static final byte[] famName1 = Bytes.toBytes("f1");
private static final byte[] row = Bytes.toBytes("row");
private static final byte[] row1 = Bytes.toBytes("row1");
private static final byte[] row2 = Bytes.toBytes("row2");
@@ -103,7 +117,11 @@ public class TestMasterReplication {
baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
- HConstants.REPLICATION_ENABLE_DEFAULT);
+ HConstants.REPLICATION_ENABLE_DEFAULT);
+ baseConfiguration.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+ baseConfiguration.set("hbase.replication.source.fs.conf.provider",
+ TestSourceFSConfigurationProvider.class.getCanonicalName());
+ baseConfiguration.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
baseConfiguration.setBoolean("dfs.support.append", true);
baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
baseConfiguration.setStrings(
@@ -114,6 +132,9 @@ public class TestMasterReplication {
HColumnDescriptor fam = new HColumnDescriptor(famName);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
table.addFamily(fam);
+ fam = new HColumnDescriptor(famName1);
+ fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+ table.addFamily(fam);
fam = new HColumnDescriptor(noRepfamName);
table.addFamily(fam);
}
@@ -130,14 +151,7 @@ public class TestMasterReplication {
int numClusters = 2;
Table[] htables = null;
try {
- startMiniClusters(numClusters);
- createTableOnClusters(table);
-
- htables = getHTablesOnClusters(tableName);
-
- // Test the replication scenarios of 0 -> 1 -> 0
- addPeer("1", 0, 1);
- addPeer("1", 1, 0);
+ htables = setUpClusterTablesAndPeers(numClusters);
int[] expectedCounts = new int[] { 2, 2 };
@@ -157,12 +171,64 @@ public class TestMasterReplication {
}
/**
- * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and
- * deleting rows to a table in each clusters and ensuring that the each of
- * these clusters get the appropriate mutations. It also tests the grouping
- * scenario where a cluster needs to replicate the edits originating from
- * itself and also the edits that it received using replication from a
- * different cluster. The scenario is explained in HBASE-9158
+ * Tests the replication scenario 0 -> 1 -> 0 by bulk loading a set of HFiles into a table in
+ * each cluster and verifying that the data is replicated to the other cluster.
+ */
+ @Test(timeout = 300000)
+ public void testHFileCyclicReplication() throws Exception {
+ LOG.info("testHFileCyclicReplication");
+ int numClusters = 2;
+ Table[] htables = null;
+ try {
+ htables = setUpClusterTablesAndPeers(numClusters);
+
+ // Load 100 rows for each hfile range in cluster '0' and validate whether it has been replicated
+ // to cluster '1'.
+ byte[][][] hfileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+ new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
+ int numOfRows = 100;
+ int[] expectedCounts =
+ new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
+
+ loadAndValidateHFileReplication("testHFileCyclicReplication_01", 0, new int[] { 1 }, row,
+ famName, htables, hfileRanges, numOfRows, expectedCounts, true);
+
+ // Load 200 rows for each hfile range in cluster '1' and validate whether it has been replicated
+ // to cluster '0'.
+ hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
+ new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
+ numOfRows = 200;
+ int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
+ hfileRanges.length * numOfRows + expectedCounts[1] };
+
+ loadAndValidateHFileReplication("testHFileCyclicReplication_10", 1, new int[] { 0 }, row,
+ famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
+
+ } finally {
+ close(htables);
+ shutDownMiniClusters();
+ }
+ }
+
+ private Table[] setUpClusterTablesAndPeers(int numClusters) throws Exception {
+ Table[] htables;
+ startMiniClusters(numClusters);
+ createTableOnClusters(table);
+
+ htables = getHTablesOnClusters(tableName);
+ // Test the replication scenarios of 0 -> 1 -> 0
+ addPeer("1", 0, 1);
+ addPeer("1", 1, 0);
+ return htables;
+ }
+
+ /**
+ * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and deleting rows in a
+ * table in each cluster and ensuring that each of these clusters gets the appropriate
+ * mutations. It also tests the grouping scenario where a cluster needs to replicate the edits
+ * originating from itself and also the edits that it received using replication from a different
+ * cluster. The scenario is explained in HBASE-9158
*/
@Test(timeout = 300000)
public void testCyclicReplication2() throws Exception {
@@ -213,6 +279,119 @@ public class TestMasterReplication {
}
/**
+ * Tests the multi-slave hfile replication scenario 0 -> 1, 2 by bulk loading a set of HFiles
+ * into a table in the master cluster and verifying that the data is replicated to its peers.
+ */
+ @Test(timeout = 300000)
+ public void testHFileMultiSlaveReplication() throws Exception {
+ LOG.info("testHFileMultiSlaveReplication");
+ int numClusters = 3;
+ Table[] htables = null;
+ try {
+ startMiniClusters(numClusters);
+ createTableOnClusters(table);
+
+ // Add a slave, 0 -> 1
+ addPeer("1", 0, 1);
+
+ htables = getHTablesOnClusters(tableName);
+
+ // Load 100 rows for each hfile range in cluster '0' and validate whether it has been replicated
+ // to cluster '1'.
+ byte[][][] hfileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes("mmmm"), Bytes.toBytes("oooo") },
+ new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("rrr") }, };
+ int numOfRows = 100;
+
+ int[] expectedCounts =
+ new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
+
+ loadAndValidateHFileReplication("testHFileCyclicReplication_0", 0, new int[] { 1 }, row,
+ famName, htables, hfileRanges, numOfRows, expectedCounts, true);
+
+ // Validate data is not replicated to cluster '2'.
+ assertEquals(0, utilities[2].countRows(htables[2]));
+
+ rollWALAndWait(utilities[0], htables[0].getName(), row);
+
+ // Add one more slave, 0 -> 2
+ addPeer("2", 0, 2);
+
+ // Load 200 rows for each hfile range in cluster '0' and validate whether it has been replicated
+ // to cluster '1' and '2'. Previous data should be replicated to cluster '2'.
+ hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("ssss"), Bytes.toBytes("uuuu") },
+ new byte[][] { Bytes.toBytes("vvv"), Bytes.toBytes("xxx") }, };
+ numOfRows = 200;
+
+ int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
+ hfileRanges.length * numOfRows + expectedCounts[1], hfileRanges.length * numOfRows };
+
+ loadAndValidateHFileReplication("testHFileCyclicReplication_1", 0, new int[] { 1, 2 }, row,
+ famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
+
+ } finally {
+ close(htables);
+ shutDownMiniClusters();
+ }
+ }
+
+ /**
+ * Tests bulk loaded hfile replication when only explicitly specified table column families are
+ * configured for replication. It bulk loads HFiles into both CFs of the table while only one CF
+ * is configured to replicate, and verifies that only that CF's data reaches the peer.
+ */
+ @Test(timeout = 300000)
+ public void testHFileReplicationForConfiguredTableCfs() throws Exception {
+ LOG.info("testHFileReplicationForConfiguredTableCfs");
+ int numClusters = 2;
+ Table[] htables = null;
+ try {
+ startMiniClusters(numClusters);
+ createTableOnClusters(table);
+
+ htables = getHTablesOnClusters(tableName);
+ // Test the replication scenario where only 'f' is configured for table data replication, not 'f1'
+ addPeer("1", 0, 1, tableName.getNameAsString() + ":" + Bytes.toString(famName));
+
+ // Load 100 rows for each hfile range in cluster '0' for table CF 'f'
+ byte[][][] hfileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+ new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
+ int numOfRows = 100;
+ int[] expectedCounts =
+ new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
+
+ loadAndValidateHFileReplication("load_f", 0, new int[] { 1 }, row, famName, htables,
+ hfileRanges, numOfRows, expectedCounts, true);
+
+ // Load 100 rows for each hfile range in cluster '0' for table CF 'f1'
+ hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
+ new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
+ numOfRows = 100;
+
+ int[] newExpectedCounts =
+ new int[] { hfileRanges.length * numOfRows + expectedCounts[0], expectedCounts[1] };
+
+ loadAndValidateHFileReplication("load_f1", 0, new int[] { 1 }, row, famName1, htables,
+ hfileRanges, numOfRows, newExpectedCounts, false);
+
+ // Validate data replication for CF 'f1'
+
+ // Source cluster table should contain data for the families
+ wait(0, htables[0], hfileRanges.length * numOfRows + expectedCounts[0]);
+
+ // Sleep long enough to ensure that data for the CF which is not configured for replication
+ // is still not replicated
+ Thread.sleep((NB_RETRIES / 2) * SLEEP_TIME);
+ // Peer cluster should have only configured CF data
+ wait(1, htables[1], expectedCounts[1]);
+ } finally {
+ close(htables);
+ shutDownMiniClusters();
+ }
+ }
+
+ /**
* Tests cyclic replication scenario of 0 -> 1 -> 2 -> 1.
*/
@Test(timeout = 300000)
@@ -328,6 +507,17 @@ public class TestMasterReplication {
close(replicationAdmin);
}
}
+
+ private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
+ throws Exception {
+ ReplicationAdmin replicationAdmin = null;
+ try {
+ replicationAdmin = new ReplicationAdmin(configurations[masterClusterNumber]);
+ replicationAdmin.addPeer(id, utilities[slaveClusterNumber].getClusterKey(), tableCfs);
+ } finally {
+ close(replicationAdmin);
+ }
+ }
private void disablePeer(String id, int masterClusterNumber) throws Exception {
ReplicationAdmin replicationAdmin = null;
@@ -405,8 +595,56 @@ public class TestMasterReplication {
wait(row, target, false);
}
- private void wait(byte[] row, Table target, boolean isDeleted)
- throws Exception {
+ private void loadAndValidateHFileReplication(String testName, int masterNumber,
+ int[] slaveNumbers, byte[] row, byte[] fam, Table[] tables, byte[][][] hfileRanges,
+ int numOfRows, int[] expectedCounts, boolean toValidate) throws Exception {
+ HBaseTestingUtility util = utilities[masterNumber];
+
+ Path dir = util.getDataTestDirOnTestFS(testName);
+ FileSystem fs = util.getTestFileSystem();
+ dir = dir.makeQualified(fs);
+ Path familyDir = new Path(dir, Bytes.toString(fam));
+
+ int hfileIdx = 0;
+ for (byte[][] range : hfileRanges) {
+ byte[] from = range[0];
+ byte[] to = range[1];
+ HFileTestUtil.createHFile(util.getConfiguration(), fs,
+ new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
+ }
+
+ Table source = tables[masterNumber];
+ final TableName tableName = source.getName();
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+ String[] args = { dir.toString(), tableName.toString() };
+ loader.run(args);
+
+ if (toValidate) {
+ for (int slaveClusterNumber : slaveNumbers) {
+ wait(slaveClusterNumber, tables[slaveClusterNumber], expectedCounts[slaveClusterNumber]);
+ }
+ }
+ }
+
+ private void wait(int slaveNumber, Table target, int expectedCount)
+ throws IOException, InterruptedException {
+ int count = 0;
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i == NB_RETRIES - 1) {
+ fail("Waited too much time for bulkloaded data replication. Current count=" + count
+ + ", expected count=" + expectedCount);
+ }
+ count = utilities[slaveNumber].countRows(target);
+ if (count != expectedCount) {
+ LOG.info("Waiting more time for bulkloaded data replication.");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ break;
+ }
+ }
+ }
+
+ private void wait(byte[] row, Table target, boolean isDeleted) throws Exception {
Get get = new Get(row);
for (int i = 0; i < NB_RETRIES; i++) {
if (i == NB_RETRIES - 1) {
@@ -430,6 +668,47 @@ public class TestMasterReplication {
}
}
+ private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
+ final byte[] row) throws IOException {
+ final Admin admin = utility.getHBaseAdmin();
+ final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();
+
+ // find the region that corresponds to the given row.
+ HRegion region = null;
+ for (HRegion candidate : cluster.getRegions(table)) {
+ if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
+ region = candidate;
+ break;
+ }
+ }
+ assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ // listen for successful log rolls
+ final WALActionsListener listener = new WALActionsListener.Base() {
+ @Override
+ public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
+ latch.countDown();
+ }
+ };
+ region.getWAL().registerWALActionsListener(listener);
+
+ // request a roll
+ admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(),
+ region.getRegionInfo().getRegionName()));
+
+ // wait
+ try {
+ latch.await();
+ } catch (InterruptedException exception) {
+ LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " +
+ "replication tests fail, it's probably because we should still be waiting.");
+ Thread.currentThread().interrupt();
+ }
+ region.getWAL().unregisterWALActionsListener(listener);
+ }
+
/**
* Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same
* timestamp there is otherwise no way to count them.
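
The bulk load replication tests above turn the feature on through three settings near the top of this file's diff. A hedged recap of that minimal source-side configuration, using only the keys and values that appear in the test (the helper method name is illustrative, not part of this patch):

    // Fragment mirroring the test's setup; the classes used are those already imported above.
    private static Configuration bulkLoadReplicationConf() {
      Configuration conf = HBaseConfiguration.create();
      // Ship bulk loaded hfiles alongside WAL entries.
      conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
      // Id under which peers look up this (source) cluster's file system configuration.
      conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
      // Provider used on the peer to resolve the source cluster's FileSystem settings.
      conf.set("hbase.replication.source.fs.conf.provider",
          TestSourceFSConfigurationProvider.class.getCanonicalName());
      return conf;
    }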
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 4823597..47d2880 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -658,7 +658,8 @@ public class TestReplicationSmallTests extends TestReplicationBase {
HRegionInfo hri = new HRegionInfo(htable1.getName(),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
- Replication.scopeWALEdits(htable1.getTableDescriptor(), new WALKey(), edit);
+ Replication.scopeWALEdits(htable1.getTableDescriptor(), new WALKey(), edit,
+ htable1.getConfiguration(), null);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 696c130..41c3240 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.*;
+import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
@@ -160,6 +161,62 @@ public abstract class TestReplicationStateBasic {
}
@Test
+ public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
+ rp.init();
+ rq1.init(server1);
+ rqc.init();
+
+ List<String> files1 = new ArrayList<String>(3);
+ files1.add("file_1");
+ files1.add("file_2");
+ files1.add("file_3");
+ assertNull(rqc.getReplicableHFiles(ID_ONE));
+ assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
+ rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+ rq1.addHFileRefs(ID_ONE, files1);
+ assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
+ assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
+ List<String> files2 = new ArrayList<>(files1);
+ String removedString = files2.remove(0);
+ rq1.removeHFileRefs(ID_ONE, files2);
+ assertEquals(1, rqc.getReplicableHFiles(ID_ONE).size());
+ files2 = new ArrayList<>(1);
+ files2.add(removedString);
+ rq1.removeHFileRefs(ID_ONE, files2);
+ assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size());
+ rp.removePeer(ID_ONE);
+ }
+
+ @Test
+ public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
+ rq1.init(server1);
+ rqc.init();
+
+ rp.init();
+ rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+ rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null);
+
+ List<String> files1 = new ArrayList<String>(3);
+ files1.add("file_1");
+ files1.add("file_2");
+ files1.add("file_3");
+ rq1.addHFileRefs(ID_ONE, files1);
+ rq1.addHFileRefs(ID_TWO, files1);
+ assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size());
+ assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
+ assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
+
+ rp.removePeer(ID_ONE);
+ assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
+ assertNull(rqc.getReplicableHFiles(ID_ONE));
+ assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
+
+ rp.removePeer(ID_TWO);
+ assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
+ assertNull(rqc.getReplicableHFiles(ID_TWO));
+ }
+
+ @Test
public void testReplicationPeers() throws Exception {
rp.init();
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index 4587c61..3b7402a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -64,6 +64,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
utility = new HBaseTestingUtility();
utility.startMiniZKCluster();
conf = utility.getConfiguration();
+ conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
index 13545b5..b36bb9a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
@@ -52,15 +52,15 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
private static final TableName t1_su = TableName.valueOf("t1_syncup");
private static final TableName t2_su = TableName.valueOf("t2_syncup");
- private static final byte[] famName = Bytes.toBytes("cf1");
+ protected static final byte[] famName = Bytes.toBytes("cf1");
private static final byte[] qualName = Bytes.toBytes("q1");
- private static final byte[] noRepfamName = Bytes.toBytes("norep");
+ protected static final byte[] noRepfamName = Bytes.toBytes("norep");
private HTableDescriptor t1_syncupSource, t1_syncupTarget;
private HTableDescriptor t2_syncupSource, t2_syncupTarget;
- private Table ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1;
+ protected Table ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1;
@Before
public void setUp() throws Exception {
@@ -179,7 +179,7 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
}
- private void setupReplication() throws Exception {
+ protected void setupReplication() throws Exception {
ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
@@ -418,7 +418,7 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
}
}
- private void syncUp(HBaseTestingUtility ut) throws Exception {
+ protected void syncUp(HBaseTestingUtility ut) throws Exception {
ReplicationSyncUp.setConfigure(ut.getConfiguration());
String[] arguments = new String[] { null };
new ReplicationSyncUp().run(arguments);
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
new file mode 100644
index 0000000..f54c632
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpTool {
+
+ private static final Log LOG = LogFactory
+ .getLog(TestReplicationSyncUpToolWithBulkLoadedData.class);
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ conf1.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+ conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
+ conf1.set("hbase.replication.source.fs.conf.provider",
+ TestSourceFSConfigurationProvider.class.getCanonicalName());
+ String classes = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
+ if (!classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")) {
+ classes = classes + ",org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint";
+ conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes);
+ }
+
+ TestReplicationBase.setUpBeforeClass();
+ }
+
+ @Override
+ public void testSyncUpTool() throws Exception {
+ /**
+ * Set up replication on the Master and one Slave. Tables: t1_syncup and t2_syncup.
+ * Column families: 'cf1' is replicated, 'norep' is not replicated.
+ */
+ setupReplication();
+
+ /**
+ * Prepare 16 random hfile ranges required for creating hfiles
+ */
+ Iterator<String> randomHFileRangeListIterator = null;
+ Set<String> randomHFileRanges = new HashSet<String>(16);
+ for (int i = 0; i < 16; i++) {
+ randomHFileRanges.add(UUID.randomUUID().toString());
+ }
+ List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
+ Collections.sort(randomHFileRangeList);
+ randomHFileRangeListIterator = randomHFileRangeList.iterator();
+
+ /**
+ * At Master: t1_syncup: load 100 rows into cf1 and 3 rows into norep; t2_syncup: load 200 rows
+ * into cf1 and 3 rows into norep. Verify the data is correctly replicated to the slave.
+ */
+ loadAndReplicateHFiles(true, randomHFileRangeListIterator);
+
+ /**
+ * Verify that the hfile load is synced up correctly:
+ * step 1: stop hbase on the Slave.
+ * step 2: at Master, load another 100 rows into cf1 and 3 rows into norep of t1_syncup, and
+ *         another 200 rows into cf1 and 3 rows into norep of t2_syncup.
+ * step 3: stop hbase on the Master, restart hbase on the Slave.
+ * step 4: verify the Slave still has the rows from before the load: t1_syncup 100 rows from
+ *         cf1, t2_syncup 200 rows from cf1.
+ * step 5: run the syncup tool on the Master.
+ * step 6: verify that the hfiles show up on the Slave and 'norep' data does not:
+ *         t1_syncup 200 rows from cf1, t2_syncup 400 rows from cf1.
+ */
+ mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
+
+ }
+
+ private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
+ throws Exception {
+ LOG.debug("mimicSyncUpAfterBulkLoad");
+ utility2.shutdownMiniHBaseCluster();
+
+ loadAndReplicateHFiles(false, randomHFileRangeListIterator);
+
+ int rowCount_ht1Source = utility1.countRows(ht1Source);
+ assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 hfiles", 206,
+ rowCount_ht1Source);
+
+ int rowCount_ht2Source = utility1.countRows(ht2Source);
+ assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406,
+ rowCount_ht2Source);
+
+ utility1.shutdownMiniHBaseCluster();
+ utility2.restartHBaseCluster(1);
+
+ Thread.sleep(SLEEP_TIME);
+
+ // Before sync up
+ int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+ int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+ assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
+ assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);
+
+ // Run sync up tool
+ syncUp(utility1);
+
+ // After sync up
+ for (int i = 0; i < NB_RETRIES; i++) {
+ syncUp(utility1);
+ rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+ rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+ if (i == NB_RETRIES - 1) {
+ if (rowCount_ht1TargetAtPeer1 != 200 || rowCount_ht2TargetAtPeer1 != 400) {
+ // syncUp still failed. Let's look at the source in case anything is wrong there
+ utility1.restartHBaseCluster(1);
+ rowCount_ht1Source = utility1.countRows(ht1Source);
+ LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source);
+ rowCount_ht2Source = utility1.countRows(ht2Source);
+ LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source);
+ }
+ assertEquals("@Peer1 t1_syncup should be sync up and have 200 rows", 200,
+ rowCount_ht1TargetAtPeer1);
+ assertEquals("@Peer1 t2_syncup should be sync up and have 400 rows", 400,
+ rowCount_ht2TargetAtPeer1);
+ }
+ if (rowCount_ht1TargetAtPeer1 == 200 && rowCount_ht2TargetAtPeer1 == 400) {
+ LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i);
+ break;
+ } else {
+ LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
+ + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
+ + rowCount_ht2TargetAtPeer1);
+ }
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+
+ private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
+ Iterator<String> randomHFileRangeListIterator) throws Exception {
+ LOG.debug("loadAndReplicateHFiles");
+
+ // Load 100 + 3 hfiles to t1_syncup.
+ byte[][][] hfileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+ Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+ loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht1Source, hfileRanges,
+ 100);
+
+ hfileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+ Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+ loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht1Source,
+ hfileRanges, 3);
+
+ // Load 200 + 3 hfiles to t2_syncup.
+ hfileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+ Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+ loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht2Source, hfileRanges,
+ 200);
+
+ hfileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+ Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+ loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht2Source,
+ hfileRanges, 3);
+
+ if (verifyReplicationOnSlave) {
+ // ensure replication completed
+ wait(ht1TargetAtPeer1, utility1.countRows(ht1Source) - 3,
+ "t1_syncup has 103 rows on source, and 100 on slave1");
+
+ wait(ht2TargetAtPeer1, utility1.countRows(ht2Source) - 3,
+ "t2_syncup has 203 rows on source, and 200 on slave1");
+ }
+ }
+
+ private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
+ Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
+ Path dir = utility1.getDataTestDirOnTestFS(testName);
+ FileSystem fs = utility1.getTestFileSystem();
+ dir = dir.makeQualified(fs);
+ Path familyDir = new Path(dir, Bytes.toString(fam));
+
+ int hfileIdx = 0;
+ for (byte[][] range : hfileRanges) {
+ byte[] from = range[0];
+ byte[] to = range[1];
+ HFileTestUtil.createHFile(utility1.getConfiguration(), fs, new Path(familyDir, "hfile_"
+ + hfileIdx++), fam, row, from, to, numOfRows);
+ }
+
+ final TableName tableName = source.getName();
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(utility1.getConfiguration());
+ String[] args = { dir.toString(), tableName.toString() };
+ loader.run(args);
+ }
+
+ private void wait(Table target, int expectedCount, String msg) throws IOException,
+ InterruptedException {
+ for (int i = 0; i < NB_RETRIES; i++) {
+ int rowCount_ht2TargetAtPeer1 = utility2.countRows(target);
+ if (i == NB_RETRIES - 1) {
+ assertEquals(msg, expectedCount, rowCount_ht2TargetAtPeer1);
+ }
+ if (expectedCount == rowCount_ht2TargetAtPeer1) {
+ break;
+ }
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
index b87e7ef..f08d2bf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
@@ -21,32 +21,52 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.security.SecureRandom;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -58,21 +78,18 @@ public class TestReplicationSink {
private static final Log LOG = LogFactory.getLog(TestReplicationSink.class);
private static final int BATCH_SIZE = 10;
- private final static HBaseTestingUtility TEST_UTIL =
- new HBaseTestingUtility();
+ protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private static ReplicationSink SINK;
+ protected static ReplicationSink SINK;
- private static final TableName TABLE_NAME1 =
- TableName.valueOf("table1");
- private static final TableName TABLE_NAME2 =
- TableName.valueOf("table2");
+ protected static final TableName TABLE_NAME1 = TableName.valueOf("table1");
+ protected static final TableName TABLE_NAME2 = TableName.valueOf("table2");
- private static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
- private static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
+ protected static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
+ protected static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
- private static Table table1;
- private static Stoppable STOPPABLE = new Stoppable() {
+ protected static Table table1;
+ protected static Stoppable STOPPABLE = new Stoppable() {
final AtomicBoolean stop = new AtomicBoolean(false);
@Override
@@ -85,10 +102,13 @@ public class TestReplicationSink {
LOG.info("STOPPING BECAUSE: " + why);
this.stop.set(true);
}
-
+
};
- private static Table table2;
+ protected static Table table2;
+ protected static String baseNamespaceDir;
+ protected static String hfileArchiveDir;
+ protected static String replicationClusterId;
/**
* @throws java.lang.Exception
@@ -98,11 +118,18 @@ public class TestReplicationSink {
TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_ENABLE_KEY,
HConstants.REPLICATION_ENABLE_DEFAULT);
+ TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
+ TestSourceFSConfigurationProvider.class.getCanonicalName());
+
TEST_UTIL.startMiniCluster(3);
SINK =
new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
+ Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
+ baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR)).toString();
+ hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY)).toString();
+ replicationClusterId = "12345";
}
/**
@@ -134,7 +161,8 @@ public class TestReplicationSink {
for(int i = 0; i < BATCH_SIZE; i++) {
entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
}
- SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
Scan scan = new Scan();
ResultScanner scanRes = table1.getScanner(scan);
assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
@@ -151,7 +179,8 @@ public class TestReplicationSink {
for(int i = 0; i < BATCH_SIZE/2; i++) {
entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
}
- SINK.replicateEntries(entries, CellUtil.createCellScanner(cells));
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
+ baseNamespaceDir, hfileArchiveDir);
entries = new ArrayList<WALEntry>(BATCH_SIZE);
cells = new ArrayList<Cell>();
@@ -160,7 +189,8 @@ public class TestReplicationSink {
i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn, cells));
}
- SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
Scan scan = new Scan();
ResultScanner scanRes = table1.getScanner(scan);
assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
@@ -179,7 +209,8 @@ public class TestReplicationSink {
i, KeyValue.Type.Put, cells));
}
- SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
Scan scan = new Scan();
ResultScanner scanRes = table2.getScanner(scan);
for(Result res : scanRes) {
@@ -198,14 +229,16 @@ public class TestReplicationSink {
for(int i = 0; i < 3; i++) {
entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
}
- SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
entries = new ArrayList<WALEntry>(3);
cells = new ArrayList<Cell>();
entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells));
entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells));
- SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
Scan scan = new Scan();
ResultScanner scanRes = table1.getScanner(scan);
@@ -228,12 +261,96 @@ public class TestReplicationSink {
for(int i = 3; i < 5; i++) {
entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
}
- SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
Get get = new Get(Bytes.toBytes(1));
Result res = table1.get(get);
assertEquals(0, res.size());
}
+ /**
+ * Test replicateEntries with a bulk load entry for 25 HFiles
+ */
+ @Test
+ public void testReplicateEntriesForHFiles() throws Exception {
+ Path dir = TEST_UTIL.getDataTestDirOnTestFS("testReplicateEntries");
+ Path familyDir = new Path(dir, Bytes.toString(FAM_NAME1));
+ int numRows = 10;
+
+ List<Path> p = new ArrayList<>(1);
+
+ // 1. Generate 25 hfile ranges
+ Random rng = new SecureRandom();
+ Set<Integer> numbers = new HashSet<>();
+ while (numbers.size() < 50) {
+ numbers.add(rng.nextInt(1000));
+ }
+ List<Integer> numberList = new ArrayList<>(numbers);
+ Collections.sort(numberList);
+
+ // 2. Create 25 hfiles
+ Configuration conf = TEST_UTIL.getConfiguration();
+ FileSystem fs = dir.getFileSystem(conf);
+ Iterator<Integer> numbersItr = numberList.iterator();
+ for (int i = 0; i < 25; i++) {
+ Path hfilePath = new Path(familyDir, "hfile_" + i);
+ HFileTestUtil.createHFile(conf, fs, hfilePath, FAM_NAME1, FAM_NAME1,
+ Bytes.toBytes(numbersItr.next()), Bytes.toBytes(numbersItr.next()), numRows);
+ p.add(hfilePath);
+ }
+
+ // 3. Create a BulkLoadDescriptor and a WALEdit
+ Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
+ storeFiles.put(FAM_NAME1, p);
+ WALEdit edit = null;
+ WALProtos.BulkLoadDescriptor loadDescriptor = null;
+
+ try (Connection c = ConnectionFactory.createConnection(conf);
+ RegionLocator l = c.getRegionLocator(TABLE_NAME1)) {
+ HRegionInfo regionInfo = l.getAllRegionLocations().get(0).getRegionInfo();
+ loadDescriptor =
+ ProtobufUtil.toBulkLoadDescriptor(TABLE_NAME1,
+ ByteStringer.wrap(regionInfo.getEncodedNameAsBytes()), storeFiles, 1);
+ edit = WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor);
+ }
+ List<WALEntry> entries = new ArrayList<WALEntry>(1);
+
+ // 4. Create a WALEntryBuilder
+ WALEntry.Builder builder = createWALEntryBuilder(TABLE_NAME1);
+
+ // 5. Copy the hfile to the path as it is in reality
+ for (int i = 0; i < 25; i++) {
+ String pathToHfileFromNS =
+ new StringBuilder(100).append(TABLE_NAME1.getNamespaceAsString()).append(Path.SEPARATOR)
+ .append(Bytes.toString(TABLE_NAME1.getName())).append(Path.SEPARATOR)
+ .append(Bytes.toString(loadDescriptor.getEncodedRegionName().toByteArray()))
+ .append(Path.SEPARATOR).append(Bytes.toString(FAM_NAME1)).append(Path.SEPARATOR)
+ .append("hfile_" + i).toString();
+ String dst = baseNamespaceDir + Path.SEPARATOR + pathToHfileFromNS;
+
+ FileUtil.copy(fs, p.get(0), fs, new Path(dst), false, conf);
+ }
+
+ entries.add(builder.build());
+ ResultScanner scanRes = null;
+ try {
+ Scan scan = new Scan();
+ scanRes = table1.getScanner(scan);
+ // 6. Assert no existing data in table
+ assertEquals(0, scanRes.next(numRows).length);
+ // 7. Replicate the bulk loaded entry
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(edit.getCells().iterator()),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+ scanRes = table1.getScanner(scan);
+ // 8. Assert data is replicated
+ assertEquals(numRows, scanRes.next(numRows).length);
+ } finally {
+ if (scanRes != null) {
+ scanRes.close();
+ }
+ }
+ }
+
private WALEntry createEntry(TableName table, int row, KeyValue.Type type, List<Cell> cells) {
byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
byte[] rowBytes = Bytes.toBytes(row);
@@ -256,6 +373,13 @@ public class TestReplicationSink {
kv = new KeyValue(rowBytes, fam, null,
now, KeyValue.Type.DeleteFamily);
}
+ WALEntry.Builder builder = createWALEntryBuilder(table);
+ cells.add(kv);
+
+ return builder.build();
+ }
+
+ private WALEntry.Builder createWALEntryBuilder(TableName table) {
WALEntry.Builder builder = WALEntry.newBuilder();
builder.setAssociatedCellCount(1);
WALKey.Builder keyBuilder = WALKey.newBuilder();
@@ -264,13 +388,10 @@ public class TestReplicationSink {
uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits());
keyBuilder.setClusterId(uuidBuilder.build());
keyBuilder.setTableName(ByteStringer.wrap(table.getName()));
- keyBuilder.setWriteTime(now);
+ keyBuilder.setWriteTime(System.currentTimeMillis());
keyBuilder.setEncodedRegionName(ByteStringer.wrap(HConstants.EMPTY_BYTE_ARRAY));
keyBuilder.setLogSequenceNumber(-1);
builder.setKey(keyBuilder.build());
- cells.add(kv);
-
- return builder.build();
+ return builder;
}
-
}
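
The change to every replicateEntries call above reflects the extended sink API: besides the entries and their cell scanner, the sink now receives the source cluster id plus the source base namespace and HFile archive directories, so it can locate bulk loaded HFiles on the source cluster. The call shape used throughout this test (a sketch of the pattern only, using the fields the test already defines):

    // replicationClusterId: id of the cluster the entries were shipped from
    // baseNamespaceDir / hfileArchiveDir: source cluster locations to copy HFiles from
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
        replicationClusterId, baseNamespaceDir, hfileArchiveDir);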
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index d50522c..a208120 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -19,13 +19,17 @@
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -51,6 +55,8 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -64,6 +70,7 @@ import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
@@ -108,6 +115,8 @@ public class TestReplicationSourceManager {
private static final byte[] f1 = Bytes.toBytes("f1");
+ private static final byte[] f2 = Bytes.toBytes("f2");
+
private static final TableName test =
TableName.valueOf("test");
@@ -161,10 +170,10 @@ public class TestReplicationSourceManager {
manager.addSource(slaveId);
htd = new HTableDescriptor(test);
- HColumnDescriptor col = new HColumnDescriptor("f1");
+ HColumnDescriptor col = new HColumnDescriptor(f1);
col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
htd.addFamily(col);
- col = new HColumnDescriptor("f2");
+ col = new HColumnDescriptor(f2);
col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
htd.addFamily(col);
@@ -416,6 +425,63 @@ public class TestReplicationSourceManager {
s0.abort("", null);
}
+ @Test
+ public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
+ // 1. Create wal key
+ WALKey logKey = new WALKey();
+ // 2. Get the bulk load wal edit event
+ WALEdit logEdit = getBulkLoadWALEdit();
+
+ // 3. Get the scopes for the key
+ Replication.scopeWALEdits(htd, logKey, logEdit, conf, manager);
+
+ // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled
+ assertNull("No bulk load entries scope should be added if bulk load replication is diabled.",
+ logKey.getScopes());
+ }
+
+ @Test
+ public void testBulkLoadWALEdits() throws Exception {
+ // 1. Create wal key
+ WALKey logKey = new WALKey();
+ // 2. Get the bulk load wal edit event
+ WALEdit logEdit = getBulkLoadWALEdit();
+ // 3. Enable bulk load hfile replication
+ Configuration bulkLoadConf = HBaseConfiguration.create(conf);
+ bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+
+ // 4. Get the scopes for the key
+ Replication.scopeWALEdits(htd, logKey, logEdit, bulkLoadConf, manager);
+
+ NavigableMap<byte[], Integer> scopes = logKey.getScopes();
+ // Assert family with replication scope global is present in the key scopes
+ assertTrue("This family scope is set to global, should be part of replication key scopes.",
+ scopes.containsKey(f1));
+ // Assert family with replication scope local is not present in the key scopes
+ assertFalse("This family scope is set to local, should not be part of replication key scopes",
+ scopes.containsKey(f2));
+ }
+
+ private WALEdit getBulkLoadWALEdit() {
+ // 1. Create store files for the families
+ Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
+ List<Path> p = new ArrayList<>(1);
+ p.add(new Path(Bytes.toString(f1)));
+ storeFiles.put(f1, p);
+
+ p = new ArrayList<>(1);
+ p.add(new Path(Bytes.toString(f2)));
+ storeFiles.put(f2, p);
+
+ // 2. Create bulk load descriptor
+ BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
+ ByteStringer.wrap(hri.getEncodedNameAsBytes()), storeFiles, 1);
+
+ // 3. create bulk load wal edit event
+ WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc);
+ return logEdit;
+ }
+
static class DummyNodeFailoverWorker extends Thread {
private SortedMap<String, SortedSet<String>> logZnodesMap;
Server server;
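
The two new scope tests only assert presence or absence of the f1 and f2 keys. If one also wanted to check the stored scope value, a sketch along these lines could be added; the expectation that the stored value equals the family's configured scope is an assumption beyond what the test itself asserts:

    // Sketch only: a globally scoped family is expected to map to REPLICATION_SCOPE_GLOBAL
    // in the WALKey scopes; a locally scoped family should be absent.
    NavigableMap<byte[], Integer> scopes = logKey.getScopes();
    assertEquals(Integer.valueOf(HConstants.REPLICATION_SCOPE_GLOBAL), scopes.get(f1));
    assertNull(scopes.get(f2));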
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSourceFSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSourceFSConfigurationProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSourceFSConfigurationProvider.java
new file mode 100644
index 0000000..a14c02b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSourceFSConfigurationProvider.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class TestSourceFSConfigurationProvider implements SourceFSConfigurationProvider {
+ @Override
+ public Configuration getConf(Configuration sinkConf, String replicationClusterId)
+ throws IOException {
+ return sinkConf;
+ }
+}
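
This stub simply hands the sink configuration back, so in the tests the sink reads the "source" HFiles from the local test filesystem. It is plugged in through the same property TestReplicationSink sets in setUpBeforeClass; a minimal sketch of that wiring (property name as used in the test setup above; a real deployment would register a provider that loads the actual source cluster's filesystem configuration):

    // Sketch only: register the test provider on the sink side.
    Configuration sinkConf = HBaseConfiguration.create();
    sinkConf.set("hbase.replication.source.fs.conf.provider",
        TestSourceFSConfigurationProvider.class.getCanonicalName());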