Posted to commits@hbase.apache.org by ra...@apache.org on 2015/12/10 08:38:34 UTC
[1/3] hbase git commit: HBASE-13153 Bulk Loaded HFile Replication (Ashish Singhi)
Repository: hbase
Updated Branches:
refs/heads/master 9647fee3f -> 26ac60b03
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index a8cffba..9ff4b2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -45,8 +45,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
@@ -225,8 +227,16 @@ public class ReplicationSourceManager implements ReplicationListener {
* old region server wal queues
*/
protected void init() throws IOException, ReplicationException {
+ boolean replicationForBulkLoadDataEnabled =
+ conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+ HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
for (String id : this.replicationPeers.getPeerIds()) {
addSource(id);
+ if (replicationForBulkLoadDataEnabled) {
+ // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
+ // when a peer was added before replication for bulk loaded data was enabled.
+ this.replicationQueues.addPeerToHFileRefs(id);
+ }
}
List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
if (currentReplicators == null || currentReplicators.size() == 0) {
@@ -733,4 +743,15 @@ public class ReplicationSourceManager implements ReplicationListener {
}
return stats.toString();
}
+
+ public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+ throws ReplicationException {
+ for (ReplicationSourceInterface source : this.sources) {
+ source.addHFileRefs(tableName, family, files);
+ }
+ }
+
+ public void cleanUpHFileRefs(String peerId, List<String> files) {
+ this.replicationQueues.removeHFileRefs(peerId, files);
+ }
}
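
For context, the init() change above only registers existing peers under the hfile-refs queue when replication of bulk loaded data is enabled, and that feature is off by default. Below is a minimal, hedged sketch of enabling it programmatically using the HConstants keys this patch reads; the cluster id value is illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class BulkLoadReplicationConfSketch {
  // Builds a Configuration with bulk loaded HFile replication switched on,
  // mirroring the keys ReplicationSourceManager.init() checks above.
  public static Configuration create() {
    Configuration conf = HBaseConfiguration.create();
    // Checked in init(); peers are only added to the hfile-refs queue when true.
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    // Unique id identifying this source cluster to its peers (value is illustrative).
    conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
    return conf;
  }
}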
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SourceFSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SourceFSConfigurationProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SourceFSConfigurationProvider.java
new file mode 100644
index 0000000..8271115
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SourceFSConfigurationProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Interface that defines how a region server in the peer cluster obtains the source cluster's
+ * file system configuration. Users can plug in a custom implementation of this interface by
+ * setting its fully qualified class name as the value of the
+ * hbase.replication.source.fs.conf.provider property in the RegionServer configuration. The
+ * default is {@link DefaultSourceFSConfigurationProvider}.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface SourceFSConfigurationProvider {
+
+ /**
+ * Returns the source cluster file system configuration for the given source cluster replication
+ * ID.
+ * @param sinkConf sink cluster configuration
+ * @param replicationClusterId unique ID which identifies the source cluster
+ * @return source cluster file system configuration
+ * @throws IOException for invalid directory or for a bad disk.
+ */
+ public Configuration getConf(Configuration sinkConf, String replicationClusterId)
+ throws IOException;
+
+}
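
As a usage note, here is a hedged sketch of a custom SourceFSConfigurationProvider, assuming the source cluster's HDFS client configuration files are staged locally on the sink; the class name and directory layout are hypothetical. It would be wired in by setting hbase.replication.source.fs.conf.provider to the implementation's fully qualified class name, as the javadoc above describes.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.regionserver.SourceFSConfigurationProvider;

public class StagedDirSourceFSConfigurationProvider implements SourceFSConfigurationProvider {

  @Override
  public Configuration getConf(Configuration sinkConf, String replicationClusterId)
      throws IOException {
    // Hypothetical layout: /etc/hbase/replication/<replicationClusterId>/{core,hdfs}-site.xml
    Configuration sourceConf = new Configuration(false);
    Path base = new Path("/etc/hbase/replication", replicationClusterId);
    sourceConf.addResource(new Path(base, "core-site.xml"));
    sourceConf.addResource(new Path(base, "hdfs-site.xml"));
    return sourceConf;
  }
}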
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
index becc9f3..3541ade 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
@@ -217,7 +217,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
familyPaths.add(new Pair(el.getFamily().toByteArray(),el.getPath()));
}
-
+
Token userToken = null;
if (userProvider.isHadoopSecurityEnabled()) {
userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken()
@@ -375,6 +375,14 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {
Path p = new Path(srcPath);
Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
+
+ // In case of Replication for bulk load files, hfiles are already copied in staging directory
+ if (p.equals(stageP)) {
+ LOG.debug(p.getName()
+ + " is already available in staging directory. Skipping copy or rename.");
+ return stageP.toString();
+ }
+
if (srcFs == null) {
srcFs = FileSystem.get(p.toUri(), conf);
}
@@ -414,6 +422,14 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
Path p = new Path(srcPath);
Path stageP = new Path(stagingDir,
new Path(Bytes.toString(family), p.getName()));
+
+ // In case of replication of bulk loaded files, hfiles are not renamed by the endpoint during
+ // the prepare stage, so there is no need to rename them again here
+ if (p.equals(stageP)) {
+ LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");
+ return;
+ }
+
LOG.debug("Moving " + stageP + " back to " + p);
if(!fs.rename(stageP, p))
throw new IOException("Failed to move HFile: " + stageP + " to " + p);
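
A small hedged illustration of the short-circuit added above: when the replication sink has already written an HFile into the staging layout, the source path handed to the endpoint equals the computed staging path, so the copy/rename is skipped. The staging directory and file names below are hypothetical.

import org.apache.hadoop.fs.Path;

public class StagingPathCheckSketch {
  public static void main(String[] args) {
    Path stagingDir = new Path("/hbase/staging/hbase__test__token");
    // An hfile already placed in staging by bulk load replication.
    Path srcPath = new Path(stagingDir, new Path("f", "hfile_0"));
    // Same computation the endpoint performs for stageP.
    Path stageP = new Path(stagingDir, new Path("f", srcPath.getName()));
    System.out.println(srcPath.equals(stageP)); // true -> copy/rename is skipped
  }
}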
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
new file mode 100644
index 0000000..87db386
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
+import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+@Category({ MasterTests.class, SmallTests.class })
+public class TestReplicationHFileCleaner {
+ private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static Server server;
+ private static ReplicationQueues rq;
+ private static ReplicationPeers rp;
+ private static final String peerId = "TestReplicationHFileCleaner";
+ private static Configuration conf = TEST_UTIL.getConfiguration();
+ static FileSystem fs = null;
+ Path root;
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniZKCluster();
+ server = new DummyServer();
+ conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+ Replication.decorateMasterConfiguration(conf);
+ rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server);
+ rp.init();
+
+ rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
+ rq.init(server.getServerName().toString());
+ try {
+ fs = FileSystem.get(conf);
+ } finally {
+ if (fs != null) {
+ fs.close();
+ }
+ }
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniZKCluster();
+ }
+
+ @Before
+ public void setup() throws ReplicationException, IOException {
+ root = TEST_UTIL.getDataTestDirOnTestFS();
+ rp.addPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()), null);
+ }
+
+ @After
+ public void cleanup() throws ReplicationException {
+ try {
+ fs.delete(root, true);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete files recursively from path " + root);
+ }
+ rp.removePeer(peerId);
+ }
+
+ @Test
+ public void testIsFileDeletable() throws IOException, ReplicationException {
+ // 1. Create a file
+ Path file = new Path(root, "testIsFileDeletableWithNoHFileRefs");
+ fs.createNewFile(file);
+ // 2. Assert file is successfully created
+ assertTrue("Test file not created!", fs.exists(file));
+ ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
+ cleaner.setConf(conf);
+ // 3. Assert that file as is should be deletable
+ assertTrue("Cleaner should allow to delete this file as there is no hfile reference node "
+ + "for it in the queue.",
+ cleaner.isFileDeletable(fs.getFileStatus(file)));
+
+ List<String> files = new ArrayList<String>(1);
+ files.add(file.getName());
+ // 4. Add the file to hfile-refs queue
+ rq.addHFileRefs(peerId, files);
+ // 5. Assert file should not be deletable
+ assertFalse("Cleaner should not allow to delete this file as there is a hfile reference node "
+ + "for it in the queue.",
+ cleaner.isFileDeletable(fs.getFileStatus(file)));
+ }
+
+ @Test
+ public void testGetDeletableFiles() throws Exception {
+ // 1. Create two files and assert that they do not exist
+ Path notDeletablefile = new Path(root, "testGetDeletableFiles_1");
+ fs.createNewFile(notDeletablefile);
+ assertTrue("Test file not created!", fs.exists(notDeletablefile));
+ Path deletablefile = new Path(root, "testGetDeletableFiles_2");
+ fs.createNewFile(deletablefile);
+ assertTrue("Test file not created!", fs.exists(deletablefile));
+
+ List<FileStatus> files = new ArrayList<FileStatus>(2);
+ FileStatus f = new FileStatus();
+ f.setPath(deletablefile);
+ files.add(f);
+ f = new FileStatus();
+ f.setPath(notDeletablefile);
+ files.add(f);
+
+ List<String> hfiles = new ArrayList<>(1);
+ hfiles.add(notDeletablefile.getName());
+ // 2. Add one file to hfile-refs queue
+ rq.addHFileRefs(peerId, hfiles);
+
+ ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
+ cleaner.setConf(conf);
+ Iterator<FileStatus> deletableFilesIterator = cleaner.getDeletableFiles(files).iterator();
+ int i = 0;
+ while (deletableFilesIterator.hasNext() && i < 2) {
+ i++;
+ }
+ // 5. Assert one file should not be deletable and it is present in the list returned
+ if (i > 2) {
+ fail("File " + notDeletablefile
+ + " should not be deletable as its hfile reference node is not added.");
+ }
+ assertTrue(deletableFilesIterator.next().getPath().equals(deletablefile));
+ }
+
+ /*
+ * Test for HBASE-14621. This test does not directly assert anything. Without the fix, the test
+ * will end up in an infinite loop and therefore time out.
+ */
+ @Test(timeout = 15000)
+ public void testForDifferntHFileRefsZnodeVersion() throws Exception {
+ // 1. Create a file
+ Path file = new Path(root, "testForDifferntHFileRefsZnodeVersion");
+ fs.createNewFile(file);
+ // 2. Assert file is successfully created
+ assertTrue("Test file not created!", fs.exists(file));
+ ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
+ cleaner.setConf(conf);
+
+ ReplicationQueuesClient replicationQueuesClient = Mockito.mock(ReplicationQueuesClient.class);
+ //Return different znode version for each call
+ Mockito.when(replicationQueuesClient.getHFileRefsNodeChangeVersion()).thenReturn(1, 2);
+
+ Class<? extends ReplicationHFileCleaner> cleanerClass = cleaner.getClass();
+ Field rqc = cleanerClass.getDeclaredField("rqc");
+ rqc.setAccessible(true);
+ rqc.set(cleaner, replicationQueuesClient);
+
+ cleaner.isFileDeletable(fs.getFileStatus(file));
+ }
+
+ static class DummyServer implements Server {
+
+ @Override
+ public Configuration getConfiguration() {
+ return TEST_UTIL.getConfiguration();
+ }
+
+ @Override
+ public ZooKeeperWatcher getZooKeeper() {
+ try {
+ return new ZooKeeperWatcher(getConfiguration(), "dummy server", this);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ @Override
+ public CoordinatedStateManager getCoordinatedStateManager() {
+ return null;
+ }
+
+ @Override
+ public ClusterConnection getConnection() {
+ return null;
+ }
+
+ @Override
+ public MetaTableLocator getMetaTableLocator() {
+ return null;
+ }
+
+ @Override
+ public ServerName getServerName() {
+ return ServerName.valueOf("regionserver,60020,000000");
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+ }
+
+ @Override
+ public boolean isAborted() {
+ return false;
+ }
+
+ @Override
+ public void stop(String why) {
+ }
+
+ @Override
+ public boolean isStopped() {
+ return false;
+ }
+
+ @Override
+ public ChoreService getChoreService() {
+ return null;
+ }
+ }
+}
\ No newline at end of file
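
Related to the test above, a minimal sketch of how the cleaner is expected to be wired in: with bulk load replication enabled, Replication.decorateMasterConfiguration() (called in setUpBeforeClass) is expected to append ReplicationHFileCleaner to the master's hfile cleaner plugin chain. The plugin key string printed below is an assumption, shown only for inspection.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.regionserver.Replication;

public class HFileCleanerWiringSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    // Decorates the master configuration; with the flag set it should add
    // ReplicationHFileCleaner to the cleaner chain.
    Replication.decorateMasterConfiguration(conf);
    // Assumed plugin key, printed to inspect the resulting chain.
    System.out.println(conf.get("hbase.master.hfilecleaner.plugins"));
  }
}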
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index f463f76..abe484e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -19,12 +19,14 @@
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
+import java.util.List;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
@@ -89,4 +91,10 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
public String getStats() {
return "";
}
+
+ @Override
+ public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+ throws ReplicationException {
+ return;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 455a790..e919c24 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -19,15 +19,21 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -35,7 +41,9 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@@ -48,12 +56,17 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
@@ -79,6 +92,7 @@ public class TestMasterReplication {
private static final TableName tableName = TableName.valueOf("test");
private static final byte[] famName = Bytes.toBytes("f");
+ private static final byte[] famName1 = Bytes.toBytes("f1");
private static final byte[] row = Bytes.toBytes("row");
private static final byte[] row1 = Bytes.toBytes("row1");
private static final byte[] row2 = Bytes.toBytes("row2");
@@ -103,7 +117,11 @@ public class TestMasterReplication {
baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
- HConstants.REPLICATION_ENABLE_DEFAULT);
+ HConstants.REPLICATION_ENABLE_DEFAULT);
+ baseConfiguration.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+ baseConfiguration.set("hbase.replication.source.fs.conf.provider",
+ TestSourceFSConfigurationProvider.class.getCanonicalName());
+ baseConfiguration.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
baseConfiguration.setBoolean("dfs.support.append", true);
baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
baseConfiguration.setStrings(
@@ -114,6 +132,9 @@ public class TestMasterReplication {
HColumnDescriptor fam = new HColumnDescriptor(famName);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
table.addFamily(fam);
+ fam = new HColumnDescriptor(famName1);
+ fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+ table.addFamily(fam);
fam = new HColumnDescriptor(noRepfamName);
table.addFamily(fam);
}
@@ -130,14 +151,7 @@ public class TestMasterReplication {
int numClusters = 2;
Table[] htables = null;
try {
- startMiniClusters(numClusters);
- createTableOnClusters(table);
-
- htables = getHTablesOnClusters(tableName);
-
- // Test the replication scenarios of 0 -> 1 -> 0
- addPeer("1", 0, 1);
- addPeer("1", 1, 0);
+ htables = setUpClusterTablesAndPeers(numClusters);
int[] expectedCounts = new int[] { 2, 2 };
@@ -157,12 +171,64 @@ public class TestMasterReplication {
}
/**
- * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and
- * deleting rows to a table in each clusters and ensuring that the each of
- * these clusters get the appropriate mutations. It also tests the grouping
- * scenario where a cluster needs to replicate the edits originating from
- * itself and also the edits that it received using replication from a
- * different cluster. The scenario is explained in HBASE-9158
+ * Tests the replication scenario 0 -> 1 -> 0 by bulk loading a set of HFiles into a table in
+ * each cluster and checking that they are replicated.
+ */
+ @Test(timeout = 300000)
+ public void testHFileCyclicReplication() throws Exception {
+ LOG.info("testHFileCyclicReplication");
+ int numClusters = 2;
+ Table[] htables = null;
+ try {
+ htables = setUpClusterTablesAndPeers(numClusters);
+
+ // Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated
+ // to cluster '1'.
+ byte[][][] hfileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+ new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
+ int numOfRows = 100;
+ int[] expectedCounts =
+ new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
+
+ loadAndValidateHFileReplication("testHFileCyclicReplication_01", 0, new int[] { 1 }, row,
+ famName, htables, hfileRanges, numOfRows, expectedCounts, true);
+
+ // Load 200 rows for each hfile range in cluster '1' and validate whether its been replicated
+ // to cluster '0'.
+ hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
+ new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
+ numOfRows = 200;
+ int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
+ hfileRanges.length * numOfRows + expectedCounts[1] };
+
+ loadAndValidateHFileReplication("testHFileCyclicReplication_10", 1, new int[] { 0 }, row,
+ famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
+
+ } finally {
+ close(htables);
+ shutDownMiniClusters();
+ }
+ }
+
+ private Table[] setUpClusterTablesAndPeers(int numClusters) throws Exception {
+ Table[] htables;
+ startMiniClusters(numClusters);
+ createTableOnClusters(table);
+
+ htables = getHTablesOnClusters(tableName);
+ // Test the replication scenarios of 0 -> 1 -> 0
+ addPeer("1", 0, 1);
+ addPeer("1", 1, 0);
+ return htables;
+ }
+
+ /**
+ * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and deleting rows to a
+ * table in each clusters and ensuring that the each of these clusters get the appropriate
+ * mutations. It also tests the grouping scenario where a cluster needs to replicate the edits
+ * originating from itself and also the edits that it received using replication from a different
+ * cluster. The scenario is explained in HBASE-9158
*/
@Test(timeout = 300000)
public void testCyclicReplication2() throws Exception {
@@ -213,6 +279,119 @@ public class TestMasterReplication {
}
/**
+ * Tests the multi-slave hfile replication scenario 0 -> 1, 2 by bulk loading a set of HFiles
+ * into a table in the master cluster and checking that they are replicated to its peers.
+ */
+ @Test(timeout = 300000)
+ public void testHFileMultiSlaveReplication() throws Exception {
+ LOG.info("testHFileMultiSlaveReplication");
+ int numClusters = 3;
+ Table[] htables = null;
+ try {
+ startMiniClusters(numClusters);
+ createTableOnClusters(table);
+
+ // Add a slave, 0 -> 1
+ addPeer("1", 0, 1);
+
+ htables = getHTablesOnClusters(tableName);
+
+ // Load 100 rows for each hfile range in cluster '0' and validate whether its been replicated
+ // to cluster '1'.
+ byte[][][] hfileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes("mmmm"), Bytes.toBytes("oooo") },
+ new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("rrr") }, };
+ int numOfRows = 100;
+
+ int[] expectedCounts =
+ new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
+
+ loadAndValidateHFileReplication("testHFileCyclicReplication_0", 0, new int[] { 1 }, row,
+ famName, htables, hfileRanges, numOfRows, expectedCounts, true);
+
+ // Validate data is not replicated to cluster '2'.
+ assertEquals(0, utilities[2].countRows(htables[2]));
+
+ rollWALAndWait(utilities[0], htables[0].getName(), row);
+
+ // Add one more slave, 0 -> 2
+ addPeer("2", 0, 2);
+
+ // Load 200 rows for each hfile range in cluster '0' and validate whether its been replicated
+ // to cluster '1' and '2'. Previous data should be replicated to cluster '2'.
+ hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("ssss"), Bytes.toBytes("uuuu") },
+ new byte[][] { Bytes.toBytes("vvv"), Bytes.toBytes("xxx") }, };
+ numOfRows = 200;
+
+ int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
+ hfileRanges.length * numOfRows + expectedCounts[1], hfileRanges.length * numOfRows };
+
+ loadAndValidateHFileReplication("testHFileCyclicReplication_1", 0, new int[] { 1, 2 }, row,
+ famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
+
+ } finally {
+ close(htables);
+ shutDownMiniClusters();
+ }
+ }
+
+ /**
+ * Tests bulk loaded hfile replication restricted to explicitly specified table column families.
+ * It bulk loads a set of HFiles into both CFs of the table while configuring only one CF for
+ * replication.
+ */
+ @Test(timeout = 300000)
+ public void testHFileReplicationForConfiguredTableCfs() throws Exception {
+ LOG.info("testHFileReplicationForConfiguredTableCfs");
+ int numClusters = 2;
+ Table[] htables = null;
+ try {
+ startMiniClusters(numClusters);
+ createTableOnClusters(table);
+
+ htables = getHTablesOnClusters(tableName);
+ // Test the replication scenario where only 'f' is configured for table data replication, not 'f1'
+ addPeer("1", 0, 1, tableName.getNameAsString() + ":" + Bytes.toString(famName));
+
+ // Load 100 rows for each hfile range in cluster '0' for table CF 'f'
+ byte[][][] hfileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+ new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
+ int numOfRows = 100;
+ int[] expectedCounts =
+ new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
+
+ loadAndValidateHFileReplication("load_f", 0, new int[] { 1 }, row, famName, htables,
+ hfileRanges, numOfRows, expectedCounts, true);
+
+ // Load 100 rows for each hfile range in cluster '0' for table CF 'f1'
+ hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
+ new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
+ numOfRows = 100;
+
+ int[] newExpectedCounts =
+ new int[] { hfileRanges.length * numOfRows + expectedCounts[0], expectedCounts[1] };
+
+ loadAndValidateHFileReplication("load_f1", 0, new int[] { 1 }, row, famName1, htables,
+ hfileRanges, numOfRows, newExpectedCounts, false);
+
+ // Validate data replication for CF 'f1'
+
+ // Source cluster table should contain data for the families
+ wait(0, htables[0], hfileRanges.length * numOfRows + expectedCounts[0]);
+
+ // Sleep for enough time so that the data is still not replicated for the CF which is not
+ // configured for replication
+ Thread.sleep((NB_RETRIES / 2) * SLEEP_TIME);
+ // Peer cluster should have only configured CF data
+ wait(1, htables[1], expectedCounts[1]);
+ } finally {
+ close(htables);
+ shutDownMiniClusters();
+ }
+ }
+
+ /**
* Tests cyclic replication scenario of 0 -> 1 -> 2 -> 1.
*/
@Test(timeout = 300000)
@@ -328,6 +507,17 @@ public class TestMasterReplication {
close(replicationAdmin);
}
}
+
+ private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
+ throws Exception {
+ ReplicationAdmin replicationAdmin = null;
+ try {
+ replicationAdmin = new ReplicationAdmin(configurations[masterClusterNumber]);
+ replicationAdmin.addPeer(id, utilities[slaveClusterNumber].getClusterKey(), tableCfs);
+ } finally {
+ close(replicationAdmin);
+ }
+ }
private void disablePeer(String id, int masterClusterNumber) throws Exception {
ReplicationAdmin replicationAdmin = null;
@@ -405,8 +595,56 @@ public class TestMasterReplication {
wait(row, target, false);
}
- private void wait(byte[] row, Table target, boolean isDeleted)
- throws Exception {
+ private void loadAndValidateHFileReplication(String testName, int masterNumber,
+ int[] slaveNumbers, byte[] row, byte[] fam, Table[] tables, byte[][][] hfileRanges,
+ int numOfRows, int[] expectedCounts, boolean toValidate) throws Exception {
+ HBaseTestingUtility util = utilities[masterNumber];
+
+ Path dir = util.getDataTestDirOnTestFS(testName);
+ FileSystem fs = util.getTestFileSystem();
+ dir = dir.makeQualified(fs);
+ Path familyDir = new Path(dir, Bytes.toString(fam));
+
+ int hfileIdx = 0;
+ for (byte[][] range : hfileRanges) {
+ byte[] from = range[0];
+ byte[] to = range[1];
+ HFileTestUtil.createHFile(util.getConfiguration(), fs,
+ new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
+ }
+
+ Table source = tables[masterNumber];
+ final TableName tableName = source.getName();
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+ String[] args = { dir.toString(), tableName.toString() };
+ loader.run(args);
+
+ if (toValidate) {
+ for (int slaveClusterNumber : slaveNumbers) {
+ wait(slaveClusterNumber, tables[slaveClusterNumber], expectedCounts[slaveClusterNumber]);
+ }
+ }
+ }
+
+ private void wait(int slaveNumber, Table target, int expectedCount)
+ throws IOException, InterruptedException {
+ int count = 0;
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i == NB_RETRIES - 1) {
+ fail("Waited too much time for bulkloaded data replication. Current count=" + count
+ + ", expected count=" + expectedCount);
+ }
+ count = utilities[slaveNumber].countRows(target);
+ if (count != expectedCount) {
+ LOG.info("Waiting more time for bulkloaded data replication.");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ break;
+ }
+ }
+ }
+
+ private void wait(byte[] row, Table target, boolean isDeleted) throws Exception {
Get get = new Get(row);
for (int i = 0; i < NB_RETRIES; i++) {
if (i == NB_RETRIES - 1) {
@@ -430,6 +668,47 @@ public class TestMasterReplication {
}
}
+ private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
+ final byte[] row) throws IOException {
+ final Admin admin = utility.getHBaseAdmin();
+ final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();
+
+ // find the region that corresponds to the given row.
+ HRegion region = null;
+ for (HRegion candidate : cluster.getRegions(table)) {
+ if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
+ region = candidate;
+ break;
+ }
+ }
+ assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ // listen for successful log rolls
+ final WALActionsListener listener = new WALActionsListener.Base() {
+ @Override
+ public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
+ latch.countDown();
+ }
+ };
+ region.getWAL().registerWALActionsListener(listener);
+
+ // request a roll
+ admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(),
+ region.getRegionInfo().getRegionName()));
+
+ // wait
+ try {
+ latch.await();
+ } catch (InterruptedException exception) {
+ LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " +
+ "replication tests fail, it's probably because we should still be waiting.");
+ Thread.currentThread().interrupt();
+ }
+ region.getWAL().unregisterWALActionsListener(listener);
+ }
+
/**
* Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same
* timestamp there is otherwise no way to count them.
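
For reference, the table-CFs form of addPeer() introduced above ("table:cf1,cf2,...") is what limits replication, including bulk loaded HFiles, to the configured families in testHFileReplicationForConfiguredTableCfs. A hedged sketch of the same call outside the test harness follows; the cluster key is illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;

public class TableCfsPeerSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    ReplicationAdmin admin = new ReplicationAdmin(conf);
    try {
      // Replicate only column family 'f' of table 'test' to peer "1".
      admin.addPeer("1", "zk1,zk2,zk3:2181:/hbase", "test:f");
    } finally {
      admin.close();
    }
  }
}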
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 4823597..47d2880 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -658,7 +658,8 @@ public class TestReplicationSmallTests extends TestReplicationBase {
HRegionInfo hri = new HRegionInfo(htable1.getName(),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
- Replication.scopeWALEdits(htable1.getTableDescriptor(), new WALKey(), edit);
+ Replication.scopeWALEdits(htable1.getTableDescriptor(), new WALKey(), edit,
+ htable1.getConfiguration(), null);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 696c130..41c3240 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.*;
+import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
@@ -160,6 +161,62 @@ public abstract class TestReplicationStateBasic {
}
@Test
+ public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
+ rp.init();
+ rq1.init(server1);
+ rqc.init();
+
+ List<String> files1 = new ArrayList<String>(3);
+ files1.add("file_1");
+ files1.add("file_2");
+ files1.add("file_3");
+ assertNull(rqc.getReplicableHFiles(ID_ONE));
+ assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
+ rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+ rq1.addHFileRefs(ID_ONE, files1);
+ assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
+ assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
+ List<String> files2 = new ArrayList<>(files1);
+ String removedString = files2.remove(0);
+ rq1.removeHFileRefs(ID_ONE, files2);
+ assertEquals(1, rqc.getReplicableHFiles(ID_ONE).size());
+ files2 = new ArrayList<>(1);
+ files2.add(removedString);
+ rq1.removeHFileRefs(ID_ONE, files2);
+ assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size());
+ rp.removePeer(ID_ONE);
+ }
+
+ @Test
+ public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
+ rq1.init(server1);
+ rqc.init();
+
+ rp.init();
+ rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+ rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null);
+
+ List<String> files1 = new ArrayList<String>(3);
+ files1.add("file_1");
+ files1.add("file_2");
+ files1.add("file_3");
+ rq1.addHFileRefs(ID_ONE, files1);
+ rq1.addHFileRefs(ID_TWO, files1);
+ assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size());
+ assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
+ assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
+
+ rp.removePeer(ID_ONE);
+ assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
+ assertNull(rqc.getReplicableHFiles(ID_ONE));
+ assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
+
+ rp.removePeer(ID_TWO);
+ assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
+ assertNull(rqc.getReplicableHFiles(ID_TWO));
+ }
+
+ @Test
public void testReplicationPeers() throws Exception {
rp.init();
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index 4587c61..3b7402a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -64,6 +64,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
utility = new HBaseTestingUtility();
utility.startMiniZKCluster();
conf = utility.getConfiguration();
+ conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
index 13545b5..b36bb9a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
@@ -52,15 +52,15 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
private static final TableName t1_su = TableName.valueOf("t1_syncup");
private static final TableName t2_su = TableName.valueOf("t2_syncup");
- private static final byte[] famName = Bytes.toBytes("cf1");
+ protected static final byte[] famName = Bytes.toBytes("cf1");
private static final byte[] qualName = Bytes.toBytes("q1");
- private static final byte[] noRepfamName = Bytes.toBytes("norep");
+ protected static final byte[] noRepfamName = Bytes.toBytes("norep");
private HTableDescriptor t1_syncupSource, t1_syncupTarget;
private HTableDescriptor t2_syncupSource, t2_syncupTarget;
- private Table ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1;
+ protected Table ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1;
@Before
public void setUp() throws Exception {
@@ -179,7 +179,7 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
}
- private void setupReplication() throws Exception {
+ protected void setupReplication() throws Exception {
ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
@@ -418,7 +418,7 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
}
}
- private void syncUp(HBaseTestingUtility ut) throws Exception {
+ protected void syncUp(HBaseTestingUtility ut) throws Exception {
ReplicationSyncUp.setConfigure(ut.getConfiguration());
String[] arguments = new String[] { null };
new ReplicationSyncUp().run(arguments);
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
new file mode 100644
index 0000000..f54c632
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpTool {
+
+ private static final Log LOG = LogFactory
+ .getLog(TestReplicationSyncUpToolWithBulkLoadedData.class);
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ conf1.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+ conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
+ conf1.set("hbase.replication.source.fs.conf.provider",
+ TestSourceFSConfigurationProvider.class.getCanonicalName());
+ String classes = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
+ if (!classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")) {
+ classes = classes + ",org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint";
+ conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes);
+ }
+
+ TestReplicationBase.setUpBeforeClass();
+ }
+
+ @Override
+ public void testSyncUpTool() throws Exception {
+ /**
+ * Set up replication on Master and one Slave.
+ * Tables: t1_syncup and t2_syncup; column family 'cf1': replicated, 'norep': not replicated.
+ */
+ setupReplication();
+
+ /**
+ * Prepare 16 random hfile ranges required for creating hfiles
+ */
+ Iterator<String> randomHFileRangeListIterator = null;
+ Set<String> randomHFileRanges = new HashSet<String>(16);
+ for (int i = 0; i < 16; i++) {
+ randomHFileRanges.add(UUID.randomUUID().toString());
+ }
+ List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
+ Collections.sort(randomHFileRangeList);
+ randomHFileRangeListIterator = randomHFileRangeList.iterator();
+
+ /**
+ * At Master: t1_syncup: load 100 rows into cf1 and 3 rows into norep;
+ * t2_syncup: load 200 rows into cf1 and 3 rows into norep.
+ * Verify the data is correctly replicated to the slave.
+ */
+ loadAndReplicateHFiles(true, randomHFileRangeListIterator);
+
+ /**
+ * Verify hfile load works:
+ * step 1: stop hbase on Slave
+ * step 2: at Master: t1_syncup: load another 100 rows into cf1 and 3 rows into norep;
+ *         t2_syncup: load another 200 rows into cf1 and 3 rows into norep
+ * step 3: stop hbase on Master, restart hbase on Slave
+ * step 4: verify Slave still has the rows from before the load: t1_syncup: 100 rows from cf1;
+ *         t2_syncup: 200 rows from cf1
+ * step 5: run the syncup tool on Master
+ * step 6: verify that the hfiles show up on Slave and 'norep' does not:
+ *         t1_syncup: 200 rows from cf1; t2_syncup: 400 rows from cf1
+ */
+ mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
+
+ }
+
+ private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
+ throws Exception {
+ LOG.debug("mimicSyncUpAfterBulkLoad");
+ utility2.shutdownMiniHBaseCluster();
+
+ loadAndReplicateHFiles(false, randomHFileRangeListIterator);
+
+ int rowCount_ht1Source = utility1.countRows(ht1Source);
+ assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 hfiles", 206,
+ rowCount_ht1Source);
+
+ int rowCount_ht2Source = utility1.countRows(ht2Source);
+ assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406,
+ rowCount_ht2Source);
+
+ utility1.shutdownMiniHBaseCluster();
+ utility2.restartHBaseCluster(1);
+
+ Thread.sleep(SLEEP_TIME);
+
+ // Before sync up
+ int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+ int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+ assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
+ assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);
+
+ // Run sync up tool
+ syncUp(utility1);
+
+ // After sync up
+ for (int i = 0; i < NB_RETRIES; i++) {
+ syncUp(utility1);
+ rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+ rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+ if (i == NB_RETRIES - 1) {
+ if (rowCount_ht1TargetAtPeer1 != 200 || rowCount_ht2TargetAtPeer1 != 400) {
+ // syncUP still failed. Let's look at the source in case anything wrong there
+ utility1.restartHBaseCluster(1);
+ rowCount_ht1Source = utility1.countRows(ht1Source);
+ LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source);
+ rowCount_ht2Source = utility1.countRows(ht2Source);
+ LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source);
+ }
+ assertEquals("@Peer1 t1_syncup should be sync up and have 200 rows", 200,
+ rowCount_ht1TargetAtPeer1);
+ assertEquals("@Peer1 t2_syncup should be sync up and have 400 rows", 400,
+ rowCount_ht2TargetAtPeer1);
+ }
+ if (rowCount_ht1TargetAtPeer1 == 200 && rowCount_ht2TargetAtPeer1 == 400) {
+ LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i);
+ break;
+ } else {
+ LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
+ + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
+ + rowCount_ht2TargetAtPeer1);
+ }
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+
+ private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
+ Iterator<String> randomHFileRangeListIterator) throws Exception {
+ LOG.debug("loadAndReplicateHFiles");
+
+ // Load 100 + 3 hfiles to t1_syncup.
+ byte[][][] hfileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+ Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+ loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht1Source, hfileRanges,
+ 100);
+
+ hfileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+ Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+ loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht1Source,
+ hfileRanges, 3);
+
+ // Load 200 + 3 hfiles to t2_syncup.
+ hfileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+ Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+ loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht2Source, hfileRanges,
+ 200);
+
+ hfileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+ Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+ loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht2Source,
+ hfileRanges, 3);
+
+ if (verifyReplicationOnSlave) {
+ // ensure replication completed
+ wait(ht1TargetAtPeer1, utility1.countRows(ht1Source) - 3,
+ "t1_syncup has 103 rows on source, and 100 on slave1");
+
+ wait(ht2TargetAtPeer1, utility1.countRows(ht2Source) - 3,
+ "t2_syncup has 203 rows on source, and 200 on slave1");
+ }
+ }
+
+ private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
+ Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
+ Path dir = utility1.getDataTestDirOnTestFS(testName);
+ FileSystem fs = utility1.getTestFileSystem();
+ dir = dir.makeQualified(fs);
+ Path familyDir = new Path(dir, Bytes.toString(fam));
+
+ int hfileIdx = 0;
+ for (byte[][] range : hfileRanges) {
+ byte[] from = range[0];
+ byte[] to = range[1];
+ HFileTestUtil.createHFile(utility1.getConfiguration(), fs, new Path(familyDir, "hfile_"
+ + hfileIdx++), fam, row, from, to, numOfRows);
+ }
+
+ final TableName tableName = source.getName();
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(utility1.getConfiguration());
+ String[] args = { dir.toString(), tableName.toString() };
+ loader.run(args);
+ }
+
+ private void wait(Table target, int expectedCount, String msg) throws IOException,
+ InterruptedException {
+ for (int i = 0; i < NB_RETRIES; i++) {
+ int rowCount_ht2TargetAtPeer1 = utility2.countRows(target);
+ if (i == NB_RETRIES - 1) {
+ assertEquals(msg, expectedCount, rowCount_ht2TargetAtPeer1);
+ }
+ if (expectedCount == rowCount_ht2TargetAtPeer1) {
+ break;
+ }
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
index b87e7ef..f08d2bf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
@@ -21,32 +21,52 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.security.SecureRandom;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -58,21 +78,18 @@ public class TestReplicationSink {
private static final Log LOG = LogFactory.getLog(TestReplicationSink.class);
private static final int BATCH_SIZE = 10;
- private final static HBaseTestingUtility TEST_UTIL =
- new HBaseTestingUtility();
+ protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private static ReplicationSink SINK;
+ protected static ReplicationSink SINK;
- private static final TableName TABLE_NAME1 =
- TableName.valueOf("table1");
- private static final TableName TABLE_NAME2 =
- TableName.valueOf("table2");
+ protected static final TableName TABLE_NAME1 = TableName.valueOf("table1");
+ protected static final TableName TABLE_NAME2 = TableName.valueOf("table2");
- private static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
- private static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
+ protected static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
+ protected static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
- private static Table table1;
- private static Stoppable STOPPABLE = new Stoppable() {
+ protected static Table table1;
+ protected static Stoppable STOPPABLE = new Stoppable() {
final AtomicBoolean stop = new AtomicBoolean(false);
@Override
@@ -85,10 +102,13 @@ public class TestReplicationSink {
LOG.info("STOPPING BECAUSE: " + why);
this.stop.set(true);
}
-
+
};
- private static Table table2;
+ protected static Table table2;
+ protected static String baseNamespaceDir;
+ protected static String hfileArchiveDir;
+ protected static String replicationClusterId;
/**
* @throws java.lang.Exception
@@ -98,11 +118,18 @@ public class TestReplicationSink {
TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_ENABLE_KEY,
HConstants.REPLICATION_ENABLE_DEFAULT);
+ TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
+ TestSourceFSConfigurationProvider.class.getCanonicalName());
+
TEST_UTIL.startMiniCluster(3);
SINK =
new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
+ Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
+ baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR)).toString();
+ hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY)).toString();
+ replicationClusterId = "12345";
}
/**
@@ -134,7 +161,8 @@ public class TestReplicationSink {
for(int i = 0; i < BATCH_SIZE; i++) {
entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
}
- SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
Scan scan = new Scan();
ResultScanner scanRes = table1.getScanner(scan);
assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
@@ -151,7 +179,8 @@ public class TestReplicationSink {
for(int i = 0; i < BATCH_SIZE/2; i++) {
entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
}
- SINK.replicateEntries(entries, CellUtil.createCellScanner(cells));
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
+ baseNamespaceDir, hfileArchiveDir);
entries = new ArrayList<WALEntry>(BATCH_SIZE);
cells = new ArrayList<Cell>();
@@ -160,7 +189,8 @@ public class TestReplicationSink {
i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn, cells));
}
- SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
Scan scan = new Scan();
ResultScanner scanRes = table1.getScanner(scan);
assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
@@ -179,7 +209,8 @@ public class TestReplicationSink {
i, KeyValue.Type.Put, cells));
}
- SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
Scan scan = new Scan();
ResultScanner scanRes = table2.getScanner(scan);
for(Result res : scanRes) {
@@ -198,14 +229,16 @@ public class TestReplicationSink {
for(int i = 0; i < 3; i++) {
entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
}
- SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
entries = new ArrayList<WALEntry>(3);
cells = new ArrayList<Cell>();
entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells));
entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells));
- SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
Scan scan = new Scan();
ResultScanner scanRes = table1.getScanner(scan);
@@ -228,12 +261,96 @@ public class TestReplicationSink {
for(int i = 3; i < 5; i++) {
entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
}
- SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
Get get = new Get(Bytes.toBytes(1));
Result res = table1.get(get);
assertEquals(0, res.size());
}
+ /**
+ * Test replicateEntries with a bulk load entry for 25 HFiles
+ */
+ @Test
+ public void testReplicateEntriesForHFiles() throws Exception {
+ Path dir = TEST_UTIL.getDataTestDirOnTestFS("testReplicateEntries");
+ Path familyDir = new Path(dir, Bytes.toString(FAM_NAME1));
+ int numRows = 10;
+
+ List<Path> p = new ArrayList<>(1);
+
+ // 1. Generate 25 hfile ranges
+ Random rng = new SecureRandom();
+ Set<Integer> numbers = new HashSet<>();
+ while (numbers.size() < 50) {
+ numbers.add(rng.nextInt(1000));
+ }
+ List<Integer> numberList = new ArrayList<>(numbers);
+ Collections.sort(numberList);
+
+ // 2. Create 25 hfiles
+ Configuration conf = TEST_UTIL.getConfiguration();
+ FileSystem fs = dir.getFileSystem(conf);
+ Iterator<Integer> numbersItr = numberList.iterator();
+ for (int i = 0; i < 25; i++) {
+ Path hfilePath = new Path(familyDir, "hfile_" + i);
+ HFileTestUtil.createHFile(conf, fs, hfilePath, FAM_NAME1, FAM_NAME1,
+ Bytes.toBytes(numbersItr.next()), Bytes.toBytes(numbersItr.next()), numRows);
+ p.add(hfilePath);
+ }
+
+ // 3. Create a BulkLoadDescriptor and a WALEdit
+ Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
+ storeFiles.put(FAM_NAME1, p);
+ WALEdit edit = null;
+ WALProtos.BulkLoadDescriptor loadDescriptor = null;
+
+ try (Connection c = ConnectionFactory.createConnection(conf);
+ RegionLocator l = c.getRegionLocator(TABLE_NAME1)) {
+ HRegionInfo regionInfo = l.getAllRegionLocations().get(0).getRegionInfo();
+ loadDescriptor =
+ ProtobufUtil.toBulkLoadDescriptor(TABLE_NAME1,
+ ByteStringer.wrap(regionInfo.getEncodedNameAsBytes()), storeFiles, 1);
+ edit = WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor);
+ }
+ List<WALEntry> entries = new ArrayList<WALEntry>(1);
+
+ // 4. Create a WALEntryBuilder
+ WALEntry.Builder builder = createWALEntryBuilder(TABLE_NAME1);
+
+ // 5. Copy hfile content to the destination paths the sink will resolve under the namespace dir
+ for (int i = 0; i < 25; i++) {
+ String pathToHfileFromNS =
+ new StringBuilder(100).append(TABLE_NAME1.getNamespaceAsString()).append(Path.SEPARATOR)
+ .append(Bytes.toString(TABLE_NAME1.getName())).append(Path.SEPARATOR)
+ .append(Bytes.toString(loadDescriptor.getEncodedRegionName().toByteArray()))
+ .append(Path.SEPARATOR).append(Bytes.toString(FAM_NAME1)).append(Path.SEPARATOR)
+ .append("hfile_" + i).toString();
+ String dst = baseNamespaceDir + Path.SEPARATOR + pathToHfileFromNS;
+
+ FileUtil.copy(fs, p.get(0), fs, new Path(dst), false, conf);
+ }
+
+ entries.add(builder.build());
+ ResultScanner scanRes = null;
+ try {
+ Scan scan = new Scan();
+ scanRes = table1.getScanner(scan);
+ // 6. Assert no existing data in table
+ assertEquals(0, scanRes.next(numRows).length);
+ // 7. Replicate the bulk loaded entry
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(edit.getCells().iterator()),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+ scanRes = table1.getScanner(scan);
+ // 8. Assert data is replicated
+ assertEquals(numRows, scanRes.next(numRows).length);
+ } finally {
+ if (scanRes != null) {
+ scanRes.close();
+ }
+ }
+ }
+
private WALEntry createEntry(TableName table, int row, KeyValue.Type type, List<Cell> cells) {
byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
byte[] rowBytes = Bytes.toBytes(row);
@@ -256,6 +373,13 @@ public class TestReplicationSink {
kv = new KeyValue(rowBytes, fam, null,
now, KeyValue.Type.DeleteFamily);
}
+ WALEntry.Builder builder = createWALEntryBuilder(table);
+ cells.add(kv);
+
+ return builder.build();
+ }
+
+ private WALEntry.Builder createWALEntryBuilder(TableName table) {
WALEntry.Builder builder = WALEntry.newBuilder();
builder.setAssociatedCellCount(1);
WALKey.Builder keyBuilder = WALKey.newBuilder();
@@ -264,13 +388,10 @@ public class TestReplicationSink {
uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits());
keyBuilder.setClusterId(uuidBuilder.build());
keyBuilder.setTableName(ByteStringer.wrap(table.getName()));
- keyBuilder.setWriteTime(now);
+ keyBuilder.setWriteTime(System.currentTimeMillis());
keyBuilder.setEncodedRegionName(ByteStringer.wrap(HConstants.EMPTY_BYTE_ARRAY));
keyBuilder.setLogSequenceNumber(-1);
builder.setKey(keyBuilder.build());
- cells.add(kv);
-
- return builder.build();
+ return builder;
}
-
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index d50522c..a208120 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -19,13 +19,17 @@
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -51,6 +55,8 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -64,6 +70,7 @@ import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
@@ -108,6 +115,8 @@ public class TestReplicationSourceManager {
private static final byte[] f1 = Bytes.toBytes("f1");
+ private static final byte[] f2 = Bytes.toBytes("f2");
+
private static final TableName test =
TableName.valueOf("test");
@@ -161,10 +170,10 @@ public class TestReplicationSourceManager {
manager.addSource(slaveId);
htd = new HTableDescriptor(test);
- HColumnDescriptor col = new HColumnDescriptor("f1");
+ HColumnDescriptor col = new HColumnDescriptor(f1);
col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
htd.addFamily(col);
- col = new HColumnDescriptor("f2");
+ col = new HColumnDescriptor(f2);
col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
htd.addFamily(col);
@@ -416,6 +425,63 @@ public class TestReplicationSourceManager {
s0.abort("", null);
}
+ @Test
+ public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
+ // 1. Create wal key
+ WALKey logKey = new WALKey();
+ // 2. Get the bulk load wal edit event
+ WALEdit logEdit = getBulkLoadWALEdit();
+
+ // 3. Get the scopes for the key
+ Replication.scopeWALEdits(htd, logKey, logEdit, conf, manager);
+
+ // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled
+ assertNull("No bulk load entries scope should be added if bulk load replication is diabled.",
+ logKey.getScopes());
+ }
+
+ @Test
+ public void testBulkLoadWALEdits() throws Exception {
+ // 1. Create wal key
+ WALKey logKey = new WALKey();
+ // 2. Get the bulk load wal edit event
+ WALEdit logEdit = getBulkLoadWALEdit();
+ // 3. Enable bulk load hfile replication
+ Configuration bulkLoadConf = HBaseConfiguration.create(conf);
+ bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+
+ // 4. Get the scopes for the key
+ Replication.scopeWALEdits(htd, logKey, logEdit, bulkLoadConf, manager);
+
+ NavigableMap<byte[], Integer> scopes = logKey.getScopes();
+ // Assert family with replication scope global is present in the key scopes
+ assertTrue("This family scope is set to global, should be part of replication key scopes.",
+ scopes.containsKey(f1));
+ // Assert family with replication scope local is not present in the key scopes
+ assertFalse("This family scope is set to local, should not be part of replication key scopes",
+ scopes.containsKey(f2));
+ }
+
+ private WALEdit getBulkLoadWALEdit() {
+ // 1. Create store files for the families
+ Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
+ List<Path> p = new ArrayList<>(1);
+ p.add(new Path(Bytes.toString(f1)));
+ storeFiles.put(f1, p);
+
+ p = new ArrayList<>(1);
+ p.add(new Path(Bytes.toString(f2)));
+ storeFiles.put(f2, p);
+
+ // 2. Create bulk load descriptor
+ BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
+ ByteStringer.wrap(hri.getEncodedNameAsBytes()), storeFiles, 1);
+
+ // 3. create bulk load wal edit event
+ WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc);
+ return logEdit;
+ }
+
static class DummyNodeFailoverWorker extends Thread {
private SortedMap<String, SortedSet<String>> logZnodesMap;
Server server;
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSourceFSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSourceFSConfigurationProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSourceFSConfigurationProvider.java
new file mode 100644
index 0000000..a14c02b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSourceFSConfigurationProvider.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class TestSourceFSConfigurationProvider implements SourceFSConfigurationProvider {
+ @Override
+ public Configuration getConf(Configuration sinkConf, String replicationClusterId)
+ throws IOException {
+ return sinkConf;
+ }
+}
[3/3] hbase git commit: HBASE-13153 Bulk Loaded HFile Replication
(Ashish Singhi)
Posted by ra...@apache.org.
HBASE-13153 Bulk Loaded HFile Replication (Ashish Singhi)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/26ac60b0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/26ac60b0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/26ac60b0
Branch: refs/heads/master
Commit: 26ac60b03f80c9215103a02db783341e67037753
Parents: 9647fee
Author: ramkrishna <ra...@gmail.com>
Authored: Thu Dec 10 13:07:46 2015 +0530
Committer: ramkrishna <ra...@gmail.com>
Committed: Thu Dec 10 13:07:46 2015 +0530
----------------------------------------------------------------------
.../hbase/replication/ReplicationPeers.java | 2 +-
.../replication/ReplicationPeersZKImpl.java | 26 +-
.../hbase/replication/ReplicationQueues.java | 25 +-
.../replication/ReplicationQueuesClient.java | 25 +-
.../ReplicationQueuesClientZKImpl.java | 37 ++
.../replication/ReplicationQueuesZKImpl.java | 70 +++
.../replication/ReplicationStateZKBase.java | 14 +-
.../apache/hadoop/hbase/zookeeper/ZKUtil.java | 24 +-
.../org/apache/hadoop/hbase/HConstants.java | 16 +-
.../MetricsReplicationSinkSource.java | 2 +
.../MetricsReplicationSourceSource.java | 6 +
.../MetricsReplicationGlobalSourceSource.java | 21 +
.../MetricsReplicationSinkSourceImpl.java | 7 +
.../MetricsReplicationSourceSourceImpl.java | 28 +
.../hbase/protobuf/generated/AdminProtos.java | 602 +++++++++++++++++--
hbase-protocol/src/main/protobuf/Admin.proto | 3 +
.../hbase/mapreduce/LoadIncrementalHFiles.java | 152 +++--
.../hbase/protobuf/ReplicationProtbufUtil.java | 46 +-
.../hbase/regionserver/RSRpcServices.java | 4 +-
.../regionserver/ReplicationSinkService.java | 8 +-
.../regionserver/wal/WALActionsListener.java | 19 +-
.../hbase/replication/ScopeWALEntryFilter.java | 72 ++-
.../replication/TableCfWALEntryFilter.java | 76 ++-
.../master/ReplicationHFileCleaner.java | 193 ++++++
.../DefaultSourceFSConfigurationProvider.java | 78 +++
.../HBaseInterClusterReplicationEndpoint.java | 32 +-
.../regionserver/HFileReplicator.java | 393 ++++++++++++
.../replication/regionserver/MetricsSink.java | 13 +-
.../replication/regionserver/MetricsSource.java | 31 +
.../RegionReplicaReplicationEndpoint.java | 4 +-
.../replication/regionserver/Replication.java | 133 +++-
.../regionserver/ReplicationSink.java | 200 +++++-
.../regionserver/ReplicationSource.java | 92 ++-
.../ReplicationSourceInterface.java | 13 +
.../regionserver/ReplicationSourceManager.java | 21 +
.../SourceFSConfigurationProvider.java | 40 ++
.../security/access/SecureBulkLoadEndpoint.java | 18 +-
.../cleaner/TestReplicationHFileCleaner.java | 264 ++++++++
.../replication/ReplicationSourceDummy.java | 8 +
.../replication/TestMasterReplication.java | 313 +++++++++-
.../replication/TestReplicationSmallTests.java | 3 +-
.../replication/TestReplicationStateBasic.java | 57 ++
.../replication/TestReplicationStateZKImpl.java | 1 +
.../replication/TestReplicationSyncUpTool.java | 10 +-
...ReplicationSyncUpToolWithBulkLoadedData.java | 235 ++++++++
.../regionserver/TestReplicationSink.java | 179 +++++-
.../TestReplicationSourceManager.java | 70 ++-
.../TestSourceFSConfigurationProvider.java | 25 +
48 files changed, 3444 insertions(+), 267 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 8e80e06..8bf21d5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -50,7 +50,7 @@ public interface ReplicationPeers {
* @param peerId a short that identifies the cluster
* @param peerConfig configuration for the replication slave cluster
* @param tableCFs the table and column-family list which will be replicated for this peer or null
- * for all table and column families
+ * for all table and column families
*/
void addPeer(String peerId, ReplicationPeerConfig peerConfig, String tableCFs)
throws ReplicationException;
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 63f9ac3..fd10b66 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
import com.google.protobuf.ByteString;
@@ -120,8 +121,21 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
checkQueuesDeleted(id);
-
+
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+
+ // If bulk load hfile replication is enabled, add the peerId node under the hfile-refs node
+ if (replicationForBulkLoadEnabled) {
+ try {
+ String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id);
+ LOG.info("Adding peer " + peerId + " to hfile reference queue.");
+ ZKUtil.createWithParents(this.zookeeper, peerId);
+ } catch (KeeperException e) {
+ throw new ReplicationException("Failed to add peer with id=" + id
+ + ", node under hfile references node.", e);
+ }
+ }
+
List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id),
toByteArray(peerConfig));
@@ -151,6 +165,16 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
+ " because that id does not exist.");
}
ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+ // Delete peerId node from hfile-refs node irrespective of whether bulk loaded hfile
+ // replication is enabled or not
+
+ String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id);
+ try {
+ LOG.info("Removing peer " + peerId + " from hfile reference queue.");
+ ZKUtil.deleteNodeRecursively(this.zookeeper, peerId);
+ } catch (NoNodeException e) {
+ LOG.info("Did not find node " + peerId + " to delete.", e);
+ }
} catch (KeeperException e) {
throw new ReplicationException("Could not remove peer with id=" + id, e);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
index 3dbbc33..0d47a88 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -26,7 +26,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* This provides an interface for maintaining a region server's replication queues. These queues
- * keep track of the WALs that still need to be replicated to remote clusters.
+ * keep track of the WALs and HFile references (if hbase.replication.bulkload.enabled is set to true)
+ * that still need to be replicated to remote clusters.
*/
@InterfaceAudience.Private
public interface ReplicationQueues {
@@ -113,4 +114,26 @@ public interface ReplicationQueues {
* @return if this is this rs's znode
*/
boolean isThisOurZnode(String znode);
+
+ /**
+ * Add a peer to the hfile reference queue if it does not already exist.
+ * @param peerId peer cluster id to be added
+ * @throws ReplicationException if it fails to add the peer id to the hfile reference queue
+ */
+ void addPeerToHFileRefs(String peerId) throws ReplicationException;
+
+ /**
+ * Add new hfile references to the queue.
+ * @param peerId peer cluster id to which the hfiles need to be replicated
+ * @param files list of hfile references to be added
+ * @throws ReplicationException if it fails to add an hfile reference
+ */
+ void addHFileRefs(String peerId, List<String> files) throws ReplicationException;
+
+ /**
+ * Remove hfile references from the queue.
+ * @param peerId peer cluster id from which these hfile references need to be removed
+ * @param files list of hfile references to be removed
+ */
+ void removeHFileRefs(String peerId, List<String> files);
}
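For illustration only (not part of this patch): a minimal sketch of how a replication source might drive the new hfile reference queue methods, assuming an already-initialized ReplicationQueues instance; the helper name, peer id and file names are made up, and imports/class wrapper are omitted.
  // Sketch under the above assumptions; not the actual ReplicationSource code path.
  static void trackBulkLoadedFiles(ReplicationQueues queues, String peerId, List<String> hfiles)
      throws ReplicationException {
    queues.addPeerToHFileRefs(peerId);      // no-op if the peer already has an hfile-refs znode
    queues.addHFileRefs(peerId, hfiles);    // enqueue references after a successful bulk load
    // ... once the hfiles have been shipped to and loaded on the peer cluster ...
    queues.removeHFileRefs(peerId, hfiles); // release the refs so the hfile cleaner can archive them
  }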
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
index 5b3e541..7fa3bbb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
@@ -25,7 +25,8 @@ import org.apache.zookeeper.KeeperException;
/**
* This provides an interface for clients of replication to view replication queues. These queues
- * keep track of the WALs that still need to be replicated to remote clusters.
+ * keep track of the sources (WALs/HFile references) that still need to be replicated to remote
+ * clusters.
*/
@InterfaceAudience.Private
public interface ReplicationQueuesClient {
@@ -65,4 +66,26 @@ public interface ReplicationQueuesClient {
* @return cversion of replication rs node
*/
int getQueuesZNodeCversion() throws KeeperException;
+
+ /**
+ * Get the change version number of the replication hfile references node. This can be used for
+ * optimistic locking to get a consistent snapshot of the hfile reference replication queues.
+ * @return change version number of the hfile references node
+ */
+ int getHFileRefsNodeChangeVersion() throws KeeperException;
+
+ /**
+ * Get list of all peers from hfile reference queue.
+ * @return a list of peer ids
+ * @throws KeeperException zookeeper exception
+ */
+ List<String> getAllPeersFromHFileRefsQueue() throws KeeperException;
+
+ /**
+ * Get a list of all hfile references in the given peer.
+ * @param peerId a String that identifies the peer
+ * @return a list of hfile references, or null if none are found
+ * @throws KeeperException zookeeper exception
+ */
+ List<String> getReplicableHFiles(String peerId) throws KeeperException;
}
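For illustration only (not part of this patch): a sketch of the optimistic-locking read pattern the change version is meant to support, assuming an initialized ReplicationQueuesClient; the helper name is hypothetical and imports/class wrapper are omitted.
  // Sketch: take a snapshot of all hfile references and retry if the node changed underneath us.
  static Set<String> snapshotHFileRefs(ReplicationQueuesClient client) throws KeeperException {
    int before = client.getHFileRefsNodeChangeVersion();
    Set<String> refs = new HashSet<String>();
    List<String> peers = client.getAllPeersFromHFileRefsQueue();
    if (peers != null) {
      for (String peerId : peers) {
        List<String> peerRefs = client.getReplicableHFiles(peerId);
        if (peerRefs != null) {
          refs.addAll(peerRefs);
        }
      }
    }
    // If the cversion moved, the snapshot may be inconsistent; signal the caller to retry.
    return before == client.getHFileRefsNodeChangeVersion() ? refs : null;
  }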
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
index e1a6a49..cc407e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -84,4 +84,41 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
throw e;
}
}
+
+ @Override
+ public int getHFileRefsNodeChangeVersion() throws KeeperException {
+ Stat stat = new Stat();
+ try {
+ ZKUtil.getDataNoWatch(this.zookeeper, this.hfileRefsZNode, stat);
+ } catch (KeeperException e) {
+ this.abortable.abort("Failed to get stat of replication hfile references node.", e);
+ throw e;
+ }
+ return stat.getCversion();
+ }
+
+ @Override
+ public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException {
+ List<String> result = null;
+ try {
+ result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.hfileRefsZNode);
+ } catch (KeeperException e) {
+ this.abortable.abort("Failed to get list of all peers in hfile references node.", e);
+ throw e;
+ }
+ return result;
+ }
+
+ @Override
+ public List<String> getReplicableHFiles(String peerId) throws KeeperException {
+ String znode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+ List<String> result = null;
+ try {
+ result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+ } catch (KeeperException e) {
+ this.abortable.abort("Failed to get list of hfile references for peerId=" + peerId, e);
+ throw e;
+ }
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 97763e2..43dd412 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -84,6 +84,15 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
} catch (KeeperException e) {
throw new ReplicationException("Could not initialize replication queues.", e);
}
+ // If bulk load hfile replication is enabled, create the hfile-refs znode
+ if (replicationForBulkLoadEnabled) {
+ try {
+ ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
+ } catch (KeeperException e) {
+ throw new ReplicationException("Could not initialize hfile references replication queue.",
+ e);
+ }
+ }
}
@Override
@@ -431,4 +440,65 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray();
return ProtobufUtil.prependPBMagic(bytes);
}
+
+ @Override
+ public void addHFileRefs(String peerId, List<String> files) throws ReplicationException {
+ String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+ boolean debugEnabled = LOG.isDebugEnabled();
+ if (debugEnabled) {
+ LOG.debug("Adding hfile references " + files + " in queue " + peerZnode);
+ }
+ List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
+ int size = files.size();
+ for (int i = 0; i < size; i++) {
+ listOfOps.add(ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i)),
+ HConstants.EMPTY_BYTE_ARRAY));
+ }
+ if (debugEnabled) {
+ LOG.debug(" The multi list size for adding hfile references in zk for node " + peerZnode
+ + " is " + listOfOps.size());
+ }
+ try {
+ ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
+ } catch (KeeperException e) {
+ throw new ReplicationException("Failed to create hfile reference znode=" + e.getPath(), e);
+ }
+ }
+
+ @Override
+ public void removeHFileRefs(String peerId, List<String> files) {
+ String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+ boolean debugEnabled = LOG.isDebugEnabled();
+ if (debugEnabled) {
+ LOG.debug("Removing hfile references " + files + " from queue " + peerZnode);
+ }
+ List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
+ int size = files.size();
+ for (int i = 0; i < size; i++) {
+ listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i))));
+ }
+ if (debugEnabled) {
+ LOG.debug(" The multi list size for removing hfile references in zk for node " + peerZnode
+ + " is " + listOfOps.size());
+ }
+ try {
+ ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
+ } catch (KeeperException e) {
+ LOG.error("Failed to remove hfile reference znode=" + e.getPath(), e);
+ }
+ }
+
+ @Override
+ public void addPeerToHFileRefs(String peerId) throws ReplicationException {
+ String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+ try {
+ if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
+ LOG.info("Adding peer " + peerId + " to hfile reference queue.");
+ ZKUtil.createWithParents(this.zookeeper, peerZnode);
+ }
+ } catch (KeeperException e) {
+ throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.",
+ e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
index 4fbac0f..762167f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
@@ -20,9 +20,10 @@ package org.apache.hadoop.hbase.replication;
import java.util.List;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
@@ -48,32 +49,43 @@ public abstract class ReplicationStateZKBase {
protected final String peersZNode;
/** The name of the znode that contains all replication queues */
protected final String queuesZNode;
+ /** The name of the znode that contains queues of hfile references to be replicated */
+ protected final String hfileRefsZNode;
/** The cluster key of the local cluster */
protected final String ourClusterKey;
protected final ZooKeeperWatcher zookeeper;
protected final Configuration conf;
protected final Abortable abortable;
+ protected final boolean replicationForBulkLoadEnabled;
// Public for testing
public static final byte[] ENABLED_ZNODE_BYTES =
toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
public static final byte[] DISABLED_ZNODE_BYTES =
toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
+ public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
+ "zookeeper.znode.replication.hfile.refs";
+ public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf,
Abortable abortable) {
this.zookeeper = zookeeper;
this.conf = conf;
this.abortable = abortable;
+ this.replicationForBulkLoadEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+ HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
+ String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
+ ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);
this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName);
+ this.hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName);
}
public List<String> getListOfReplicators() {
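For illustration only (not part of this patch): with the default znode names above, the hfile reference queue sits beside the existing replication znodes. A rough layout for a hypothetical peer id "1":
  /hbase/replication/peers/1                      (existing peer znode)
  /hbase/replication/rs/<regionserver>/1/<wal>    (existing WAL queues)
  /hbase/replication/hfile-refs/1/<hfile-name>    (new, populated via addPeerToHFileRefs/addHFileRefs)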
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index c268268..9e01d09 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
@@ -884,7 +885,7 @@ public class ZKUtil {
JaasConfiguration.SERVER_KEYTAB_KERBEROS_CONFIG_NAME) == null
&& conf.get(HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL) == null
&& conf.get(HConstants.ZK_SERVER_KERBEROS_PRINCIPAL) == null) {
-
+
return false;
}
} catch(Exception e) {
@@ -1797,6 +1798,27 @@ public class ZKUtil {
} else if (child.equals(zkw.getConfiguration().
get("zookeeper.znode.replication.rs", "rs"))) {
appendRSZnodes(zkw, znode, sb);
+ } else if (child.equals(zkw.getConfiguration().get(
+ ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
+ ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT))) {
+ appendHFileRefsZnodes(zkw, znode, sb);
+ }
+ }
+ }
+
+ private static void appendHFileRefsZnodes(ZooKeeperWatcher zkw, String hfileRefsZnode,
+ StringBuilder sb) throws KeeperException {
+ sb.append("\n").append(hfileRefsZnode).append(": ");
+ for (String peerIdZnode : ZKUtil.listChildrenNoWatch(zkw, hfileRefsZnode)) {
+ String znodeToProcess = ZKUtil.joinZNode(hfileRefsZnode, peerIdZnode);
+ sb.append("\n").append(znodeToProcess).append(": ");
+ List<String> peerHFileRefsZnodes = ZKUtil.listChildrenNoWatch(zkw, znodeToProcess);
+ int size = peerHFileRefsZnodes.size();
+ for (int i = 0; i < size; i++) {
+ sb.append(peerHFileRefsZnodes.get(i));
+ if (i != size - 1) {
+ sb.append(", ");
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index ac57514..6fafad3 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -850,6 +850,18 @@ public final class HConstants {
REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service";
public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT =
"org.apache.hadoop.hbase.replication.regionserver.Replication";
+ public static final String REPLICATION_BULKLOAD_ENABLE_KEY = "hbase.replication.bulkload.enabled";
+ public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false;
+ /** Replication cluster id of the source cluster, used to uniquely identify it to the peer cluster */
+ public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id";
+ /**
+ * Directory where the source cluster's file system client configuration files are placed, used by
+ * the sink cluster to copy HFiles from the source cluster's file system
+ */
+ public static final String REPLICATION_CONF_DIR = "hbase.replication.conf.dir";
+
+ /** Maximum number of times to retry a failed bulk load request */
+ public static final String BULKLOAD_MAX_RETRIES_NUMBER = "hbase.bulkload.retries.number";
/** HBCK special code name used as server name when manipulating ZK nodes */
public static final String HBCK_CODE_NAME = "HBCKServerName";
@@ -1241,7 +1253,7 @@ public final class HConstants {
public static final String HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY =
"hbase.canary.write.table.check.period";
-
+
/**
* Configuration keys for programmatic JAAS configuration for secured ZK interaction
*/
@@ -1250,7 +1262,7 @@ public final class HConstants {
"hbase.zookeeper.client.kerberos.principal";
public static final String ZK_SERVER_KEYTAB_FILE = "hbase.zookeeper.server.keytab.file";
public static final String ZK_SERVER_KERBEROS_PRINCIPAL =
- "hbase.zookeeper.server.kerberos.principal";
+ "hbase.zookeeper.server.kerberos.principal";
private HConstants() {
// Can't be instantiated with this ctor.
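For illustration only (not part of this patch): a sketch of how a source cluster might set the new bulk load replication keys programmatically; the values are placeholders, and in a real deployment they would typically live in hbase-site.xml.
  // Sketch with placeholder values for the new constants above.
  Configuration conf = HBaseConfiguration.create();
  conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);   // hbase.replication.bulkload.enabled
  conf.set(HConstants.REPLICATION_CLUSTER_ID, "source-cluster-1");     // hbase.replication.cluster.id
  conf.set(HConstants.REPLICATION_CONF_DIR, "/etc/hbase/replication"); // hbase.replication.conf.dir
  conf.setInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);             // hbase.bulkload.retries.number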
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
index 698a59a..9fb8415 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
@@ -22,9 +22,11 @@ public interface MetricsReplicationSinkSource {
public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp";
public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches";
public static final String SINK_APPLIED_OPS = "sink.appliedOps";
+ public static final String SINK_APPLIED_HFILES = "sink.appliedHFiles";
void setLastAppliedOpAge(long age);
void incrAppliedBatches(long batches);
void incrAppliedOps(long batchsize);
long getLastAppliedOpAge();
+ void incrAppliedHFiles(long hfileSize);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index fecf191..188c3a3 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -32,6 +32,9 @@ public interface MetricsReplicationSourceSource {
public static final String SOURCE_LOG_EDITS_FILTERED = "source.logEditsFiltered";
+ public static final String SOURCE_SHIPPED_HFILES = "source.shippedHFiles";
+ public static final String SOURCE_SIZE_OF_HFILE_REFS_QUEUE = "source.sizeOfHFileRefsQueue";
+
void setLastShippedAge(long age);
void setSizeOfLogQueue(int size);
void incrSizeOfLogQueue(int size);
@@ -44,4 +47,7 @@ public interface MetricsReplicationSourceSource {
void incrLogReadInEdits(long size);
void clear();
long getLastShippedAge();
+ void incrHFilesShipped(long hfiles);
+ void incrSizeOfHFileRefsQueue(long size);
+ void decrSizeOfHFileRefsQueue(long size);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 6dace10..392cd39 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -32,6 +32,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
private final MutableCounterLong shippedOpsCounter;
private final MutableCounterLong shippedKBsCounter;
private final MutableCounterLong logReadInBytesCounter;
+ private final MutableCounterLong shippedHFilesCounter;
+ private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
this.rms = rms;
@@ -51,6 +53,11 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
logReadInEditsCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
logEditsFilteredCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_LOG_EDITS_FILTERED, 0L);
+
+ shippedHFilesCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_SHIPPED_HFILES, 0L);
+
+ sizeOfHFileRefsQueueGauge =
+ rms.getMetricsRegistry().getLongGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L);
}
@Override public void setLastShippedAge(long age) {
@@ -100,4 +107,18 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
public long getLastShippedAge() {
return ageOfLastShippedOpGauge.value();
}
+
+ @Override public void incrHFilesShipped(long hfiles) {
+ shippedHFilesCounter.incr(hfiles);
+ }
+
+ @Override
+ public void incrSizeOfHFileRefsQueue(long size) {
+ sizeOfHFileRefsQueueGauge.incr(size);
+ }
+
+ @Override
+ public void decrSizeOfHFileRefsQueue(long size) {
+ sizeOfHFileRefsQueueGauge.decr(size);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
index 14212ba..8f4a337 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
@@ -26,11 +26,13 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
private final MutableGaugeLong ageGauge;
private final MutableCounterLong batchesCounter;
private final MutableCounterLong opsCounter;
+ private final MutableCounterLong hfilesCounter;
public MetricsReplicationSinkSourceImpl(MetricsReplicationSourceImpl rms) {
ageGauge = rms.getMetricsRegistry().getLongGauge(SINK_AGE_OF_LAST_APPLIED_OP, 0L);
batchesCounter = rms.getMetricsRegistry().getLongCounter(SINK_APPLIED_BATCHES, 0L);
opsCounter = rms.getMetricsRegistry().getLongCounter(SINK_APPLIED_OPS, 0L);
+ hfilesCounter = rms.getMetricsRegistry().getLongCounter(SINK_APPLIED_HFILES, 0L);
}
@Override public void setLastAppliedOpAge(long age) {
@@ -49,4 +51,9 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
public long getLastAppliedOpAge() {
return ageGauge.value();
}
+
+ @Override
+ public void incrAppliedHFiles(long hfiles) {
+ hfilesCounter.incr(hfiles);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 1422e7e..217cc3e 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -32,6 +32,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String shippedOpsKey;
private final String shippedKBsKey;
private final String logReadInBytesKey;
+ private final String shippedHFilesKey;
+ private final String sizeOfHFileRefsQueueKey;
private final MutableGaugeLong ageOfLastShippedOpGauge;
private final MutableGaugeLong sizeOfLogQueueGauge;
@@ -41,6 +43,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final MutableCounterLong shippedOpsCounter;
private final MutableCounterLong shippedKBsCounter;
private final MutableCounterLong logReadInBytesCounter;
+ private final MutableCounterLong shippedHFilesCounter;
+ private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
this.rms = rms;
@@ -69,6 +73,12 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
logEditsFilteredKey = "source." + id + ".logEditsFiltered";
logEditsFilteredCounter = rms.getMetricsRegistry().getLongCounter(logEditsFilteredKey, 0L);
+
+ shippedHFilesKey = "source." + this.id + ".shippedHFiles";
+ shippedHFilesCounter = rms.getMetricsRegistry().getLongCounter(shippedHFilesKey, 0L);
+
+ sizeOfHFileRefsQueueKey = "source." + id + ".sizeOfHFileRefsQueue";
+ sizeOfHFileRefsQueueGauge = rms.getMetricsRegistry().getLongGauge(sizeOfHFileRefsQueueKey, 0L);
}
@Override public void setLastShippedAge(long age) {
@@ -124,10 +134,28 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
rms.removeMetric(logReadInEditsKey);
rms.removeMetric(logEditsFilteredKey);
+
+ rms.removeMetric(shippedHFilesKey);
+ rms.removeMetric(sizeOfHFileRefsQueueKey);
}
@Override
public long getLastShippedAge() {
return ageOfLastShippedOpGauge.value();
}
+
+ @Override
+ public void incrHFilesShipped(long hfiles) {
+ shippedHFilesCounter.incr(hfiles);
+ }
+
+ @Override
+ public void incrSizeOfHFileRefsQueue(long size) {
+ sizeOfHFileRefsQueueGauge.incr(size);
+ }
+
+ @Override
+ public void decrSizeOfHFileRefsQueue(long size) {
+ sizeOfHFileRefsQueueGauge.decr(size);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
index b4c378b..1c59ea6 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
@@ -16896,6 +16896,51 @@ public final class AdminProtos {
*/
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntryOrBuilder getEntryOrBuilder(
int index);
+
+ // optional string replicationClusterId = 2;
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ boolean hasReplicationClusterId();
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ java.lang.String getReplicationClusterId();
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ com.google.protobuf.ByteString
+ getReplicationClusterIdBytes();
+
+ // optional string sourceBaseNamespaceDirPath = 3;
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ boolean hasSourceBaseNamespaceDirPath();
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ java.lang.String getSourceBaseNamespaceDirPath();
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ com.google.protobuf.ByteString
+ getSourceBaseNamespaceDirPathBytes();
+
+ // optional string sourceHFileArchiveDirPath = 4;
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ boolean hasSourceHFileArchiveDirPath();
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ java.lang.String getSourceHFileArchiveDirPath();
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ com.google.protobuf.ByteString
+ getSourceHFileArchiveDirPathBytes();
}
/**
* Protobuf type {@code hbase.pb.ReplicateWALEntryRequest}
@@ -16963,6 +17008,21 @@ public final class AdminProtos {
entry_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.PARSER, extensionRegistry));
break;
}
+ case 18: {
+ bitField0_ |= 0x00000001;
+ replicationClusterId_ = input.readBytes();
+ break;
+ }
+ case 26: {
+ bitField0_ |= 0x00000002;
+ sourceBaseNamespaceDirPath_ = input.readBytes();
+ break;
+ }
+ case 34: {
+ bitField0_ |= 0x00000004;
+ sourceHFileArchiveDirPath_ = input.readBytes();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -17005,6 +17065,7 @@ public final class AdminProtos {
return PARSER;
}
+ private int bitField0_;
// repeated .hbase.pb.WALEntry entry = 1;
public static final int ENTRY_FIELD_NUMBER = 1;
private java.util.List<org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry> entry_;
@@ -17041,8 +17102,140 @@ public final class AdminProtos {
return entry_.get(index);
}
+ // optional string replicationClusterId = 2;
+ public static final int REPLICATIONCLUSTERID_FIELD_NUMBER = 2;
+ private java.lang.Object replicationClusterId_;
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ public boolean hasReplicationClusterId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ public java.lang.String getReplicationClusterId() {
+ java.lang.Object ref = replicationClusterId_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ replicationClusterId_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ public com.google.protobuf.ByteString
+ getReplicationClusterIdBytes() {
+ java.lang.Object ref = replicationClusterId_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ replicationClusterId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ // optional string sourceBaseNamespaceDirPath = 3;
+ public static final int SOURCEBASENAMESPACEDIRPATH_FIELD_NUMBER = 3;
+ private java.lang.Object sourceBaseNamespaceDirPath_;
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ public boolean hasSourceBaseNamespaceDirPath() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ public java.lang.String getSourceBaseNamespaceDirPath() {
+ java.lang.Object ref = sourceBaseNamespaceDirPath_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ sourceBaseNamespaceDirPath_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ public com.google.protobuf.ByteString
+ getSourceBaseNamespaceDirPathBytes() {
+ java.lang.Object ref = sourceBaseNamespaceDirPath_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ sourceBaseNamespaceDirPath_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ // optional string sourceHFileArchiveDirPath = 4;
+ public static final int SOURCEHFILEARCHIVEDIRPATH_FIELD_NUMBER = 4;
+ private java.lang.Object sourceHFileArchiveDirPath_;
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ public boolean hasSourceHFileArchiveDirPath() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ public java.lang.String getSourceHFileArchiveDirPath() {
+ java.lang.Object ref = sourceHFileArchiveDirPath_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ sourceHFileArchiveDirPath_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ public com.google.protobuf.ByteString
+ getSourceHFileArchiveDirPathBytes() {
+ java.lang.Object ref = sourceHFileArchiveDirPath_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ sourceHFileArchiveDirPath_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
private void initFields() {
entry_ = java.util.Collections.emptyList();
+ replicationClusterId_ = "";
+ sourceBaseNamespaceDirPath_ = "";
+ sourceHFileArchiveDirPath_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -17065,6 +17258,15 @@ public final class AdminProtos {
for (int i = 0; i < entry_.size(); i++) {
output.writeMessage(1, entry_.get(i));
}
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeBytes(2, getReplicationClusterIdBytes());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeBytes(3, getSourceBaseNamespaceDirPathBytes());
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeBytes(4, getSourceHFileArchiveDirPathBytes());
+ }
getUnknownFields().writeTo(output);
}
@@ -17078,6 +17280,18 @@ public final class AdminProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(1, entry_.get(i));
}
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(2, getReplicationClusterIdBytes());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(3, getSourceBaseNamespaceDirPathBytes());
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(4, getSourceHFileArchiveDirPathBytes());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -17103,6 +17317,21 @@ public final class AdminProtos {
boolean result = true;
result = result && getEntryList()
.equals(other.getEntryList());
+ result = result && (hasReplicationClusterId() == other.hasReplicationClusterId());
+ if (hasReplicationClusterId()) {
+ result = result && getReplicationClusterId()
+ .equals(other.getReplicationClusterId());
+ }
+ result = result && (hasSourceBaseNamespaceDirPath() == other.hasSourceBaseNamespaceDirPath());
+ if (hasSourceBaseNamespaceDirPath()) {
+ result = result && getSourceBaseNamespaceDirPath()
+ .equals(other.getSourceBaseNamespaceDirPath());
+ }
+ result = result && (hasSourceHFileArchiveDirPath() == other.hasSourceHFileArchiveDirPath());
+ if (hasSourceHFileArchiveDirPath()) {
+ result = result && getSourceHFileArchiveDirPath()
+ .equals(other.getSourceHFileArchiveDirPath());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -17120,6 +17349,18 @@ public final class AdminProtos {
hash = (37 * hash) + ENTRY_FIELD_NUMBER;
hash = (53 * hash) + getEntryList().hashCode();
}
+ if (hasReplicationClusterId()) {
+ hash = (37 * hash) + REPLICATIONCLUSTERID_FIELD_NUMBER;
+ hash = (53 * hash) + getReplicationClusterId().hashCode();
+ }
+ if (hasSourceBaseNamespaceDirPath()) {
+ hash = (37 * hash) + SOURCEBASENAMESPACEDIRPATH_FIELD_NUMBER;
+ hash = (53 * hash) + getSourceBaseNamespaceDirPath().hashCode();
+ }
+ if (hasSourceHFileArchiveDirPath()) {
+ hash = (37 * hash) + SOURCEHFILEARCHIVEDIRPATH_FIELD_NUMBER;
+ hash = (53 * hash) + getSourceHFileArchiveDirPath().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -17243,6 +17484,12 @@ public final class AdminProtos {
} else {
entryBuilder_.clear();
}
+ replicationClusterId_ = "";
+ bitField0_ = (bitField0_ & ~0x00000002);
+ sourceBaseNamespaceDirPath_ = "";
+ bitField0_ = (bitField0_ & ~0x00000004);
+ sourceHFileArchiveDirPath_ = "";
+ bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
@@ -17270,6 +17517,7 @@ public final class AdminProtos {
public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest buildPartial() {
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest result = new org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest(this);
int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
if (entryBuilder_ == null) {
if (((bitField0_ & 0x00000001) == 0x00000001)) {
entry_ = java.util.Collections.unmodifiableList(entry_);
@@ -17279,6 +17527,19 @@ public final class AdminProtos {
} else {
result.entry_ = entryBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.replicationClusterId_ = replicationClusterId_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.sourceBaseNamespaceDirPath_ = sourceBaseNamespaceDirPath_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.sourceHFileArchiveDirPath_ = sourceHFileArchiveDirPath_;
+ result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
@@ -17320,6 +17581,21 @@ public final class AdminProtos {
}
}
}
+ if (other.hasReplicationClusterId()) {
+ bitField0_ |= 0x00000002;
+ replicationClusterId_ = other.replicationClusterId_;
+ onChanged();
+ }
+ if (other.hasSourceBaseNamespaceDirPath()) {
+ bitField0_ |= 0x00000004;
+ sourceBaseNamespaceDirPath_ = other.sourceBaseNamespaceDirPath_;
+ onChanged();
+ }
+ if (other.hasSourceHFileArchiveDirPath()) {
+ bitField0_ |= 0x00000008;
+ sourceHFileArchiveDirPath_ = other.sourceHFileArchiveDirPath_;
+ onChanged();
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -17593,6 +17869,228 @@ public final class AdminProtos {
return entryBuilder_;
}
+ // optional string replicationClusterId = 2;
+ private java.lang.Object replicationClusterId_ = "";
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ public boolean hasReplicationClusterId() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ public java.lang.String getReplicationClusterId() {
+ java.lang.Object ref = replicationClusterId_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ replicationClusterId_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ public com.google.protobuf.ByteString
+ getReplicationClusterIdBytes() {
+ java.lang.Object ref = replicationClusterId_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ replicationClusterId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ public Builder setReplicationClusterId(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ replicationClusterId_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ public Builder clearReplicationClusterId() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ replicationClusterId_ = getDefaultInstance().getReplicationClusterId();
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ public Builder setReplicationClusterIdBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ replicationClusterId_ = value;
+ onChanged();
+ return this;
+ }
+
+ // optional string sourceBaseNamespaceDirPath = 3;
+ private java.lang.Object sourceBaseNamespaceDirPath_ = "";
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ public boolean hasSourceBaseNamespaceDirPath() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ public java.lang.String getSourceBaseNamespaceDirPath() {
+ java.lang.Object ref = sourceBaseNamespaceDirPath_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ sourceBaseNamespaceDirPath_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ public com.google.protobuf.ByteString
+ getSourceBaseNamespaceDirPathBytes() {
+ java.lang.Object ref = sourceBaseNamespaceDirPath_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ sourceBaseNamespaceDirPath_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ public Builder setSourceBaseNamespaceDirPath(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000004;
+ sourceBaseNamespaceDirPath_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ public Builder clearSourceBaseNamespaceDirPath() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ sourceBaseNamespaceDirPath_ = getDefaultInstance().getSourceBaseNamespaceDirPath();
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ public Builder setSourceBaseNamespaceDirPathBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000004;
+ sourceBaseNamespaceDirPath_ = value;
+ onChanged();
+ return this;
+ }
+
+ // optional string sourceHFileArchiveDirPath = 4;
+ private java.lang.Object sourceHFileArchiveDirPath_ = "";
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ public boolean hasSourceHFileArchiveDirPath() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ public java.lang.String getSourceHFileArchiveDirPath() {
+ java.lang.Object ref = sourceHFileArchiveDirPath_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ sourceHFileArchiveDirPath_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ public com.google.protobuf.ByteString
+ getSourceHFileArchiveDirPathBytes() {
+ java.lang.Object ref = sourceHFileArchiveDirPath_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ sourceHFileArchiveDirPath_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ public Builder setSourceHFileArchiveDirPath(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000008;
+ sourceHFileArchiveDirPath_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ public Builder clearSourceHFileArchiveDirPath() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ sourceHFileArchiveDirPath_ = getDefaultInstance().getSourceHFileArchiveDirPath();
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ public Builder setSourceHFileArchiveDirPathBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000008;
+ sourceHFileArchiveDirPath_ = value;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:hbase.pb.ReplicateWALEntryRequest)
}
@@ -23539,56 +24037,58 @@ public final class AdminProtos {
"ster_system_time\030\004 \001(\004\"\026\n\024MergeRegionsRe" +
"sponse\"a\n\010WALEntry\022\035\n\003key\030\001 \002(\0132\020.hbase." +
"pb.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025as",
- "sociated_cell_count\030\003 \001(\005\"=\n\030ReplicateWA" +
- "LEntryRequest\022!\n\005entry\030\001 \003(\0132\022.hbase.pb." +
- "WALEntry\"\033\n\031ReplicateWALEntryResponse\"\026\n" +
- "\024RollWALWriterRequest\"0\n\025RollWALWriterRe" +
- "sponse\022\027\n\017region_to_flush\030\001 \003(\014\"#\n\021StopS" +
- "erverRequest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServ" +
- "erResponse\"\026\n\024GetServerInfoRequest\"K\n\nSe" +
- "rverInfo\022)\n\013server_name\030\001 \002(\0132\024.hbase.pb" +
- ".ServerName\022\022\n\nwebui_port\030\002 \001(\r\"B\n\025GetSe" +
- "rverInfoResponse\022)\n\013server_info\030\001 \002(\0132\024.",
- "hbase.pb.ServerInfo\"\034\n\032UpdateConfigurati" +
- "onRequest\"\035\n\033UpdateConfigurationResponse" +
- "2\207\013\n\014AdminService\022P\n\rGetRegionInfo\022\036.hba" +
- "se.pb.GetRegionInfoRequest\032\037.hbase.pb.Ge" +
- "tRegionInfoResponse\022M\n\014GetStoreFile\022\035.hb" +
- "ase.pb.GetStoreFileRequest\032\036.hbase.pb.Ge" +
- "tStoreFileResponse\022V\n\017GetOnlineRegion\022 ." +
- "hbase.pb.GetOnlineRegionRequest\032!.hbase." +
- "pb.GetOnlineRegionResponse\022G\n\nOpenRegion" +
- "\022\033.hbase.pb.OpenRegionRequest\032\034.hbase.pb",
- ".OpenRegionResponse\022M\n\014WarmupRegion\022\035.hb" +
- "ase.pb.WarmupRegionRequest\032\036.hbase.pb.Wa" +
- "rmupRegionResponse\022J\n\013CloseRegion\022\034.hbas" +
- "e.pb.CloseRegionRequest\032\035.hbase.pb.Close" +
- "RegionResponse\022J\n\013FlushRegion\022\034.hbase.pb" +
- ".FlushRegionRequest\032\035.hbase.pb.FlushRegi" +
- "onResponse\022J\n\013SplitRegion\022\034.hbase.pb.Spl" +
- "itRegionRequest\032\035.hbase.pb.SplitRegionRe" +
- "sponse\022P\n\rCompactRegion\022\036.hbase.pb.Compa" +
- "ctRegionRequest\032\037.hbase.pb.CompactRegion",
- "Response\022M\n\014MergeRegions\022\035.hbase.pb.Merg" +
- "eRegionsRequest\032\036.hbase.pb.MergeRegionsR" +
- "esponse\022\\\n\021ReplicateWALEntry\022\".hbase.pb." +
- "ReplicateWALEntryRequest\032#.hbase.pb.Repl" +
- "icateWALEntryResponse\022Q\n\006Replay\022\".hbase." +
- "pb.ReplicateWALEntryRequest\032#.hbase.pb.R" +
- "eplicateWALEntryResponse\022P\n\rRollWALWrite" +
- "r\022\036.hbase.pb.RollWALWriterRequest\032\037.hbas" +
- "e.pb.RollWALWriterResponse\022P\n\rGetServerI" +
- "nfo\022\036.hbase.pb.GetServerInfoRequest\032\037.hb",
- "ase.pb.GetServerInfoResponse\022G\n\nStopServ" +
- "er\022\033.hbase.pb.StopServerRequest\032\034.hbase." +
- "pb.StopServerResponse\022_\n\022UpdateFavoredNo" +
- "des\022#.hbase.pb.UpdateFavoredNodesRequest" +
- "\032$.hbase.pb.UpdateFavoredNodesResponse\022b" +
- "\n\023UpdateConfiguration\022$.hbase.pb.UpdateC" +
- "onfigurationRequest\032%.hbase.pb.UpdateCon" +
- "figurationResponseBA\n*org.apache.hadoop." +
- "hbase.protobuf.generatedB\013AdminProtosH\001\210" +
- "\001\001\240\001\001"
+ "sociated_cell_count\030\003 \001(\005\"\242\001\n\030ReplicateW" +
+ "ALEntryRequest\022!\n\005entry\030\001 \003(\0132\022.hbase.pb" +
+ ".WALEntry\022\034\n\024replicationClusterId\030\002 \001(\t\022" +
+ "\"\n\032sourceBaseNamespaceDirPath\030\003 \001(\t\022!\n\031s" +
+ "ourceHFileArchiveDirPath\030\004 \001(\t\"\033\n\031Replic" +
+ "ateWALEntryResponse\"\026\n\024RollWALWriterRequ" +
+ "est\"0\n\025RollWALWriterResponse\022\027\n\017region_t" +
+ "o_flush\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006re" +
+ "ason\030\001 \002(\t\"\024\n\022StopServerResponse\"\026\n\024GetS" +
+ "erverInfoRequest\"K\n\nServerInfo\022)\n\013server",
+ "_name\030\001 \002(\0132\024.hbase.pb.ServerName\022\022\n\nweb" +
+ "ui_port\030\002 \001(\r\"B\n\025GetServerInfoResponse\022)" +
+ "\n\013server_info\030\001 \002(\0132\024.hbase.pb.ServerInf" +
+ "o\"\034\n\032UpdateConfigurationRequest\"\035\n\033Updat" +
+ "eConfigurationResponse2\207\013\n\014AdminService\022" +
+ "P\n\rGetRegionInfo\022\036.hbase.pb.GetRegionInf" +
+ "oRequest\032\037.hbase.pb.GetRegionInfoRespons" +
+ "e\022M\n\014GetStoreFile\022\035.hbase.pb.GetStoreFil" +
+ "eRequest\032\036.hbase.pb.GetStoreFileResponse" +
+ "\022V\n\017GetOnlineRegion\022 .hbase.pb.GetOnline",
+ "RegionRequest\032!.hbase.pb.GetOnlineRegion" +
+ "Response\022G\n\nOpenRegion\022\033.hbase.pb.OpenRe" +
+ "gionRequest\032\034.hbase.pb.OpenRegionRespons" +
+ "e\022M\n\014WarmupRegion\022\035.hbase.pb.WarmupRegio" +
+ "nRequest\032\036.hbase.pb.WarmupRegionResponse" +
+ "\022J\n\013CloseRegion\022\034.hbase.pb.CloseRegionRe" +
+ "quest\032\035.hbase.pb.CloseRegionResponse\022J\n\013" +
+ "FlushRegion\022\034.hbase.pb.FlushRegionReques" +
+ "t\032\035.hbase.pb.FlushRegionResponse\022J\n\013Spli" +
+ "tRegion\022\034.hbase.pb.SplitRegionRequest\032\035.",
+ "hbase.pb.SplitRegionResponse\022P\n\rCompactR" +
+ "egion\022\036.hbase.pb.CompactRegionRequest\032\037." +
+ "hbase.pb.CompactRegionResponse\022M\n\014MergeR" +
+ "egions\022\035.hbase.pb.MergeRegionsRequest\032\036." +
+ "hbase.pb.MergeRegionsResponse\022\\\n\021Replica" +
+ "teWALEntry\022\".hbase.pb.ReplicateWALEntryR" +
+ "equest\032#.hbase.pb.ReplicateWALEntryRespo" +
+ "nse\022Q\n\006Replay\022\".hbase.pb.ReplicateWALEnt" +
+ "ryRequest\032#.hbase.pb.ReplicateWALEntryRe" +
+ "sponse\022P\n\rRollWALWriter\022\036.hbase.pb.RollW",
+ "ALWriterRequest\032\037.hbase.pb.RollWALWriter" +
+ "Response\022P\n\rGetServerInfo\022\036.hbase.pb.Get" +
+ "ServerInfoRequest\032\037.hbase.pb.GetServerIn" +
+ "foResponse\022G\n\nStopServer\022\033.hbase.pb.Stop" +
+ "ServerRequest\032\034.hbase.pb.StopServerRespo" +
+ "nse\022_\n\022UpdateFavoredNodes\022#.hbase.pb.Upd" +
+ "ateFavoredNodesRequest\032$.hbase.pb.Update" +
+ "FavoredNodesResponse\022b\n\023UpdateConfigurat" +
+ "ion\022$.hbase.pb.UpdateConfigurationReques" +
+ "t\032%.hbase.pb.UpdateConfigurationResponse",
+ "BA\n*org.apache.hadoop.hbase.protobuf.gen" +
+ "eratedB\013AdminProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -23750,7 +24250,7 @@ public final class AdminProtos {
internal_static_hbase_pb_ReplicateWALEntryRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_hbase_pb_ReplicateWALEntryRequest_descriptor,
- new java.lang.String[] { "Entry", });
+ new java.lang.String[] { "Entry", "ReplicationClusterId", "SourceBaseNamespaceDirPath", "SourceHFileArchiveDirPath", });
internal_static_hbase_pb_ReplicateWALEntryResponse_descriptor =
getDescriptor().getMessageTypes().get(24);
internal_static_hbase_pb_ReplicateWALEntryResponse_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-protocol/src/main/protobuf/Admin.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto
index f7787f5..a1905a4 100644
--- a/hbase-protocol/src/main/protobuf/Admin.proto
+++ b/hbase-protocol/src/main/protobuf/Admin.proto
@@ -211,6 +211,9 @@ message WALEntry {
*/
message ReplicateWALEntryRequest {
repeated WALEntry entry = 1;
+ optional string replicationClusterId = 2;
+ optional string sourceBaseNamespaceDirPath = 3;
+ optional string sourceHFileArchiveDirPath = 4;
}
message ReplicateWALEntryResponse {
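The sink reads these three optional fields back off the request before applying the entries. For illustration, a short sketch of such a consumer; the presence checks and getters are the ones generated in AdminProtos above, while the wrapping class and method are hypothetical.

import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;

public class BulkLoadContextReader {
  /** Hypothetical helper: pull the bulk load replication context out of an incoming request. */
  public static String[] extract(ReplicateWALEntryRequest request) {
    // All three fields are optional, so requests from older sources that never set them still
    // parse; the has* checks distinguish unset fields from empty strings.
    String clusterId = request.hasReplicationClusterId()
        ? request.getReplicationClusterId() : null;
    String baseNamespaceDir = request.hasSourceBaseNamespaceDirPath()
        ? request.getSourceBaseNamespaceDirPath() : null;
    String hfileArchiveDir = request.hasSourceHFileArchiveDirPath()
        ? request.getSourceHFileArchiveDirPath() : null;
    return new String[] { clusterId, baseNamespaceDir, hfileArchiveDir };
  }
}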
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 44be2d3..369ae90 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
@@ -125,6 +126,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private FsDelegationToken fsDelegationToken;
private String bulkToken;
private UserProvider userProvider;
+ private int nrThreads;
private LoadIncrementalHFiles() {}
@@ -146,6 +148,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
this.fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
+ nrThreads = conf.getInt("hbase.loadincremental.threads.max",
+ Runtime.getRuntime().availableProcessors());
initalized = true;
}
@@ -246,7 +250,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* region boundary, and each part is added back into the queue.
* The import process finishes when the queue is empty.
*/
- static class LoadQueueItem {
+ public static class LoadQueueItem {
final byte[] family;
final Path hfilePath;
@@ -313,7 +317,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* @param table the table to load into
* @throws TableNotFoundException if table does not yet exist
*/
- @SuppressWarnings("deprecation")
public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
RegionLocator regionLocator) throws TableNotFoundException, IOException {
@@ -321,16 +324,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
}
- // initialize thread pools
- int nrThreads = getConf().getInt("hbase.loadincremental.threads.max",
- Runtime.getRuntime().availableProcessors());
- ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
- builder.setNameFormat("LoadIncrementalHFiles-%1$d");
- ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
- 60, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- builder.build());
- ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
+ ExecutorService pool = createExecutorService();
// LQI queue does not need to be threadsafe -- all operations on this queue
// happen in this thread
@@ -347,30 +341,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
"option, consider removing the files and bulkload again without this option. " +
"See HBASE-13985");
}
- discoverLoadQueue(queue, hfofDir, validateHFile);
- // check whether there is invalid family name in HFiles to be bulkloaded
- Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
- ArrayList<String> familyNames = new ArrayList<String>(families.size());
- for (HColumnDescriptor family : families) {
- familyNames.add(family.getNameAsString());
- }
- ArrayList<String> unmatchedFamilies = new ArrayList<String>();
- Iterator<LoadQueueItem> queueIter = queue.iterator();
- while (queueIter.hasNext()) {
- LoadQueueItem lqi = queueIter.next();
- String familyNameInHFile = Bytes.toString(lqi.family);
- if (!familyNames.contains(familyNameInHFile)) {
- unmatchedFamilies.add(familyNameInHFile);
- }
- }
- if (unmatchedFamilies.size() > 0) {
- String msg =
- "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
- + unmatchedFamilies + "; valid family names of table "
- + table.getName() + " are: " + familyNames;
- LOG.error(msg);
- throw new IOException(msg);
- }
+ prepareHFileQueue(hfofDir, table, queue, validateHFile);
+
int count = 0;
if (queue.isEmpty()) {
@@ -397,7 +369,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
+ count + " with " + queue.size() + " files remaining to group or split");
}
- int maxRetries = getConf().getInt("hbase.bulkload.retries.number", 10);
+ int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
if (maxRetries != 0 && count >= maxRetries) {
throw new IOException("Retry attempted " + count +
@@ -447,6 +419,85 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
/**
+ * Prepares a collection of {@link LoadQueueItem} from the source hfiles contained in the passed
+ * directory and validates that the prepared queue only references column families that exist in
+ * the target table.
+ * @param hfofDir directory containing the hfiles to be loaded into the table
+ * @param table table into which the hfiles should be loaded
+ * @param queue queue to which the discovered {@link LoadQueueItem}s are added
+ * @param validateHFile whether each hfile should be validated before it is queued
+ * @throws IOException If any I/O or network error occurred
+ */
+ public void prepareHFileQueue(Path hfofDir, Table table, Deque<LoadQueueItem> queue,
+ boolean validateHFile) throws IOException {
+ discoverLoadQueue(queue, hfofDir, validateHFile);
+ validateFamiliesInHFiles(table, queue);
+ }
+
+ // Initialize a thread pool
+ private ExecutorService createExecutorService() {
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+ builder.setNameFormat("LoadIncrementalHFiles-%1$d");
+ ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), builder.build());
+ ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
+ return pool;
+ }
+
+ /**
+ * Checks whether there is any invalid family name in HFiles to be bulk loaded.
+ */
+ private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue)
+ throws IOException {
+ Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
+ List<String> familyNames = new ArrayList<String>(families.size());
+ for (HColumnDescriptor family : families) {
+ familyNames.add(family.getNameAsString());
+ }
+ List<String> unmatchedFamilies = new ArrayList<String>();
+ Iterator<LoadQueueItem> queueIter = queue.iterator();
+ while (queueIter.hasNext()) {
+ LoadQueueItem lqi = queueIter.next();
+ String familyNameInHFile = Bytes.toString(lqi.family);
+ if (!familyNames.contains(familyNameInHFile)) {
+ unmatchedFamilies.add(familyNameInHFile);
+ }
+ }
+ if (unmatchedFamilies.size() > 0) {
+ String msg =
+ "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
+ + unmatchedFamilies + "; valid family names of table " + table.getName() + " are: "
+ + familyNames;
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+ }
+
+ /**
+ * Used by the replication sink to load the hfiles from the source cluster. It performs
+ * {@link LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)} followed by
+ * {@link LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)}.
+ * @param table table into which these hfiles should be loaded
+ * @param conn connection to use
+ * @param queue {@link LoadQueueItem}s holding the hfiles yet to be loaded
+ * @param startEndKeys starting and ending row keys of the regions
+ */
+ public void loadHFileQueue(final Table table, final Connection conn, Deque<LoadQueueItem> queue,
+ Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+ ExecutorService pool = null;
+ try {
+ pool = createExecutorService();
+ Multimap<ByteBuffer, LoadQueueItem> regionGroups =
+ groupOrSplitPhase(table, pool, queue, startEndKeys);
+ bulkLoadPhase(table, conn, pool, queue, regionGroups);
+ } finally {
+ if (pool != null) {
+ pool.shutdown();
+ }
+ }
+ }
+
+ /**
* This takes the LQI's grouped by likely regions and attempts to bulk load
* them. Any failures are re-queued for another pass with the
* groupOrSplitPhase.
@@ -592,10 +643,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
String uniqueName = getUniqueName();
HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
+
Path botOut = new Path(tmpDir, uniqueName + ".bottom");
Path topOut = new Path(tmpDir, uniqueName + ".top");
- splitStoreFile(getConf(), hfilePath, familyDesc, splitKey,
- botOut, topOut);
+ splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
FileSystem fs = tmpDir.getFileSystem(getConf());
fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
@@ -626,6 +677,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
final Pair<byte[][], byte[][]> startEndKeys)
throws IOException {
final Path hfilePath = item.hfilePath;
+ // fs is the source filesystem
+ if (fs == null) {
+ fs = hfilePath.getFileSystem(getConf());
+ }
HFile.Reader hfr = HFile.createReader(fs, hfilePath,
new CacheConfig(getConf()), getConf());
final byte[] first, last;
@@ -712,7 +767,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* failure
*/
protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
- final TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
+ final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
throws IOException {
final List<Pair<byte[], String>> famPaths =
new ArrayList<Pair<byte[], String>>(lqis.size());
@@ -747,6 +802,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
//in user directory
if(secureClient != null && !success) {
FileSystem targetFs = FileSystem.get(getConf());
+ // fs is the source filesystem
+ if(fs == null) {
+ fs = lqis.iterator().next().hfilePath.getFileSystem(getConf());
+ }
// Check to see if the source and target filesystems are the same
// If they are the same filesystem, we will try move the files back
// because previously we moved them to the staging directory.
@@ -1000,4 +1059,17 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
System.exit(ret);
}
+ /**
+ * Called from the replication sink, which manages the bulkToken (staging directory) by itself.
+ * This is used only when {@link SecureBulkLoadEndpoint} is configured in the
+ * hbase.coprocessor.region.classes property. The staging directory is a temporary directory to
+ * which all files are initially copied/moved from the user given directory, where the required
+ * file permissions are set, and from which they are finally loaded into the table. Set this only
+ * if you want to manage the staging directory yourself; otherwise this tool will handle it itself.
+ * @param stagingDir staging directory path
+ */
+ public void setBulkToken(String stagingDir) {
+ this.bulkToken = stagingDir;
+ }
+
}
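The new public entry points above (prepareHFileQueue, loadHFileQueue and setBulkToken) are what a replication sink can drive instead of doBulkLoad. A usage sketch follows, assuming a hypothetical standalone caller, table name, and directory paths, and assuming a LoadIncrementalHFiles(Configuration) constructor that throws Exception.

import java.util.ArrayDeque;
import java.util.Deque;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;

public class LoadHFileQueueSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("t1");        // hypothetical table
    Path hfileDir = new Path("/tmp/hfiles-from-source");   // hypothetical hfile directory
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator locator = conn.getRegionLocator(tableName)) {
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
      // Optional: manage the secure bulk load staging directory yourself (see setBulkToken above).
      loader.setBulkToken("/hbase-staging/replication");
      Deque<LoadQueueItem> queue = new ArrayDeque<LoadQueueItem>();
      // Discover hfiles under the directory and validate their families against the table.
      loader.prepareHFileQueue(hfileDir, table, queue, false);
      // Group or split by region boundary and bulk load until the queue drains.
      loader.loadHFileQueue(table, conn, queue, locator.getStartEndKeys());
    }
  }
}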
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index d6a120b..91185af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -28,22 +28,23 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.SizedCellScanner;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALKey;
import com.google.protobuf.ServiceException;
@@ -51,15 +52,20 @@ import com.google.protobuf.ServiceException;
public class ReplicationProtbufUtil {
/**
* A helper to replicate a list of WAL entries using admin protocol.
- *
- * @param admin
- * @param entries
+ * @param admin Admin service
+ * @param entries Array of WAL entries to be replicated
+ * @param replicationClusterId Id which will uniquely identify source cluster FS client
+ * configurations in the replication configuration directory
+ * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
+ * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
* @throws java.io.IOException
*/
public static void replicateWALEntry(final AdminService.BlockingInterface admin,
- final Entry[] entries) throws IOException {
+ final Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir,
+ Path sourceHFileArchiveDir) throws IOException {
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
- buildReplicateWALEntryRequest(entries, null);
+ buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir,
+ sourceHFileArchiveDir);
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
try {
admin.replicateWALEntry(controller, p.getFirst());
@@ -77,19 +83,22 @@ public class ReplicationProtbufUtil {
*/
public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
buildReplicateWALEntryRequest(final Entry[] entries) {
- return buildReplicateWALEntryRequest(entries, null);
+ return buildReplicateWALEntryRequest(entries, null, null, null, null);
}
/**
* Create a new ReplicateWALEntryRequest from a list of WAL entries
- *
* @param entries the WAL entries to be replicated
* @param encodedRegionName alternative region name to use if not null
- * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
- * found.
+ * @param replicationClusterId Id which will uniquely identify source cluster FS client
+ * configurations in the replication configuration directory
+ * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
+ * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
+ * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found.
*/
public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
- buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName) {
+ buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName,
+ String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) {
// Accumulate all the Cells seen in here.
List<List<? extends Cell>> allCells = new ArrayList<List<? extends Cell>>(entries.length);
int size = 0;
@@ -146,6 +155,17 @@ public class ReplicationProtbufUtil {
entryBuilder.setAssociatedCellCount(cells.size());
builder.addEntry(entryBuilder.build());
}
+
+ if (replicationClusterId != null) {
+ builder.setReplicationClusterId(replicationClusterId);
+ }
+ if (sourceBaseNamespaceDir != null) {
+ builder.setSourceBaseNamespaceDirPath(sourceBaseNamespaceDir.toString());
+ }
+ if (sourceHFileArchiveDir != null) {
+ builder.setSourceHFileArchiveDirPath(sourceHFileArchiveDir.toString());
+ }
+
return new Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>(builder.build(),
getCellScanner(allCells, size));
}
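A caller such as a replication endpoint now passes the source cluster context along with the entries. A sketch follows, assuming the admin stub, entry array, and cluster id are supplied by the surrounding code, and assuming the standard data and archive locations under the HBase root directory are the ones to advertise.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;

public class ReplicateWithBulkLoadContextSketch {
  /** Hypothetical helper: ship a batch of entries together with the source cluster context. */
  static void ship(AdminService.BlockingInterface admin, Entry[] entries,
      String replicationClusterId, Configuration conf) throws IOException {
    Path rootDir = FSUtils.getRootDir(conf);
    // Assumption: advertise the standard data and archive locations under the HBase root dir.
    Path baseNamespaceDir = new Path(rootDir, HConstants.BASE_NAMESPACE_DIR);
    Path hfileArchiveDir = new Path(rootDir, HConstants.HFILE_ARCHIVE_DIRECTORY);
    ReplicationProtbufUtil.replicateWALEntry(admin, entries, replicationClusterId,
        baseNamespaceDir, hfileArchiveDir);
  }
}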
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index d94e11c..0c9b0e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1800,7 +1800,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
List<WALEntry> entries = request.getEntryList();
CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
- regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner);
+ regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner,
+ request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
+ request.getSourceHFileArchiveDirPath());
regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
return ReplicateWALEntryResponse.newBuilder().build();
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
index 5f96bf7..836d3aa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
@@ -36,7 +36,13 @@ public interface ReplicationSinkService extends ReplicationService {
* Carry on the list of log entries down to the sink
* @param entries list of WALEntries to replicate
* @param cells Cells that the WALEntries refer to (if cells is non-null)
+ * @param replicationClusterId Id which will uniquely identify source cluster FS client
+ * configurations in the replication configuration directory
+ * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
+ * directory required for replicating hfiles
+ * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive directory
* @throws IOException
*/
- void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException;
+ void replicateLogEntries(List<WALEntry> entries, CellScanner cells, String replicationClusterId,
+ String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath) throws IOException;
}
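End to end, the sink-side hfile copy and the ReplicationHFileCleaner added later in this patch only engage when both replication and bulk load replication are switched on. A configuration sketch in code form follows, assuming the HConstants and HFileCleaner keys referenced elsewhere in this patch; note that setting the cleaner plugin key this way replaces any existing plugin list rather than appending to it.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;

public class BulkLoadReplicationConfSketch {
  public static Configuration sketch() {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    // Register the ReplicationHFileCleaner so archived hfiles that are still referenced for
    // replication are not removed by the master's cleaner chore. In a real deployment, append
    // to the existing plugin list instead of overwriting it.
    conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
      "org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner");
    return conf;
  }
}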
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
index 457d859..db98083 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
@@ -85,17 +85,16 @@ public interface WALActionsListener {
);
/**
- *
* @param htd
* @param logKey
- * @param logEdit
- * TODO: Retire this in favor of {@link #visitLogEntryBeforeWrite(HRegionInfo, WALKey, WALEdit)}
- * It only exists to get scope when replicating. Scope should be in the WALKey and not need
- * us passing in a <code>htd</code>.
+ * @param logEdit TODO: Retire this in favor of
+ * {@link #visitLogEntryBeforeWrite(HRegionInfo, WALKey, WALEdit)} It only exists to get
+ * scope when replicating. Scope should be in the WALKey and not need us passing in a
+ * <code>htd</code>.
+ * @throws IOException if the WALEdit could not be parsed
*/
- void visitLogEntryBeforeWrite(
- HTableDescriptor htd, WALKey logKey, WALEdit logEdit
- );
+ void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
+ throws IOException;
/**
* For notification post append to the writer. Used by metrics system at least.
@@ -136,7 +135,9 @@ public interface WALActionsListener {
public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey, WALEdit logEdit) {}
@Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {}
+ public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
+ throws IOException {
+ }
@Override
public void postAppend(final long entryLen, final long elapsedTimeMillis) {}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
index 3501f3e..f97ec15 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -18,13 +18,21 @@
package org.apache.hadoop.hbase.replication;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
import java.util.NavigableMap;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WAL.Entry;
/**
@@ -32,6 +40,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
*/
@InterfaceAudience.Private
public class ScopeWALEntryFilter implements WALEntryFilter {
+ private static final Log LOG = LogFactory.getLog(ScopeWALEntryFilter.class);
@Override
public Entry filter(Entry entry) {
@@ -41,13 +50,27 @@ public class ScopeWALEntryFilter implements WALEntryFilter {
}
ArrayList<Cell> cells = entry.getEdit().getCells();
int size = cells.size();
+ byte[] fam;
for (int i = size - 1; i >= 0; i--) {
Cell cell = cells.get(i);
- // The scope will be null or empty if
- // there's nothing to replicate in that WALEdit
- byte[] fam = CellUtil.cloneFamily(cell);
- if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
- cells.remove(i);
+ // If a bulk load entry has a scope, the user has enabled replication for bulk loaded
+ // hfiles.
+ // TODO: There is similar logic in TableCfWALEntryFilter, but the data structures differ, so
+ // it cannot be refactored into one place now; revisit to see if there is a way to unify them.
+ if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
+ Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(scopes, cell);
+ if (filteredBulkLoadEntryCell != null) {
+ cells.set(i, filteredBulkLoadEntryCell);
+ } else {
+ cells.remove(i);
+ }
+ } else {
+ // The scope will be null or empty if
+ // there's nothing to replicate in that WALEdit
+ fam = CellUtil.cloneFamily(cell);
+ if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
+ cells.remove(i);
+ }
}
}
if (cells.size() < size / 2) {
@@ -56,4 +79,41 @@ public class ScopeWALEntryFilter implements WALEntryFilter {
return entry;
}
+ private Cell filterBulkLoadEntries(NavigableMap<byte[], Integer> scopes, Cell cell) {
+ byte[] fam;
+ BulkLoadDescriptor bld = null;
+ try {
+ bld = WALEdit.getBulkLoadDescriptor(cell);
+ } catch (IOException e) {
+ LOG.warn("Failed to get bulk load events information from the WAL file.", e);
+ return cell;
+ }
+ List<StoreDescriptor> storesList = bld.getStoresList();
+ // Copy the StoreDescriptor list and update it, as storesList is an unmodifiable list
+ List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
+ Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
+ boolean anyStoreRemoved = false;
+ while (copiedStoresListIterator.hasNext()) {
+ StoreDescriptor sd = copiedStoresListIterator.next();
+ fam = sd.getFamilyName().toByteArray();
+ if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
+ copiedStoresListIterator.remove();
+ anyStoreRemoved = true;
+ }
+ }
+
+ if (!anyStoreRemoved) {
+ return cell;
+ } else if (copiedStoresList.isEmpty()) {
+ return null;
+ }
+ BulkLoadDescriptor.Builder newDesc =
+ BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
+ .setEncodedRegionName(bld.getEncodedRegionName())
+ .setBulkloadSeqNum(bld.getBulkloadSeqNum());
+ newDesc.addAllStores(copiedStoresList);
+ BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
+ return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
+ cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
index 642ee8a..f10849b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
@@ -18,14 +18,20 @@
package org.apache.hadoop.hbase.replication;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -52,19 +58,36 @@ public class TableCfWALEntryFilter implements WALEntryFilter {
}
int size = cells.size();
+ // If tableCFs is null, the user has explicitly not configured any table CFs, so data from
+ // all tables is applicable for replication
+ if (tableCFs == null) {
+ return entry;
+ }
// return null(prevent replicating) if logKey's table isn't in this peer's
- // replicable table list (empty tableCFs means all table are replicable)
- if (tableCFs != null && !tableCFs.containsKey(tabName)) {
+ // replicable table list
+ if (!tableCFs.containsKey(tabName)) {
return null;
} else {
- List<String> cfs = (tableCFs == null) ? null : tableCFs.get(tabName);
+ List<String> cfs = tableCFs.get(tabName);
for (int i = size - 1; i >= 0; i--) {
Cell cell = cells.get(i);
- // ignore(remove) kv if its cf isn't in the replicable cf list
- // (empty cfs means all cfs of this table are replicable)
- if ((cfs != null) && !cfs.contains(Bytes.toString(
- cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))) {
- cells.remove(i);
+ // TODO: There is similar logic in ScopeWALEntryFilter, but the data structures differ, so
+ // it cannot be refactored into one place now; revisit to see if there is a way to unify them.
+ // Filter bulk load entries separately
+ if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
+ Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(cfs, cell);
+ if (filteredBulkLoadEntryCell != null) {
+ cells.set(i, filteredBulkLoadEntryCell);
+ } else {
+ cells.remove(i);
+ }
+ } else {
+ // ignore(remove) kv if its cf isn't in the replicable cf list
+ // (empty cfs means all cfs of this table are replicable)
+ if ((cfs != null) && !cfs.contains(Bytes.toString(cell.getFamilyArray(),
+ cell.getFamilyOffset(), cell.getFamilyLength()))) {
+ cells.remove(i);
+ }
}
}
}
@@ -74,4 +97,41 @@ public class TableCfWALEntryFilter implements WALEntryFilter {
return entry;
}
+ private Cell filterBulkLoadEntries(List<String> cfs, Cell cell) {
+ byte[] fam;
+ BulkLoadDescriptor bld = null;
+ try {
+ bld = WALEdit.getBulkLoadDescriptor(cell);
+ } catch (IOException e) {
+ LOG.warn("Failed to get bulk load events information from the WAL file.", e);
+ return cell;
+ }
+ List<StoreDescriptor> storesList = bld.getStoresList();
+ // Copy the StoreDescriptor list and update it, as storesList is an unmodifiable list
+ List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
+ Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
+ boolean anyStoreRemoved = false;
+ while (copiedStoresListIterator.hasNext()) {
+ StoreDescriptor sd = copiedStoresListIterator.next();
+ fam = sd.getFamilyName().toByteArray();
+ if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
+ copiedStoresListIterator.remove();
+ anyStoreRemoved = true;
+ }
+ }
+
+ if (!anyStoreRemoved) {
+ return cell;
+ } else if (copiedStoresList.isEmpty()) {
+ return null;
+ }
+ BulkLoadDescriptor.Builder newDesc =
+ BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
+ .setEncodedRegionName(bld.getEncodedRegionName())
+ .setBulkloadSeqNum(bld.getBulkloadSeqNum());
+ newDesc.addAllStores(copiedStoresList);
+ BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
+ return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
+ cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
new file mode 100644
index 0000000..9bfea4b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.master;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
+import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Implementation of a file cleaner that checks if an hfile is still scheduled for replication
+ * before deleting it from the hfile archive directory.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate implements Abortable {
+ private static final Log LOG = LogFactory.getLog(ReplicationHFileCleaner.class);
+ private ZooKeeperWatcher zkw;
+ private ReplicationQueuesClient rqc;
+ private boolean stopped = false;
+ private boolean aborted;
+
+ @Override
+ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+ // all members of this class are null if replication is disabled,
+ // so we cannot filter the files
+ if (this.getConf() == null) {
+ return files;
+ }
+
+ final Set<String> hfileRefs;
+ try {
+ // Hfile reference entries created in ZK concurrently with this call may not be included in
+ // the returned set, but the corresponding hfiles won't be deleted because they are not yet
+ // among the files being checked.
+ hfileRefs = loadHFileRefsFromPeers();
+ } catch (KeeperException e) {
+ LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable files");
+ return Collections.emptyList();
+ }
+ return Iterables.filter(files, new Predicate<FileStatus>() {
+ @Override
+ public boolean apply(FileStatus file) {
+ String hfile = file.getPath().getName();
+ boolean foundHFileRefInQueue = hfileRefs.contains(hfile);
+ if (LOG.isDebugEnabled()) {
+ if (foundHFileRefInQueue) {
+ LOG.debug("Found hfile reference in ZK, keeping: " + hfile);
+ } else {
+ LOG.debug("Did not find hfile reference in ZK, deleting: " + hfile);
+ }
+ }
+ return !foundHFileRefInQueue;
+ }
+ });
+ }
+
+ /**
+ * Load all hfile references in all replication queues from ZK. This method guarantees that the
+ * returned snapshot contains every hfile reference present in zookeeper at the start of the call.
+ * However, references created while the call is in progress may not be included.
+ */
+ private Set<String> loadHFileRefsFromPeers() throws KeeperException {
+ Set<String> hfileRefs = Sets.newHashSet();
+ List<String> listOfPeers;
+ for (int retry = 0;; retry++) {
+ int v0 = rqc.getHFileRefsNodeChangeVersion();
+ hfileRefs.clear();
+ listOfPeers = rqc.getAllPeersFromHFileRefsQueue();
+ if (listOfPeers == null) {
+ LOG.debug("Didn't find any peers with hfile references, won't prevent any deletions.");
+ return ImmutableSet.of();
+ }
+ for (String id : listOfPeers) {
+ List<String> peerHFileRefs = rqc.getReplicableHFiles(id);
+ if (peerHFileRefs != null) {
+ hfileRefs.addAll(peerHFileRefs);
+ }
+ }
+ int v1 = rqc.getHFileRefsNodeChangeVersion();
+ if (v0 == v1) {
+ return hfileRefs;
+ }
+ LOG.debug(String.format("Replication hfile references node cversion changed from "
+ + "%d to %d, retry = %d", v0, v1, retry));
+ }
+ }
+
+ @Override
+ public void setConf(Configuration config) {
+ // If either replication or replication of bulk load hfiles is disabled, keep all members null
+ if (!(config.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
+ HConstants.REPLICATION_ENABLE_DEFAULT) && config.getBoolean(
+ HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+ HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT))) {
+ LOG.warn(HConstants.REPLICATION_ENABLE_KEY + " or " + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
+ + " is not enabled, so allowing all hfile references to be deleted. Better to remove "
+ + ReplicationHFileCleaner.class + " from " + HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS
+ + " configuration.");
+ return;
+ }
+ // Make my own Configuration. Then I'll have my own connection to zk that
+ // I can close myself when time comes.
+ Configuration conf = new Configuration(config);
+ super.setConf(conf);
+ try {
+ initReplicationQueuesClient(conf);
+ } catch (IOException e) {
+ LOG.error("Error while configuring " + this.getClass().getName(), e);
+ }
+ }
+
+ private void initReplicationQueuesClient(Configuration conf)
+ throws ZooKeeperConnectionException, IOException {
+ this.zkw = new ZooKeeperWatcher(conf, "replicationHFileCleaner", null);
+ this.rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this);
+ }
+
+ @Override
+ public void stop(String why) {
+ if (this.stopped) {
+ return;
+ }
+ this.stopped = true;
+ if (this.zkw != null) {
+ LOG.info("Stopping " + this.zkw);
+ this.zkw.close();
+ }
+ }
+
+ @Override
+ public boolean isStopped() {
+ return this.stopped;
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+ LOG.warn("Aborting ReplicationHFileCleaner because " + why, e);
+ this.aborted = true;
+ stop(why);
+ }
+
+ @Override
+ public boolean isAborted() {
+ return this.aborted;
+ }
+
+ @Override
+ public boolean isFileDeletable(FileStatus fStat) {
+ Set<String> hfileRefsFromQueue;
+ // all members of this class are null if replication is disabled,
+ // so do not stop from deleting the file
+ if (getConf() == null) {
+ return true;
+ }
+
+ try {
+ hfileRefsFromQueue = loadHFileRefsFromPeers();
+ } catch (KeeperException e) {
+ LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable "
+ + "file for " + fStat.getPath());
+ return false;
+ }
+ return !hfileRefsFromQueue.contains(fStat.getPath().getName());
+ }
+}
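The retry loop in loadHFileRefsFromPeers() relies on the cversion of the hfile-refs znode to detect peers or references being added or removed while the queues are read. The following is a minimal, self-contained sketch of that optimistic-snapshot pattern, not HBase code; the RefsView interface and its methods are made up for illustration.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

interface RefsView {
  int cversion();                    // change version of the hfile-refs parent node
  List<String> peers();              // peer ids registered under hfile-refs
  List<String> refsOf(String peer);  // hfile names queued for one peer, may be null
}

final class SnapshotReader {
  Set<String> snapshot(RefsView view) {
    for (;;) {
      int before = view.cversion();
      Set<String> refs = new HashSet<>();
      for (String peer : view.peers()) {
        List<String> peerRefs = view.refsOf(peer);
        if (peerRefs != null) {
          refs.addAll(peerRefs);
        }
      }
      // If no child was added or removed while reading, the set is a consistent
      // snapshot; otherwise loop and read the queues again.
      if (view.cversion() == before) {
        return refs;
      }
    }
  }
}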
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DefaultSourceFSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DefaultSourceFSConfigurationProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DefaultSourceFSConfigurationProvider.java
new file mode 100644
index 0000000..8d5c6d4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DefaultSourceFSConfigurationProvider.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This will load all the xml configuration files for the source cluster replication ID from the
+ * user-configured replication configuration directory.
+ */
+@InterfaceAudience.Private
+public class DefaultSourceFSConfigurationProvider implements SourceFSConfigurationProvider {
+ private static final Log LOG = LogFactory.getLog(DefaultSourceFSConfigurationProvider.class);
+ // Map containing all the source cluster configurations, keyed by replication cluster id
+ private Map<String, Configuration> sourceClustersConfs = new HashMap<>();
+ private static final String XML = ".xml";
+
+ @Override
+ public Configuration getConf(Configuration sinkConf, String replicationClusterId)
+ throws IOException {
+ if (sourceClustersConfs.get(replicationClusterId) == null) {
+ synchronized (this.sourceClustersConfs) {
+ if (sourceClustersConfs.get(replicationClusterId) == null) {
+ LOG.info("Loading source cluster FS client conf for cluster " + replicationClusterId);
+ // Load only user provided client configurations.
+ Configuration sourceClusterConf = new Configuration(false);
+
+ String replicationConfDir = sinkConf.get(HConstants.REPLICATION_CONF_DIR);
+ if (replicationConfDir == null) {
+ LOG.debug(HConstants.REPLICATION_CONF_DIR + " is not configured.");
+ URL resource = HBaseConfiguration.class.getClassLoader().getResource("hbase-site.xml");
+ if (resource != null) {
+ String path = resource.getPath();
+ replicationConfDir = path.substring(0, path.lastIndexOf("/"));
+ } else {
+ replicationConfDir = System.getenv("HBASE_CONF_DIR");
+ }
+ }
+
+ LOG.info("Loading source cluster " + replicationClusterId
+ + " file system configurations from xml files under directory " + replicationConfDir);
+ File confDir = new File(replicationConfDir, replicationClusterId);
+ String[] listofConfFiles = FileUtil.list(confDir);
+ for (String confFile : listofConfFiles) {
+ if (new File(confDir, confFile).isFile() && confFile.endsWith(XML)) {
+ // Add all the user provided client conf files
+ sourceClusterConf.addResource(new Path(confDir.getPath(), confFile));
+ }
+ }
+ this.sourceClustersConfs.put(replicationClusterId, sourceClusterConf);
+ }
+ }
+ }
+ return this.sourceClustersConfs.get(replicationClusterId);
+ }
+
+}
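A hypothetical usage sketch for the provider above. The directory layout and the cluster id "source1" are made up, and "hbase.replication.conf.dir" is assumed to be the value of HConstants.REPLICATION_CONF_DIR introduced by this patch; the provider then loads every *.xml file found under <conf dir>/<cluster id>.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.regionserver.DefaultSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.replication.regionserver.SourceFSConfigurationProvider;

public class SourceConfExample {
  public static void main(String[] args) throws IOException {
    Configuration sinkConf = HBaseConfiguration.create();
    // Client conf files for peer "source1" are expected under
    //   /etc/hbase/replication-conf/source1/  (e.g. hdfs-site.xml, core-site.xml)
    sinkConf.set("hbase.replication.conf.dir", "/etc/hbase/replication-conf");
    SourceFSConfigurationProvider provider = new DefaultSourceFSConfigurationProvider();
    Configuration sourceFsConf = provider.getConf(sinkConf, "source1");
    System.out.println("source fs.defaultFS = " + sourceFsConf.get("fs.defaultFS"));
  }
}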
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 7c07ecc..d51d512 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -37,24 +37,26 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.ipc.RemoteException;
/**
- * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
+ * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
* implementation for replicating to another HBase cluster.
* For the slave cluster it selects a random number of peers
* using a replication ratio. For example, if replication ratio = 0.1
@@ -84,8 +86,12 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
// Handles connecting to peer region servers
private ReplicationSinkManager replicationSinkMgr;
private boolean peersSelected = false;
+ private String replicationClusterId = "";
private ThreadPoolExecutor exec;
private int maxThreads;
+ private Path baseNamespaceDir;
+ private Path hfileArchiveDir;
+ private boolean replicationBulkLoadDataEnabled;
@Override
public void init(Context context) throws IOException {
@@ -108,7 +114,19 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
this.exec = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
+ new SynchronousQueue<Runnable>());
+
+ this.replicationBulkLoadDataEnabled =
+ conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+ HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+ if (this.replicationBulkLoadDataEnabled) {
+ replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
+ }
+ // Construct base namespace directory and hfile archive directory path
+ Path rootDir = FSUtils.getRootDir(conf);
+ Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
+ baseNamespaceDir = new Path(rootDir, baseNSDir);
+ hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
}
private void decorateConf() {
@@ -317,8 +335,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
try {
sinkPeer = replicationSinkMgr.getReplicationSink();
BlockingInterface rrs = sinkPeer.getRegionServer();
- ReplicationProtbufUtil.replicateWALEntry(rrs,
- entries.toArray(new Entry[entries.size()]));
+ ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
replicationSinkMgr.reportSinkSuccess(sinkPeer);
return ordinal;
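For reference, init() above derives the two directories that are shipped alongside every batch so that the sink can resolve relative hfile paths. A minimal sketch of that derivation with a placeholder root dir; the literals "data" and "archive" are assumed to match HConstants.BASE_NAMESPACE_DIR and HConstants.HFILE_ARCHIVE_DIRECTORY.

import org.apache.hadoop.fs.Path;

public class ReplicationDirsExample {
  public static void main(String[] args) {
    Path rootDir = new Path("/hbase");                 // placeholder for FSUtils.getRootDir(conf)
    Path baseNamespaceDir = new Path(rootDir, "data"); // /hbase/data
    Path hfileArchiveDir = new Path(rootDir, new Path("archive", "data")); // /hbase/archive/data
    System.out.println(baseNamespaceDir + " , " + hfileArchiveDir);
  }
}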
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
new file mode 100644
index 0000000..17f6780
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.math.BigInteger;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.FsDelegationToken;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * It is used for replicating HFile entries. It first copies all the hfiles in parallel to a local
+ * staging directory and then uses {@link LoadIncrementalHFiles} to prepare a collection of
+ * {@link LoadQueueItem} which is finally loaded (replicated) into the table of this cluster.
+ */
+@InterfaceAudience.Private
+public class HFileReplicator {
+ /** Maximum number of threads to allow in pool to copy hfiles during replication */
+ public static final String REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY =
+ "hbase.replication.bulkload.copy.maxthreads";
+ public static final int REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT = 10;
+ /** Number of hfiles to copy per thread during replication */
+ public static final String REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY =
+ "hbase.replication.bulkload.copy.hfiles.perthread";
+ public static final int REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT = 10;
+
+ private static final Log LOG = LogFactory.getLog(HFileReplicator.class);
+ private final String UNDERSCORE = "_";
+ private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");
+
+ private Configuration sourceClusterConf;
+ private String sourceBaseNamespaceDirPath;
+ private String sourceHFileArchiveDirPath;
+ private Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap;
+ private FileSystem sinkFs;
+ private FsDelegationToken fsDelegationToken;
+ private UserProvider userProvider;
+ private Configuration conf;
+ private Connection connection;
+ private String hbaseStagingDir;
+ private ThreadPoolExecutor exec;
+ private int maxCopyThreads;
+ private int copiesPerThread;
+
+ public HFileReplicator(Configuration sourceClusterConf,
+ String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
+ Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
+ Connection connection) throws IOException {
+ this.sourceClusterConf = sourceClusterConf;
+ this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
+ this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
+ this.bulkLoadHFileMap = tableQueueMap;
+ this.conf = conf;
+ this.connection = connection;
+
+ userProvider = UserProvider.instantiate(conf);
+ fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
+ this.hbaseStagingDir = conf.get("hbase.bulkload.staging.dir");
+ this.maxCopyThreads =
+ this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
+ REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+ builder.setNameFormat("HFileReplicationCallable-%1$d");
+ this.exec =
+ new ThreadPoolExecutor(1, maxCopyThreads, 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), builder.build());
+ this.exec.allowCoreThreadTimeOut(true);
+ this.copiesPerThread =
+ conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY,
+ REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);
+
+ sinkFs = FileSystem.get(conf);
+ }
+
+ public Void replicate() throws IOException {
+ // Copy all the hfiles to the local file system
+ Map<String, Path> tableStagingDirsMap = copyHFilesToStagingDir();
+
+ int maxRetries = conf.getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
+
+ for (Entry<String, Path> tableStagingDir : tableStagingDirsMap.entrySet()) {
+ String tableNameString = tableStagingDir.getKey();
+ Path stagingDir = tableStagingDir.getValue();
+
+ LoadIncrementalHFiles loadHFiles = null;
+ try {
+ loadHFiles = new LoadIncrementalHFiles(conf);
+ } catch (Exception e) {
+ LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded"
+ + " data.", e);
+ throw new IOException(e);
+ }
+ Configuration newConf = HBaseConfiguration.create(conf);
+ newConf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
+ loadHFiles.setConf(newConf);
+
+ TableName tableName = TableName.valueOf(tableNameString);
+ Table table = this.connection.getTable(tableName);
+
+ // Prepare the queue of hfiles to be loaded (replicated)
+ Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
+ loadHFiles.prepareHFileQueue(stagingDir, table, queue, false);
+
+ if (queue.isEmpty()) {
+ LOG.warn("Replication process did not find any files to replicate in directory "
+ + stagingDir.toUri());
+ return null;
+ }
+
+ try (RegionLocator locator = connection.getRegionLocator(tableName)) {
+
+ fsDelegationToken.acquireDelegationToken(sinkFs);
+
+ // Set the staging directory which will be used by LoadIncrementalHFiles for loading the
+ // data
+ loadHFiles.setBulkToken(stagingDir.toString());
+
+ doBulkLoad(loadHFiles, table, queue, locator, maxRetries);
+ } finally {
+ cleanup(stagingDir.toString(), table);
+ }
+ }
+ return null;
+ }
+
+ private void doBulkLoad(LoadIncrementalHFiles loadHFiles, Table table,
+ Deque<LoadQueueItem> queue, RegionLocator locator, int maxRetries) throws IOException {
+ int count = 0;
+ Pair<byte[][], byte[][]> startEndKeys;
+ while (!queue.isEmpty()) {
+ // need to reload split keys each iteration.
+ startEndKeys = locator.getStartEndKeys();
+ if (count != 0) {
+ LOG.warn("Error occured while replicating HFiles, retry attempt " + count + " with "
+ + queue.size() + " files still remaining to replicate.");
+ }
+
+ if (maxRetries != 0 && count >= maxRetries) {
+ throw new IOException("Retry attempted " + count
+ + " times without completing, bailing out.");
+ }
+ count++;
+
+ // Try bulk load
+ loadHFiles.loadHFileQueue(table, connection, queue, startEndKeys);
+ }
+ }
+
+ private void cleanup(String stagingDir, Table table) {
+ // Release the file system delegation token
+ fsDelegationToken.releaseDelegationToken();
+ // Delete the staging directory
+ if (stagingDir != null) {
+ try {
+ sinkFs.delete(new Path(stagingDir), true);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete the staging directory " + stagingDir, e);
+ }
+ }
+ // Do not close the file system
+
+ /*
+ * if (sinkFs != null) { try { sinkFs.close(); } catch (IOException e) { LOG.warn(
+ * "Failed to close the file system"); } }
+ */
+
+ // Close the table
+ if (table != null) {
+ try {
+ table.close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close the table.", e);
+ }
+ }
+ }
+
+ private Map<String, Path> copyHFilesToStagingDir() throws IOException {
+ Map<String, Path> mapOfCopiedHFiles = new HashMap<String, Path>();
+ Pair<byte[], List<String>> familyHFilePathsPair;
+ List<String> hfilePaths;
+ byte[] family;
+ Path familyStagingDir;
+ int familyHFilePathsPairsListSize;
+ int totalNoOfHFiles;
+ List<Pair<byte[], List<String>>> familyHFilePathsPairsList;
+ FileSystem sourceFs = null;
+
+ try {
+ Path sourceClusterPath = new Path(sourceBaseNamespaceDirPath);
+ /*
+ * Path#getFileSystem will by default get the FS from the cache. If both source and sink clusters
+ * have the same FS name service then it will return the peer cluster FS. To avoid this we explicitly
+ * disable the loading of FS from cache, so that a new FS is created with source cluster
+ * configuration.
+ */
+ String sourceScheme = sourceClusterPath.toUri().getScheme();
+ String disableCacheName =
+ String.format("fs.%s.impl.disable.cache", new Object[] { sourceScheme });
+ sourceClusterConf.setBoolean(disableCacheName, true);
+
+ sourceFs = sourceClusterPath.getFileSystem(sourceClusterConf);
+
+ User user = userProvider.getCurrent();
+ // For each table name in the map
+ for (Entry<String, List<Pair<byte[], List<String>>>> tableEntry : bulkLoadHFileMap
+ .entrySet()) {
+ String tableName = tableEntry.getKey();
+
+ // Create staging directory for each table
+ Path stagingDir =
+ createStagingDir(new Path(hbaseStagingDir), user, TableName.valueOf(tableName));
+
+ familyHFilePathsPairsList = tableEntry.getValue();
+ familyHFilePathsPairsListSize = familyHFilePathsPairsList.size();
+
+ // For each list of family hfile paths pair in the table
+ for (int i = 0; i < familyHFilePathsPairsListSize; i++) {
+ familyHFilePathsPair = familyHFilePathsPairsList.get(i);
+
+ family = familyHFilePathsPair.getFirst();
+ hfilePaths = familyHFilePathsPair.getSecond();
+
+ familyStagingDir = new Path(stagingDir, Bytes.toString(family));
+ totalNoOfHFiles = hfilePaths.size();
+
+ // For each list of hfile paths for the family
+ List<Future<Void>> futures = new ArrayList<Future<Void>>();
+ Callable<Void> c;
+ Future<Void> future;
+ int currentCopied = 0;
+ // Copy the hfiles in parallel
+ while (totalNoOfHFiles > currentCopied + this.copiesPerThread) {
+ c =
+ new Copier(sourceFs, familyStagingDir, hfilePaths.subList(currentCopied,
+ currentCopied + this.copiesPerThread));
+ future = exec.submit(c);
+ futures.add(future);
+ currentCopied += this.copiesPerThread;
+ }
+
+ int remaining = totalNoOfHFiles - currentCopied;
+ if (remaining > 0) {
+ c =
+ new Copier(sourceFs, familyStagingDir, hfilePaths.subList(currentCopied,
+ currentCopied + remaining));
+ future = exec.submit(c);
+ futures.add(future);
+ }
+
+ for (Future<Void> f : futures) {
+ try {
+ f.get();
+ } catch (InterruptedException e) {
+ InterruptedIOException iioe =
+ new InterruptedIOException(
+ "Failed to copy HFiles to local file system. This will be retried again "
+ + "by the source cluster.");
+ iioe.initCause(e);
+ throw iioe;
+ } catch (ExecutionException e) {
+ throw new IOException("Failed to copy HFiles to local file system. This will "
+ + "be retried again by the source cluster.", e);
+ }
+ }
+ }
+ // Add the staging directory to this table. Staging directory contains all the hfiles
+ // belonging to this table
+ mapOfCopiedHFiles.put(tableName, stagingDir);
+ }
+ return mapOfCopiedHFiles;
+ } finally {
+ if (sourceFs != null) {
+ sourceFs.close();
+ }
+ if(exec != null) {
+ exec.shutdown();
+ }
+ }
+ }
+
+ private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException {
+ String tblName = tableName.getNameAsString().replace(":", UNDERSCORE);
+ int RANDOM_WIDTH = 320;
+ int RANDOM_RADIX = 32;
+ String doubleUnderScore = UNDERSCORE + UNDERSCORE;
+ String randomDir = user.getShortName() + doubleUnderScore + tblName + doubleUnderScore
+ + (new BigInteger(RANDOM_WIDTH, new SecureRandom()).toString(RANDOM_RADIX));
+ return createStagingDir(baseDir, user, randomDir);
+ }
+
+ private Path createStagingDir(Path baseDir, User user, String randomDir) throws IOException {
+ Path p = new Path(baseDir, randomDir);
+ sinkFs.mkdirs(p, PERM_ALL_ACCESS);
+ sinkFs.setPermission(p, PERM_ALL_ACCESS);
+ return p;
+ }
+
+ /**
+ * This class will copy the given hfiles from the given source file system to the given local file
+ * system staging directory.
+ */
+ private class Copier implements Callable<Void> {
+ private FileSystem sourceFs;
+ private Path stagingDir;
+ private List<String> hfiles;
+
+ public Copier(FileSystem sourceFs, final Path stagingDir, final List<String> hfiles)
+ throws IOException {
+ this.sourceFs = sourceFs;
+ this.stagingDir = stagingDir;
+ this.hfiles = hfiles;
+ }
+
+ @Override
+ public Void call() throws IOException {
+ Path sourceHFilePath;
+ Path localHFilePath;
+ int totalHFiles = hfiles.size();
+ for (int i = 0; i < totalHFiles; i++) {
+ sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfiles.get(i));
+ localHFilePath = new Path(stagingDir, sourceHFilePath.getName());
+ try {
+ FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
+ // If any exception other than FNFE is thrown, we fail the replication request and the
+ // source will retry replicating this data.
+ } catch (FileNotFoundException e) {
+ LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
+ + ". Trying to copy from hfile archive directory.",
+ e);
+ sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfiles.get(i));
+
+ try {
+ FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
+ } catch (FileNotFoundException e1) {
+ // This means that the hfile does not exist anywhere in the source cluster FS. So we
+ // cannot do anything here, just log and return.
+ LOG.error("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
+ + ". Hence ignoring this hfile from replication.",
+ e1);
+ return null;
+ }
+ }
+ sinkFs.setPermission(localHFilePath, PERM_ALL_ACCESS);
+ }
+ return null;
+ }
+ }
+}
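The copy loop in copyHFilesToStagingDir() above partitions each family's hfile list into groups of copiesPerThread and copies each group on its own thread before the bulk load starts. A simplified, self-contained sketch of that chunking follows; the file names and the copy body are placeholders, while the real Copier calls FileUtil.copy between the source and sink file systems.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ChunkedCopyExample {
  public static void main(String[] args) throws Exception {
    List<String> hfiles = Arrays.asList("hfile_1", "hfile_2", "hfile_3", "hfile_4", "hfile_5");
    int copiesPerThread = 2;   // modeled on hbase.replication.bulkload.copy.hfiles.perthread
    ExecutorService exec = Executors.newFixedThreadPool(4);
    List<Future<Void>> futures = new ArrayList<>();
    for (int start = 0; start < hfiles.size(); start += copiesPerThread) {
      final List<String> chunk =
          hfiles.subList(start, Math.min(start + copiesPerThread, hfiles.size()));
      Callable<Void> copier = () -> {
        for (String hfile : chunk) {
          // The real Copier calls FileUtil.copy(sourceFs, src, sinkFs, dst, false, conf) here.
          System.out.println("copying " + hfile);
        }
        return null;
      };
      futures.add(exec.submit(copier));
    }
    for (Future<Void> f : futures) {
      f.get();   // surface copy failures so the source cluster can retry the batch
    }
    exec.shutdown();
  }
}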
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
index 37dc1dd..f308daf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
@@ -47,7 +47,7 @@ public class MetricsSink {
if (lastTimestampForAge != timestamp) {
lastTimestampForAge = timestamp;
age = System.currentTimeMillis() - lastTimestampForAge;
- }
+ }
mss.setLastAppliedOpAge(age);
return age;
}
@@ -72,6 +72,17 @@ public class MetricsSink {
}
/**
+ * Convenience method to change metrics when a batch of operations is applied.
+ *
+ * @param batchSize total number of mutations that are applied/replicated
+ * @param hfileSize total number of hfiles that are applied/replicated
+ */
+ public void applyBatch(long batchSize, long hfileSize) {
+ applyBatch(batchSize);
+ mss.incrAppliedHFiles(hfileSize);
+ }
+
+ /**
* Get the Age of Last Applied Op
* @return ageOfLastAppliedOp
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index f9f7001..9687af7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -40,11 +40,13 @@ public class MetricsSource {
// tracks last shipped timestamp for each wal group
private Map<String, Long> lastTimeStamps = new HashMap<String, Long>();
private int lastQueueSize = 0;
+ private long lastHFileRefsQueueSize = 0;
private String id;
private final MetricsReplicationSourceSource singleSourceSource;
private final MetricsReplicationSourceSource globalSourceSource;
+
/**
* Constructor used to register the metrics
*
@@ -143,6 +145,18 @@ public class MetricsSource {
globalSourceSource.incrShippedKBs(sizeInKB);
}
+ /**
+ * Convenience method to apply changes to metrics due to shipping a batch of logs.
+ *
+ * @param batchSize the size of the batch that was shipped to sinks.
+ * @param hfiles total number of hfiles shipped to sinks.
+ */
+ public void shipBatch(long batchSize, int sizeInKB, long hfiles) {
+ shipBatch(batchSize, sizeInKB);
+ singleSourceSource.incrHFilesShipped(hfiles);
+ globalSourceSource.incrHFilesShipped(hfiles);
+ }
+
/** increase the byte number read by source from log file */
public void incrLogReadInBytes(long readInBytes) {
singleSourceSource.incrLogReadInBytes(readInBytes);
@@ -153,8 +167,10 @@ public class MetricsSource {
public void clear() {
singleSourceSource.clear();
globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
+ globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
lastTimeStamps.clear();
lastQueueSize = 0;
+ lastHFileRefsQueueSize = 0;
}
/**
@@ -194,4 +210,19 @@ public class MetricsSource {
public String getPeerID() {
return id;
}
+
+ public void incrSizeOfHFileRefsQueue(long size) {
+ singleSourceSource.incrSizeOfHFileRefsQueue(size);
+ globalSourceSource.incrSizeOfHFileRefsQueue(size);
+ lastHFileRefsQueueSize = size;
+ }
+
+ public void decrSizeOfHFileRefsQueue(int size) {
+ singleSourceSource.decrSizeOfHFileRefsQueue(size);
+ globalSourceSource.decrSizeOfHFileRefsQueue(size);
+ lastHFileRefsQueueSize -= size;
+ if (lastHFileRefsQueueSize < 0) {
+ lastHFileRefsQueueSize = 0;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index b3db0f6..30153f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -649,8 +649,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
// set the region name for the target region replica
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
- ReplicationProtbufUtil.buildReplicateWALEntryRequest(
- entriesArray, location.getRegionInfo().getEncodedNameAsBytes());
+ ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location
+ .getRegionInfo().getEncodedNameAsBytes(), null, null, null);
try {
PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
controller.setCallTimeout(timeout);
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index b396dfc..d2a0776 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -44,7 +45,10 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -55,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -71,6 +76,7 @@ public class Replication extends WALActionsListener.Base implements
private static final Log LOG =
LogFactory.getLog(Replication.class);
private boolean replication;
+ private boolean replicationForBulkLoadData;
private ReplicationSourceManager replicationManager;
private ReplicationQueues replicationQueues;
private ReplicationPeers replicationPeers;
@@ -84,7 +90,6 @@ public class Replication extends WALActionsListener.Base implements
private int statsThreadPeriod;
// ReplicationLoad to access replication metrics
private ReplicationLoad replicationLoad;
-
/**
* Instantiate the replication management (if rep is enabled).
* @param server Hosting server
@@ -109,11 +114,20 @@ public class Replication extends WALActionsListener.Base implements
this.server = server;
this.conf = this.server.getConfiguration();
this.replication = isReplication(this.conf);
+ this.replicationForBulkLoadData = isReplicationForBulkLoadDataEnabled(this.conf);
this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
.setDaemon(true)
.build());
+ if (this.replicationForBulkLoadData) {
+ if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
+ || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) {
+ throw new IllegalArgumentException(HConstants.REPLICATION_CLUSTER_ID
+ + " cannot be null/empty when " + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
+ + " is set to true.");
+ }
+ }
if (replication) {
try {
this.replicationQueues =
@@ -158,6 +172,15 @@ public class Replication extends WALActionsListener.Base implements
return c.getBoolean(REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
}
+ /**
+ * @param c Configuration to look at
+ * @return True if replication for bulk load data is enabled.
+ */
+ public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) {
+ return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+ HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+ }
+
/*
* Returns an object to listen to new wal changes
**/
@@ -187,14 +210,22 @@ public class Replication extends WALActionsListener.Base implements
/**
* Carry on the list of log entries down to the sink
* @param entries list of entries to replicate
- * @param cells The data -- the cells -- that <code>entries</code> describes (the entries
- * do not contain the Cells we are replicating; they are passed here on the side in this
- * CellScanner).
+ * @param cells The data -- the cells -- that <code>entries</code> describes (the entries do not
+ * contain the Cells we are replicating; they are passed here on the side in this
+ * CellScanner).
+ * @param replicationClusterId Id which will uniquely identify source cluster FS client
+ * configurations in the replication configuration directory
+ * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
+ * directory required for replicating hfiles
+ * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive directory
* @throws IOException
*/
- public void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException {
+ public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
+ String replicationClusterId, String sourceBaseNamespaceDirPath,
+ String sourceHFileArchiveDirPath) throws IOException {
if (this.replication) {
- this.replicationSink.replicateEntries(entries, cells);
+ this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
+ sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
}
}
@@ -226,34 +257,44 @@ public class Replication extends WALActionsListener.Base implements
}
@Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey,
- WALEdit logEdit) {
- scopeWALEdits(htd, logKey, logEdit);
+ public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
+ throws IOException {
+ scopeWALEdits(htd, logKey, logEdit, this.conf, this.getReplicationManager());
}
/**
- * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys
- * from compaction WAL edits and if the scope is local.
+ * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from
+ * compaction WAL edits, or if the scope is local.
* @param htd Descriptor used to find the scope to use
* @param logKey Key that may get scoped according to its edits
* @param logEdit Edits used to lookup the scopes
+ * @param replicationManager Manager used to add hfile references for bulk load events
+ * @throws IOException If failed to parse the WALEdit
*/
- public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey,
- WALEdit logEdit) {
- NavigableMap<byte[], Integer> scopes =
- new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey, WALEdit logEdit,
+ Configuration conf, ReplicationSourceManager replicationManager) throws IOException {
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
byte[] family;
+ boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf);
for (Cell cell : logEdit.getCells()) {
- family = CellUtil.cloneFamily(cell);
- // This is expected and the KV should not be replicated
- if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) continue;
- // Unexpected, has a tendency to happen in unit tests
- assert htd.getFamily(family) != null;
+ if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+ if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+ scopeBulkLoadEdits(htd, replicationManager, scopes, logKey.getTablename(), cell);
+ } else {
+ // Skip the flush/compaction/region events
+ continue;
+ }
+ } else {
+ family = CellUtil.cloneFamily(cell);
+ // Unexpected, has a tendency to happen in unit tests
+ assert htd.getFamily(family) != null;
- int scope = htd.getFamily(family).getScope();
- if (scope != REPLICATION_SCOPE_LOCAL &&
- !scopes.containsKey(family)) {
- scopes.put(family, scope);
+ if (!scopes.containsKey(family)) {
+ int scope = htd.getFamily(family).getScope();
+ if (scope != REPLICATION_SCOPE_LOCAL) {
+ scopes.put(family, scope);
+ }
+ }
}
}
if (!scopes.isEmpty()) {
@@ -261,6 +302,40 @@ public class Replication extends WALActionsListener.Base implements
}
}
+ private static void scopeBulkLoadEdits(HTableDescriptor htd,
+ ReplicationSourceManager replicationManager, NavigableMap<byte[], Integer> scopes,
+ TableName tableName, Cell cell) throws IOException {
+ byte[] family;
+ try {
+ BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+ for (StoreDescriptor s : bld.getStoresList()) {
+ family = s.getFamilyName().toByteArray();
+ if (!scopes.containsKey(family)) {
+ int scope = htd.getFamily(family).getScope();
+ if (scope != REPLICATION_SCOPE_LOCAL) {
+ scopes.put(family, scope);
+ addHFileRefsToQueue(replicationManager, tableName, family, s);
+ }
+ } else {
+ addHFileRefsToQueue(replicationManager, tableName, family, s);
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to get bulk load events information from the wal file.", e);
+ throw e;
+ }
+ }
+
+ private static void addHFileRefsToQueue(ReplicationSourceManager replicationManager,
+ TableName tableName, byte[] family, StoreDescriptor s) throws IOException {
+ try {
+ replicationManager.addHFileRefs(tableName, family, s.getStoreFileList());
+ } catch (ReplicationException e) {
+ LOG.error("Failed to create hfile references in ZK.", e);
+ throw new IOException(e);
+ }
+ }
+
@Override
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
getReplicationManager().preLogRoll(newPath);
@@ -272,8 +347,7 @@ public class Replication extends WALActionsListener.Base implements
}
/**
- * This method modifies the master's configuration in order to inject
- * replication-related features
+ * This method modifies the master's configuration in order to inject replication-related features
* @param conf
*/
public static void decorateMasterConfiguration(Configuration conf) {
@@ -285,6 +359,13 @@ public class Replication extends WALActionsListener.Base implements
if (!plugins.contains(cleanerClass)) {
conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
}
+ if (isReplicationForBulkLoadDataEnabled(conf)) {
+ plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
+ cleanerClass = ReplicationHFileCleaner.class.getCanonicalName();
+ if (!plugins.contains(cleanerClass)) {
+ conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass);
+ }
+ }
}
/*
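Putting the pieces together, bulk load replication needs a couple of settings beyond the usual replication switch. The sketch below assumes the key names are the values of HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, REPLICATION_CLUSTER_ID and REPLICATION_CONF_DIR added by this patch; the values themselves are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class BulkLoadReplicationConfExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean("hbase.replication.bulkload.enabled", true);
    // Mandatory when bulk load replication is on; Replication's constructor throws
    // IllegalArgumentException if this id is missing or empty.
    conf.set("hbase.replication.cluster.id", "source1");
    // Sink side: directory where per-peer source cluster FS client confs are staged.
    conf.set("hbase.replication.conf.dir", "/etc/hbase/replication-conf");
    System.out.println(conf.get("hbase.replication.cluster.id"));
  }
}

With these set, decorateMasterConfiguration() above appends ReplicationHFileCleaner to the master's hfile cleaner plugins, so archived hfiles that are still queued for replication are retained.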
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index f10f5e3..9e7b3af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -33,15 +33,16 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
@@ -51,6 +52,11 @@ import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
/**
* <p>
@@ -78,6 +84,9 @@ public class ReplicationSink {
private final MetricsSink metrics;
private final AtomicLong totalReplicatedEdits = new AtomicLong();
private final Object sharedHtableConLock = new Object();
+ // Number of hfiles that we successfully replicated
+ private long hfilesReplicated = 0;
+ private SourceFSConfigurationProvider provider;
/**
* Create a sink for replication
@@ -91,6 +100,18 @@ public class ReplicationSink {
this.conf = HBaseConfiguration.create(conf);
decorateConf();
this.metrics = new MetricsSink();
+
+ String className =
+ conf.get("hbase.replication.source.fs.conf.provider",
+ DefaultSourceFSConfigurationProvider.class.getCanonicalName());
+ try {
+ @SuppressWarnings("rawtypes")
+ Class c = Class.forName(className);
+ this.provider = (SourceFSConfigurationProvider) c.newInstance();
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Configured source fs configuration provider class "
+ + className + " throws error.", e);
+ }
}
/**
@@ -113,9 +134,16 @@ public class ReplicationSink {
* operates against raw protobuf type saving on a conversion from pb to pojo.
* @param entries
* @param cells
- * @throws IOException
+ * @param replicationClusterId Id which will uniquely identify source cluster FS client
+ * configurations in the replication configuration directory
+ * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
+ * directory
+ * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive directory
+ * @throws IOException If failed to replicate the data
*/
- public void replicateEntries(List<WALEntry> entries, final CellScanner cells) throws IOException {
+ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
+ String replicationClusterId, String sourceBaseNamespaceDirPath,
+ String sourceHFileArchiveDirPath) throws IOException {
if (entries.isEmpty()) return;
if (cells == null) throw new NullPointerException("TODO: Add handling of null CellScanner");
// Very simple optimization where we batch sequences of rows going
@@ -126,6 +154,10 @@ public class ReplicationSink {
// invocation of this method per table and cluster id.
Map<TableName, Map<List<UUID>, List<Row>>> rowMap =
new TreeMap<TableName, Map<List<UUID>, List<Row>>>();
+
+ // Map of table name to a list of (family, list of hfile paths from its namespace) pairs
+ Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
+
for (WALEntry entry : entries) {
TableName table =
TableName.valueOf(entry.getKey().getTableName().toByteArray());
@@ -138,33 +170,60 @@ public class ReplicationSink {
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
Cell cell = cells.current();
- if (isNewRowOrType(previousCell, cell)) {
- // Create new mutation
- m = CellUtil.isDelete(cell)?
- new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()):
- new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
- List<UUID> clusterIds = new ArrayList<UUID>();
- for(HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()){
- clusterIds.add(toUUID(clusterId));
+ // Handle bulk load hfiles replication
+ if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+ if (bulkLoadHFileMap == null) {
+ bulkLoadHFileMap = new HashMap<String, List<Pair<byte[], List<String>>>>();
}
- m.setClusterIds(clusterIds);
- addToHashMultiMap(rowMap, table, clusterIds, m);
- }
- if (CellUtil.isDelete(cell)) {
- ((Delete)m).addDeleteMarker(cell);
+ buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
} else {
- ((Put)m).add(cell);
+ // Handle wal replication
+ if (isNewRowOrType(previousCell, cell)) {
+ // Create new mutation
+ m =
+ CellUtil.isDelete(cell) ? new Delete(cell.getRowArray(), cell.getRowOffset(),
+ cell.getRowLength()) : new Put(cell.getRowArray(), cell.getRowOffset(),
+ cell.getRowLength());
+ List<UUID> clusterIds = new ArrayList<UUID>();
+ for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
+ clusterIds.add(toUUID(clusterId));
+ }
+ m.setClusterIds(clusterIds);
+ addToHashMultiMap(rowMap, table, clusterIds, m);
+ }
+ if (CellUtil.isDelete(cell)) {
+ ((Delete) m).addDeleteMarker(cell);
+ } else {
+ ((Put) m).add(cell);
+ }
+ previousCell = cell;
}
- previousCell = cell;
}
totalReplicated++;
}
- for (Entry<TableName, Map<List<UUID>,List<Row>>> entry : rowMap.entrySet()) {
- batch(entry.getKey(), entry.getValue().values());
+
+ // TODO Replicating mutations and bulk loaded data can be made parallel
+ if (!rowMap.isEmpty()) {
+ LOG.debug("Started replicating mutations.");
+ for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
+ batch(entry.getKey(), entry.getValue().values());
+ }
+ LOG.debug("Finished replicating mutations.");
+ }
+
+ if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
+ LOG.debug("Started replicating bulk loaded data.");
+ HFileReplicator hFileReplicator =
+ new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
+ sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
+ getConnection());
+ hFileReplicator.replicate();
+ LOG.debug("Finished replicating bulk loaded data.");
}
+
int size = entries.size();
this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
- this.metrics.applyBatch(size);
+ this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
this.totalReplicatedEdits.addAndGet(totalReplicated);
} catch (IOException ex) {
LOG.error("Unable to accept edit because:", ex);
@@ -172,6 +231,76 @@ public class ReplicationSink {
}
}
+ private void buildBulkLoadHFileMap(
+ final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
+ Cell cell) throws IOException {
+ BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+ List<StoreDescriptor> storesList = bld.getStoresList();
+ int storesSize = storesList.size();
+ for (int j = 0; j < storesSize; j++) {
+ StoreDescriptor storeDescriptor = storesList.get(j);
+ List<String> storeFileList = storeDescriptor.getStoreFileList();
+ int storeFilesSize = storeFileList.size();
+ hfilesReplicated += storeFilesSize;
+ for (int k = 0; k < storeFilesSize; k++) {
+ byte[] family = storeDescriptor.getFamilyName().toByteArray();
+
+ // Build hfile relative path from its namespace
+ String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family);
+
+ String tableName = table.getNameWithNamespaceInclAsString();
+ if (bulkLoadHFileMap.containsKey(tableName)) {
+ List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
+ boolean foundFamily = false;
+ for (int i = 0; i < familyHFilePathsList.size(); i++) {
+ Pair<byte[], List<String>> familyHFilePathsPair = familyHFilePathsList.get(i);
+ if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
+ // Found family already present, just add the path to the existing list
+ familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
+ foundFamily = true;
+ break;
+ }
+ }
+ if (!foundFamily) {
+ // Family not found, add this family and its hfile paths pair to the list
+ addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
+ }
+ } else {
+ // Add this table entry into the map
+ addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
+ }
+ }
+ }
+ }
+
+ private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
+ List<Pair<byte[], List<String>>> familyHFilePathsList) {
+ List<String> hfilePaths = new ArrayList<String>();
+ hfilePaths.add(pathToHfileFromNS);
+ familyHFilePathsList.add(new Pair<byte[], List<String>>(family, hfilePaths));
+ }
+
+ private void addNewTableEntryInMap(
+ final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
+ String pathToHfileFromNS, String tableName) {
+ List<String> hfilePaths = new ArrayList<String>();
+ hfilePaths.add(pathToHfileFromNS);
+ Pair<byte[], List<String>> newFamilyHFilePathsPair =
+ new Pair<byte[], List<String>>(family, hfilePaths);
+ List<Pair<byte[], List<String>>> newFamilyHFilePathsList =
+ new ArrayList<Pair<byte[], List<String>>>();
+ newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
+ bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
+ }
+
+ private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
+ byte[] family) {
+ return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
+ .append(table.getQualifierAsString()).append(Path.SEPARATOR)
+ .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
+ .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
+ }
+
/**
* @param previousCell
* @param cell
@@ -241,22 +370,13 @@ public class ReplicationSink {
}
Table table = null;
try {
- // See https://en.wikipedia.org/wiki/Double-checked_locking
- Connection connection = this.sharedHtableCon;
- if (connection == null) {
- synchronized (sharedHtableConLock) {
- connection = this.sharedHtableCon;
- if (connection == null) {
- connection = this.sharedHtableCon = ConnectionFactory.createConnection(this.conf);
- }
- }
- }
+ Connection connection = getConnection();
table = connection.getTable(tableName);
for (List<Row> rows : allRows) {
table.batch(rows, null);
}
} catch (InterruptedException ix) {
- throw (InterruptedIOException)new InterruptedIOException().initCause(ix);
+ throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
} finally {
if (table != null) {
table.close();
@@ -264,6 +384,20 @@ public class ReplicationSink {
}
}
+ private Connection getConnection() throws IOException {
+ // See https://en.wikipedia.org/wiki/Double-checked_locking
+ Connection connection = sharedHtableCon;
+ if (connection == null) {
+ synchronized (sharedHtableConLock) {
+ connection = sharedHtableCon;
+ if (connection == null) {
+ connection = sharedHtableCon = ConnectionFactory.createConnection(conf);
+ }
+ }
+ }
+ return connection;
+ }
+
/**
* Get a string representation of this sink's metrics
* @return string with the total replicated edits count and the date
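Note on the getConnection() refactor above: it keeps the double-checked locking idiom that previously lived inline in batch(). A minimal standalone sketch of the same idiom, assuming a volatile field for safe publication (the class and field names here are illustrative, not part of this patch):

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;

  class LazySharedConnection {
    private final Configuration conf;
    private final Object lock = new Object();
    // volatile is what makes the unsynchronized first read safe
    private volatile Connection shared;

    LazySharedConnection(Configuration conf) {
      this.conf = conf;
    }

    Connection get() throws IOException {
      Connection result = shared;          // first (unlocked) check
      if (result == null) {
        synchronized (lock) {
          result = shared;                 // second (locked) check
          if (result == null) {
            result = shared = ConnectionFactory.createConnection(conf);
          }
        }
      }
      return result;
    }
  }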
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 3d99523..868ddee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -37,7 +37,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -47,9 +46,10 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@@ -59,8 +59,12 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
@@ -223,6 +227,34 @@ public class ReplicationSource extends Thread
}
}
+ @Override
+ public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+ throws ReplicationException {
+ String peerId = peerClusterZnode;
+ if (peerId.contains("-")) {
+ // peerClusterZnode will be in the form peerId + "-" + rsZNode.
+ // A peerId will not have "-" in its name, see HBASE-11394
+ peerId = peerClusterZnode.split("-")[0];
+ }
+ Map<TableName, List<String>> tableCFMap = replicationPeers.getPeer(peerId).getTableCFs();
+ if (tableCFMap != null) {
+ List<String> tableCfs = tableCFMap.get(tableName);
+ if (tableCFMap.containsKey(tableName)
+ && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
+ this.replicationQueues.addHFileRefs(peerId, files);
+ metrics.incrSizeOfHFileRefsQueue(files.size());
+ } else {
+ LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family "
+ + Bytes.toString(family) + " to peer id " + peerId);
+ }
+ } else {
+ // The user has not explicitly defined any table CFs for this peer, which means all the
+ // data should be replicated
+ this.replicationQueues.addHFileRefs(peerId, files);
+ metrics.incrSizeOfHFileRefsQueue(files.size());
+ }
+ }
+
private void uninitialize() {
LOG.debug("Source exiting " + this.peerId);
metrics.clear();
@@ -471,6 +503,8 @@ public class ReplicationSource extends Thread
private int currentSize = 0;
// Indicates whether this particular worker is running
private boolean workerRunning = true;
+ // Current number of hfiles that we need to replicate
+ private long currentNbHFiles = 0;
public ReplicationSourceWorkerThread(String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
@@ -550,6 +584,7 @@ public class ReplicationSource extends Thread
boolean gotIOE = false;
currentNbOperations = 0;
+ currentNbHFiles = 0;
List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
currentSize = 0;
try {
@@ -701,6 +736,28 @@ public class ReplicationSource extends Thread
return seenEntries == 0 && processEndOfFile();
}
+ private void cleanUpHFileRefs(WALEdit edit) throws IOException {
+ String peerId = peerClusterZnode;
+ if (peerId.contains("-")) {
+ // peerClusterZnode will be in the form peerId + "-" + rsZNode.
+ // A peerId will not have "-" in its name, see HBASE-11394
+ peerId = peerClusterZnode.split("-")[0];
+ }
+ List<Cell> cells = edit.getCells();
+ for (int i = 0; i < cells.size(); i++) {
+ Cell cell = cells.get(i);
+ if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+ BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+ List<StoreDescriptor> stores = bld.getStoresList();
+ for (int j = 0; j < stores.size(); j++) {
+ List<String> storeFileList = stores.get(j).getStoreFileList();
+ manager.cleanUpHFileRefs(peerId, storeFileList);
+ metrics.decrSizeOfHFileRefsQueue(storeFileList.size());
+ }
+ }
+ }
+ }
+
/**
* Poll for the next path
* @return true if a path was obtained, false if not
@@ -853,14 +910,31 @@ public class ReplicationSource extends Thread
private int countDistinctRowKeys(WALEdit edit) {
List<Cell> cells = edit.getCells();
int distinctRowKeys = 1;
+ int totalHFileEntries = 0;
Cell lastCell = cells.get(0);
+
for (int i = 0; i < edit.size(); i++) {
+ // Count HFiles to be replicated
+ if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
+ try {
+ BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
+ List<StoreDescriptor> stores = bld.getStoresList();
+ for (int j = 0; j < stores.size(); j++) {
+ totalHFileEntries += stores.get(j).getStoreFileList().size();
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to deserialize bulk load entry from wal edit. "
+ + "This its hfiles count will not be added into metric.");
+ }
+ }
+
if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
distinctRowKeys++;
}
lastCell = cells.get(i);
}
- return distinctRowKeys;
+ currentNbHFiles += totalHFileEntries;
+ return distinctRowKeys + totalHFileEntries;
}
/**
@@ -914,6 +988,12 @@ public class ReplicationSource extends Thread
}
if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
+ // Clean up hfile references
+ int size = entries.size();
+ for (int i = 0; i < size; i++) {
+ cleanUpHFileRefs(entries.get(i).getEdit());
+ }
+ // Log and clean up WAL logs
manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),
currentWALisBeingWrittenTo);
@@ -925,7 +1005,7 @@ public class ReplicationSource extends Thread
totalReplicatedEdits.addAndGet(entries.size());
totalReplicatedOperations.addAndGet(currentNbOperations);
// FIXME check relationship between wal group and overall
- metrics.shipBatch(currentNbOperations, currentSize / 1024);
+ metrics.shipBatch(currentNbOperations, currentSize / 1024, currentNbHFiles);
metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
walGroupId);
if (LOG.isTraceEnabled()) {
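The same peer id extraction from peerClusterZnode appears in both addHFileRefs() and cleanUpHFileRefs() above. A standalone sketch of that parsing, for reference only (the helper name is illustrative and not part of this patch):

  // Illustrative helper only. For a recovered queue the znode name is
  // peerId + "-" + <dead region server znode>; a plain peer id never
  // contains "-" (see HBASE-11394), so splitting on "-" recovers it.
  static String extractPeerId(String peerClusterZnode) {
    return peerClusterZnode.contains("-")
        ? peerClusterZnode.split("-")[0]
        : peerClusterZnode;
  }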
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 1e9c714..7f4a9f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
+import java.util.List;
import java.util.UUID;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -26,7 +27,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
@@ -105,4 +108,14 @@ public interface ReplicationSourceInterface {
*/
String getStats();
+ /**
+ * Add hfile names to the queue to be replicated.
+ * @param tableName Name of the table these files belong to
+ * @param family Name of the family these files belong to
+ * @param files files whose names need to be added to the queue to be replicated
+ * @throws ReplicationException If failed to add hfile references
+ */
+ void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+ throws ReplicationException;
+
}
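For context, a caller that has just bulk loaded store files for one column family could hand them to a source through this new interface method roughly as follows. This is a minimal sketch: the table, family, and file names are illustrative values, and `source` stands in for any ReplicationSourceInterface implementation.

  import java.util.Arrays;
  import java.util.List;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.replication.ReplicationException;
  import org.apache.hadoop.hbase.util.Bytes;

  void queueBulkLoadedFiles(ReplicationSourceInterface source) throws ReplicationException {
    // Illustrative values; in the real code path these come from the bulk load descriptor
    TableName table = TableName.valueOf("default", "usertable");
    byte[] family = Bytes.toBytes("cf");
    List<String> storeFiles = Arrays.asList("hfile-1", "hfile-2");
    source.addHFileRefs(table, family, storeFiles);
  }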