You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2015/12/10 08:38:36 UTC
[3/3] hbase git commit: HBASE-13153 Bulk Loaded HFile Replication
(Ashish Singhi)
HBASE-13153 Bulk Loaded HFile Replication (Ashish Singhi)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/26ac60b0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/26ac60b0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/26ac60b0
Branch: refs/heads/master
Commit: 26ac60b03f80c9215103a02db783341e67037753
Parents: 9647fee
Author: ramkrishna <ra...@gmail.com>
Authored: Thu Dec 10 13:07:46 2015 +0530
Committer: ramkrishna <ra...@gmail.com>
Committed: Thu Dec 10 13:07:46 2015 +0530
----------------------------------------------------------------------
.../hbase/replication/ReplicationPeers.java | 2 +-
.../replication/ReplicationPeersZKImpl.java | 26 +-
.../hbase/replication/ReplicationQueues.java | 25 +-
.../replication/ReplicationQueuesClient.java | 25 +-
.../ReplicationQueuesClientZKImpl.java | 37 ++
.../replication/ReplicationQueuesZKImpl.java | 70 +++
.../replication/ReplicationStateZKBase.java | 14 +-
.../apache/hadoop/hbase/zookeeper/ZKUtil.java | 24 +-
.../org/apache/hadoop/hbase/HConstants.java | 16 +-
.../MetricsReplicationSinkSource.java | 2 +
.../MetricsReplicationSourceSource.java | 6 +
.../MetricsReplicationGlobalSourceSource.java | 21 +
.../MetricsReplicationSinkSourceImpl.java | 7 +
.../MetricsReplicationSourceSourceImpl.java | 28 +
.../hbase/protobuf/generated/AdminProtos.java | 602 +++++++++++++++++--
hbase-protocol/src/main/protobuf/Admin.proto | 3 +
.../hbase/mapreduce/LoadIncrementalHFiles.java | 152 +++--
.../hbase/protobuf/ReplicationProtbufUtil.java | 46 +-
.../hbase/regionserver/RSRpcServices.java | 4 +-
.../regionserver/ReplicationSinkService.java | 8 +-
.../regionserver/wal/WALActionsListener.java | 19 +-
.../hbase/replication/ScopeWALEntryFilter.java | 72 ++-
.../replication/TableCfWALEntryFilter.java | 76 ++-
.../master/ReplicationHFileCleaner.java | 193 ++++++
.../DefaultSourceFSConfigurationProvider.java | 78 +++
.../HBaseInterClusterReplicationEndpoint.java | 32 +-
.../regionserver/HFileReplicator.java | 393 ++++++++++++
.../replication/regionserver/MetricsSink.java | 13 +-
.../replication/regionserver/MetricsSource.java | 31 +
.../RegionReplicaReplicationEndpoint.java | 4 +-
.../replication/regionserver/Replication.java | 133 +++-
.../regionserver/ReplicationSink.java | 200 +++++-
.../regionserver/ReplicationSource.java | 92 ++-
.../ReplicationSourceInterface.java | 13 +
.../regionserver/ReplicationSourceManager.java | 21 +
.../SourceFSConfigurationProvider.java | 40 ++
.../security/access/SecureBulkLoadEndpoint.java | 18 +-
.../cleaner/TestReplicationHFileCleaner.java | 264 ++++++++
.../replication/ReplicationSourceDummy.java | 8 +
.../replication/TestMasterReplication.java | 313 +++++++++-
.../replication/TestReplicationSmallTests.java | 3 +-
.../replication/TestReplicationStateBasic.java | 57 ++
.../replication/TestReplicationStateZKImpl.java | 1 +
.../replication/TestReplicationSyncUpTool.java | 10 +-
...ReplicationSyncUpToolWithBulkLoadedData.java | 235 ++++++++
.../regionserver/TestReplicationSink.java | 179 +++++-
.../TestReplicationSourceManager.java | 70 ++-
.../TestSourceFSConfigurationProvider.java | 25 +
48 files changed, 3444 insertions(+), 267 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 8e80e06..8bf21d5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -50,7 +50,7 @@ public interface ReplicationPeers {
* @param peerId a short that identifies the cluster
* @param peerConfig configuration for the replication slave cluster
* @param tableCFs the table and column-family list which will be replicated for this peer or null
- * for all table and column families
+ * for all table and column families
*/
void addPeer(String peerId, ReplicationPeerConfig peerConfig, String tableCFs)
throws ReplicationException;
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 63f9ac3..fd10b66 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
import com.google.protobuf.ByteString;
@@ -120,8 +121,21 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
checkQueuesDeleted(id);
-
+
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+
+ // If only bulk load hfile replication is enabled then add peerId node to hfile-refs node
+ if (replicationForBulkLoadEnabled) {
+ try {
+ String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id);
+ LOG.info("Adding peer " + peerId + " to hfile reference queue.");
+ ZKUtil.createWithParents(this.zookeeper, peerId);
+ } catch (KeeperException e) {
+ throw new ReplicationException("Failed to add peer with id=" + id
+ + ", node under hfile references node.", e);
+ }
+ }
+
List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id),
toByteArray(peerConfig));
@@ -151,6 +165,16 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
+ " because that id does not exist.");
}
ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+ // Delete peerId node from hfile-refs node irrespective of whether bulk loaded hfile
+ // replication is enabled or not
+
+ String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id);
+ try {
+ LOG.info("Removing peer " + peerId + " from hfile reference queue.");
+ ZKUtil.deleteNodeRecursively(this.zookeeper, peerId);
+ } catch (NoNodeException e) {
+ LOG.info("Did not find node " + peerId + " to delete.", e);
+ }
} catch (KeeperException e) {
throw new ReplicationException("Could not remove peer with id=" + id, e);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
index 3dbbc33..0d47a88 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -26,7 +26,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* This provides an interface for maintaining a region server's replication queues. These queues
- * keep track of the WALs that still need to be replicated to remote clusters.
+ * keep track of the WALs and HFile references (if hbase.replication.bulkload.enabled is enabled)
+ * that still need to be replicated to remote clusters.
*/
@InterfaceAudience.Private
public interface ReplicationQueues {
@@ -113,4 +114,26 @@ public interface ReplicationQueues {
* @return if this is this rs's znode
*/
boolean isThisOurZnode(String znode);
+
+ /**
+ * Add a peer to hfile reference queue if peer does not exist.
+ * @param peerId peer cluster id to be added
+ * @throws ReplicationException if fails to add a peer id to hfile reference queue
+ */
+ void addPeerToHFileRefs(String peerId) throws ReplicationException;
+
+ /**
+ * Add new hfile references to the queue.
+ * @param peerId peer cluster id to which the hfiles need to be replicated
+ * @param files list of hfile references to be added
+ * @throws ReplicationException if fails to add a hfile reference
+ */
+ void addHFileRefs(String peerId, List<String> files) throws ReplicationException;
+
+ /**
+ * Remove hfile references from the queue.
+ * @param peerId peer cluster id from which this hfile references needs to be removed
+ * @param files list of hfile references to be removed
+ */
+ void removeHFileRefs(String peerId, List<String> files);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
index 5b3e541..7fa3bbb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
@@ -25,7 +25,8 @@ import org.apache.zookeeper.KeeperException;
/**
* This provides an interface for clients of replication to view replication queues. These queues
- * keep track of the WALs that still need to be replicated to remote clusters.
+ * keep track of the sources(WALs/HFile references) that still need to be replicated to remote
+ * clusters.
*/
@InterfaceAudience.Private
public interface ReplicationQueuesClient {
@@ -65,4 +66,26 @@ public interface ReplicationQueuesClient {
* @return cversion of replication rs node
*/
int getQueuesZNodeCversion() throws KeeperException;
+
+ /**
+ * Get the change version number of replication hfile references node. This can be used as
+ * optimistic locking to get a consistent snapshot of the replication queues of hfile references.
+ * @return change version number of hfile references node
+ */
+ int getHFileRefsNodeChangeVersion() throws KeeperException;
+
+ /**
+ * Get list of all peers from hfile reference queue.
+ * @return a list of peer ids
+ * @throws KeeperException zookeeper exception
+ */
+ List<String> getAllPeersFromHFileRefsQueue() throws KeeperException;
+
+ /**
+ * Get a list of all hfile references in the given peer.
+ * @param peerId a String that identifies the peer
+ * @return a list of hfile references, null if not found any
+ * @throws KeeperException zookeeper exception
+ */
+ List<String> getReplicableHFiles(String peerId) throws KeeperException;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
index e1a6a49..cc407e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -84,4 +84,41 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
throw e;
}
}
+
+ @Override
+ public int getHFileRefsNodeChangeVersion() throws KeeperException {
+ Stat stat = new Stat();
+ try {
+ ZKUtil.getDataNoWatch(this.zookeeper, this.hfileRefsZNode, stat);
+ } catch (KeeperException e) {
+ this.abortable.abort("Failed to get stat of replication hfile references node.", e);
+ throw e;
+ }
+ return stat.getCversion();
+ }
+
+ @Override
+ public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException {
+ List<String> result = null;
+ try {
+ result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.hfileRefsZNode);
+ } catch (KeeperException e) {
+ this.abortable.abort("Failed to get list of all peers in hfile references node.", e);
+ throw e;
+ }
+ return result;
+ }
+
+ @Override
+ public List<String> getReplicableHFiles(String peerId) throws KeeperException {
+ String znode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+ List<String> result = null;
+ try {
+ result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+ } catch (KeeperException e) {
+ this.abortable.abort("Failed to get list of hfile references for peerId=" + peerId, e);
+ throw e;
+ }
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 97763e2..43dd412 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -84,6 +84,15 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
} catch (KeeperException e) {
throw new ReplicationException("Could not initialize replication queues.", e);
}
+ // If only bulk load hfile replication is enabled then create the hfile-refs znode
+ if (replicationForBulkLoadEnabled) {
+ try {
+ ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
+ } catch (KeeperException e) {
+ throw new ReplicationException("Could not initialize hfile references replication queue.",
+ e);
+ }
+ }
}
@Override
@@ -431,4 +440,65 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray();
return ProtobufUtil.prependPBMagic(bytes);
}
+
+ @Override
+ public void addHFileRefs(String peerId, List<String> files) throws ReplicationException {
+ String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+ boolean debugEnabled = LOG.isDebugEnabled();
+ if (debugEnabled) {
+ LOG.debug("Adding hfile references " + files + " in queue " + peerZnode);
+ }
+ List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
+ int size = files.size();
+ for (int i = 0; i < size; i++) {
+ listOfOps.add(ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i)),
+ HConstants.EMPTY_BYTE_ARRAY));
+ }
+ if (debugEnabled) {
+ LOG.debug(" The multi list size for adding hfile references in zk for node " + peerZnode
+ + " is " + listOfOps.size());
+ }
+ try {
+ ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
+ } catch (KeeperException e) {
+ throw new ReplicationException("Failed to create hfile reference znode=" + e.getPath(), e);
+ }
+ }
+
+ @Override
+ public void removeHFileRefs(String peerId, List<String> files) {
+ String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+ boolean debugEnabled = LOG.isDebugEnabled();
+ if (debugEnabled) {
+ LOG.debug("Removing hfile references " + files + " from queue " + peerZnode);
+ }
+ List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
+ int size = files.size();
+ for (int i = 0; i < size; i++) {
+ listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i))));
+ }
+ if (debugEnabled) {
+ LOG.debug(" The multi list size for removing hfile references in zk for node " + peerZnode
+ + " is " + listOfOps.size());
+ }
+ try {
+ ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
+ } catch (KeeperException e) {
+ LOG.error("Failed to remove hfile reference znode=" + e.getPath(), e);
+ }
+ }
+
+ @Override
+ public void addPeerToHFileRefs(String peerId) throws ReplicationException {
+ String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+ try {
+ if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
+ LOG.info("Adding peer " + peerId + " to hfile reference queue.");
+ ZKUtil.createWithParents(this.zookeeper, peerZnode);
+ }
+ } catch (KeeperException e) {
+ throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.",
+ e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
index 4fbac0f..762167f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
@@ -20,9 +20,10 @@ package org.apache.hadoop.hbase.replication;
import java.util.List;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
@@ -48,32 +49,43 @@ public abstract class ReplicationStateZKBase {
protected final String peersZNode;
/** The name of the znode that contains all replication queues */
protected final String queuesZNode;
+ /** The name of the znode that contains queues of hfile references to be replicated */
+ protected final String hfileRefsZNode;
/** The cluster key of the local cluster */
protected final String ourClusterKey;
protected final ZooKeeperWatcher zookeeper;
protected final Configuration conf;
protected final Abortable abortable;
+ protected final boolean replicationForBulkLoadEnabled;
// Public for testing
public static final byte[] ENABLED_ZNODE_BYTES =
toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
public static final byte[] DISABLED_ZNODE_BYTES =
toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
+ public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
+ "zookeeper.znode.replication.hfile.refs";
+ public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf,
Abortable abortable) {
this.zookeeper = zookeeper;
this.conf = conf;
this.abortable = abortable;
+ this.replicationForBulkLoadEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+ HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
+ String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
+ ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);
this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName);
+ this.hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName);
}
public List<String> getListOfReplicators() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index c268268..9e01d09 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
@@ -884,7 +885,7 @@ public class ZKUtil {
JaasConfiguration.SERVER_KEYTAB_KERBEROS_CONFIG_NAME) == null
&& conf.get(HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL) == null
&& conf.get(HConstants.ZK_SERVER_KERBEROS_PRINCIPAL) == null) {
-
+
return false;
}
} catch(Exception e) {
@@ -1797,6 +1798,27 @@ public class ZKUtil {
} else if (child.equals(zkw.getConfiguration().
get("zookeeper.znode.replication.rs", "rs"))) {
appendRSZnodes(zkw, znode, sb);
+ } else if (child.equals(zkw.getConfiguration().get(
+ ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
+ ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT))) {
+ appendHFileRefsZnodes(zkw, znode, sb);
+ }
+ }
+ }
+
+ private static void appendHFileRefsZnodes(ZooKeeperWatcher zkw, String hfileRefsZnode,
+ StringBuilder sb) throws KeeperException {
+ sb.append("\n").append(hfileRefsZnode).append(": ");
+ for (String peerIdZnode : ZKUtil.listChildrenNoWatch(zkw, hfileRefsZnode)) {
+ String znodeToProcess = ZKUtil.joinZNode(hfileRefsZnode, peerIdZnode);
+ sb.append("\n").append(znodeToProcess).append(": ");
+ List<String> peerHFileRefsZnodes = ZKUtil.listChildrenNoWatch(zkw, znodeToProcess);
+ int size = peerHFileRefsZnodes.size();
+ for (int i = 0; i < size; i++) {
+ sb.append(peerHFileRefsZnodes.get(i));
+ if (i != size - 1) {
+ sb.append(", ");
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index ac57514..6fafad3 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -850,6 +850,18 @@ public final class HConstants {
REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service";
public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT =
"org.apache.hadoop.hbase.replication.regionserver.Replication";
+ public static final String REPLICATION_BULKLOAD_ENABLE_KEY = "hbase.replication.bulkload.enabled";
+ public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false;
+ /** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */
+ public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id";
+ /**
+ * Directory where the source cluster file system client configuration are placed which is used by
+ * sink cluster to copy HFiles from source cluster file system
+ */
+ public static final String REPLICATION_CONF_DIR = "hbase.replication.conf.dir";
+
+ /** Maximum time to retry for a failed bulk load request */
+ public static final String BULKLOAD_MAX_RETRIES_NUMBER = "hbase.bulkload.retries.number";
/** HBCK special code name used as server name when manipulating ZK nodes */
public static final String HBCK_CODE_NAME = "HBCKServerName";
@@ -1241,7 +1253,7 @@ public final class HConstants {
public static final String HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY =
"hbase.canary.write.table.check.period";
-
+
/**
* Configuration keys for programmatic JAAS configuration for secured ZK interaction
*/
@@ -1250,7 +1262,7 @@ public final class HConstants {
"hbase.zookeeper.client.kerberos.principal";
public static final String ZK_SERVER_KEYTAB_FILE = "hbase.zookeeper.server.keytab.file";
public static final String ZK_SERVER_KERBEROS_PRINCIPAL =
- "hbase.zookeeper.server.kerberos.principal";
+ "hbase.zookeeper.server.kerberos.principal";
private HConstants() {
// Can't be instantiated with this ctor.
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
index 698a59a..9fb8415 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
@@ -22,9 +22,11 @@ public interface MetricsReplicationSinkSource {
public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp";
public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches";
public static final String SINK_APPLIED_OPS = "sink.appliedOps";
+ public static final String SINK_APPLIED_HFILES = "sink.appliedHFiles";
void setLastAppliedOpAge(long age);
void incrAppliedBatches(long batches);
void incrAppliedOps(long batchsize);
long getLastAppliedOpAge();
+ void incrAppliedHFiles(long hfileSize);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index fecf191..188c3a3 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -32,6 +32,9 @@ public interface MetricsReplicationSourceSource {
public static final String SOURCE_LOG_EDITS_FILTERED = "source.logEditsFiltered";
+ public static final String SOURCE_SHIPPED_HFILES = "source.shippedHFiles";
+ public static final String SOURCE_SIZE_OF_HFILE_REFS_QUEUE = "source.sizeOfHFileRefsQueue";
+
void setLastShippedAge(long age);
void setSizeOfLogQueue(int size);
void incrSizeOfLogQueue(int size);
@@ -44,4 +47,7 @@ public interface MetricsReplicationSourceSource {
void incrLogReadInEdits(long size);
void clear();
long getLastShippedAge();
+ void incrHFilesShipped(long hfiles);
+ void incrSizeOfHFileRefsQueue(long size);
+ void decrSizeOfHFileRefsQueue(long size);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 6dace10..392cd39 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -32,6 +32,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
private final MutableCounterLong shippedOpsCounter;
private final MutableCounterLong shippedKBsCounter;
private final MutableCounterLong logReadInBytesCounter;
+ private final MutableCounterLong shippedHFilesCounter;
+ private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
this.rms = rms;
@@ -51,6 +53,11 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
logReadInEditsCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
logEditsFilteredCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_LOG_EDITS_FILTERED, 0L);
+
+ shippedHFilesCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_SHIPPED_HFILES, 0L);
+
+ sizeOfHFileRefsQueueGauge =
+ rms.getMetricsRegistry().getLongGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L);
}
@Override public void setLastShippedAge(long age) {
@@ -100,4 +107,18 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
public long getLastShippedAge() {
return ageOfLastShippedOpGauge.value();
}
+
+ @Override public void incrHFilesShipped(long hfiles) {
+ shippedHFilesCounter.incr(hfiles);
+ }
+
+ @Override
+ public void incrSizeOfHFileRefsQueue(long size) {
+ sizeOfHFileRefsQueueGauge.incr(size);
+ }
+
+ @Override
+ public void decrSizeOfHFileRefsQueue(long size) {
+ sizeOfHFileRefsQueueGauge.decr(size);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
index 14212ba..8f4a337 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
@@ -26,11 +26,13 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
private final MutableGaugeLong ageGauge;
private final MutableCounterLong batchesCounter;
private final MutableCounterLong opsCounter;
+ private final MutableCounterLong hfilesCounter;
public MetricsReplicationSinkSourceImpl(MetricsReplicationSourceImpl rms) {
ageGauge = rms.getMetricsRegistry().getLongGauge(SINK_AGE_OF_LAST_APPLIED_OP, 0L);
batchesCounter = rms.getMetricsRegistry().getLongCounter(SINK_APPLIED_BATCHES, 0L);
opsCounter = rms.getMetricsRegistry().getLongCounter(SINK_APPLIED_OPS, 0L);
+ hfilesCounter = rms.getMetricsRegistry().getLongCounter(SINK_APPLIED_HFILES, 0L);
}
@Override public void setLastAppliedOpAge(long age) {
@@ -49,4 +51,9 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
public long getLastAppliedOpAge() {
return ageGauge.value();
}
+
+ @Override
+ public void incrAppliedHFiles(long hfiles) {
+ hfilesCounter.incr(hfiles);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 1422e7e..217cc3e 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -32,6 +32,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String shippedOpsKey;
private final String shippedKBsKey;
private final String logReadInBytesKey;
+ private final String shippedHFilesKey;
+ private final String sizeOfHFileRefsQueueKey;
private final MutableGaugeLong ageOfLastShippedOpGauge;
private final MutableGaugeLong sizeOfLogQueueGauge;
@@ -41,6 +43,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final MutableCounterLong shippedOpsCounter;
private final MutableCounterLong shippedKBsCounter;
private final MutableCounterLong logReadInBytesCounter;
+ private final MutableCounterLong shippedHFilesCounter;
+ private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
this.rms = rms;
@@ -69,6 +73,12 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
logEditsFilteredKey = "source." + id + ".logEditsFiltered";
logEditsFilteredCounter = rms.getMetricsRegistry().getLongCounter(logEditsFilteredKey, 0L);
+
+ shippedHFilesKey = "source." + this.id + ".shippedHFiles";
+ shippedHFilesCounter = rms.getMetricsRegistry().getLongCounter(shippedHFilesKey, 0L);
+
+ sizeOfHFileRefsQueueKey = "source." + id + ".sizeOfHFileRefsQueue";
+ sizeOfHFileRefsQueueGauge = rms.getMetricsRegistry().getLongGauge(sizeOfHFileRefsQueueKey, 0L);
}
@Override public void setLastShippedAge(long age) {
@@ -124,10 +134,28 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
rms.removeMetric(logReadInEditsKey);
rms.removeMetric(logEditsFilteredKey);
+
+ rms.removeMetric(shippedHFilesKey);
+ rms.removeMetric(sizeOfHFileRefsQueueKey);
}
@Override
public long getLastShippedAge() {
return ageOfLastShippedOpGauge.value();
}
+
+ @Override
+ public void incrHFilesShipped(long hfiles) {
+ shippedHFilesCounter.incr(hfiles);
+ }
+
+ @Override
+ public void incrSizeOfHFileRefsQueue(long size) {
+ sizeOfHFileRefsQueueGauge.incr(size);
+ }
+
+ @Override
+ public void decrSizeOfHFileRefsQueue(long size) {
+ sizeOfHFileRefsQueueGauge.decr(size);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
index b4c378b..1c59ea6 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
@@ -16896,6 +16896,51 @@ public final class AdminProtos {
*/
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntryOrBuilder getEntryOrBuilder(
int index);
+
+ // optional string replicationClusterId = 2;
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ boolean hasReplicationClusterId();
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ java.lang.String getReplicationClusterId();
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ com.google.protobuf.ByteString
+ getReplicationClusterIdBytes();
+
+ // optional string sourceBaseNamespaceDirPath = 3;
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ boolean hasSourceBaseNamespaceDirPath();
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ java.lang.String getSourceBaseNamespaceDirPath();
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ com.google.protobuf.ByteString
+ getSourceBaseNamespaceDirPathBytes();
+
+ // optional string sourceHFileArchiveDirPath = 4;
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ boolean hasSourceHFileArchiveDirPath();
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ java.lang.String getSourceHFileArchiveDirPath();
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ com.google.protobuf.ByteString
+ getSourceHFileArchiveDirPathBytes();
}
/**
* Protobuf type {@code hbase.pb.ReplicateWALEntryRequest}
@@ -16963,6 +17008,21 @@ public final class AdminProtos {
entry_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.PARSER, extensionRegistry));
break;
}
+ case 18: {
+ bitField0_ |= 0x00000001;
+ replicationClusterId_ = input.readBytes();
+ break;
+ }
+ case 26: {
+ bitField0_ |= 0x00000002;
+ sourceBaseNamespaceDirPath_ = input.readBytes();
+ break;
+ }
+ case 34: {
+ bitField0_ |= 0x00000004;
+ sourceHFileArchiveDirPath_ = input.readBytes();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -17005,6 +17065,7 @@ public final class AdminProtos {
return PARSER;
}
+ private int bitField0_;
// repeated .hbase.pb.WALEntry entry = 1;
public static final int ENTRY_FIELD_NUMBER = 1;
private java.util.List<org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry> entry_;
@@ -17041,8 +17102,140 @@ public final class AdminProtos {
return entry_.get(index);
}
+ // optional string replicationClusterId = 2;
+ public static final int REPLICATIONCLUSTERID_FIELD_NUMBER = 2;
+ private java.lang.Object replicationClusterId_;
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ public boolean hasReplicationClusterId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ public java.lang.String getReplicationClusterId() {
+ java.lang.Object ref = replicationClusterId_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ replicationClusterId_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ public com.google.protobuf.ByteString
+ getReplicationClusterIdBytes() {
+ java.lang.Object ref = replicationClusterId_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ replicationClusterId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ // optional string sourceBaseNamespaceDirPath = 3;
+ public static final int SOURCEBASENAMESPACEDIRPATH_FIELD_NUMBER = 3;
+ private java.lang.Object sourceBaseNamespaceDirPath_;
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ public boolean hasSourceBaseNamespaceDirPath() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ public java.lang.String getSourceBaseNamespaceDirPath() {
+ java.lang.Object ref = sourceBaseNamespaceDirPath_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ sourceBaseNamespaceDirPath_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ public com.google.protobuf.ByteString
+ getSourceBaseNamespaceDirPathBytes() {
+ java.lang.Object ref = sourceBaseNamespaceDirPath_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ sourceBaseNamespaceDirPath_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ // optional string sourceHFileArchiveDirPath = 4;
+ public static final int SOURCEHFILEARCHIVEDIRPATH_FIELD_NUMBER = 4;
+ private java.lang.Object sourceHFileArchiveDirPath_;
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ public boolean hasSourceHFileArchiveDirPath() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ public java.lang.String getSourceHFileArchiveDirPath() {
+ java.lang.Object ref = sourceHFileArchiveDirPath_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ sourceHFileArchiveDirPath_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ public com.google.protobuf.ByteString
+ getSourceHFileArchiveDirPathBytes() {
+ java.lang.Object ref = sourceHFileArchiveDirPath_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ sourceHFileArchiveDirPath_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
private void initFields() {
entry_ = java.util.Collections.emptyList();
+ replicationClusterId_ = "";
+ sourceBaseNamespaceDirPath_ = "";
+ sourceHFileArchiveDirPath_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -17065,6 +17258,15 @@ public final class AdminProtos {
for (int i = 0; i < entry_.size(); i++) {
output.writeMessage(1, entry_.get(i));
}
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeBytes(2, getReplicationClusterIdBytes());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeBytes(3, getSourceBaseNamespaceDirPathBytes());
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeBytes(4, getSourceHFileArchiveDirPathBytes());
+ }
getUnknownFields().writeTo(output);
}
@@ -17078,6 +17280,18 @@ public final class AdminProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(1, entry_.get(i));
}
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(2, getReplicationClusterIdBytes());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(3, getSourceBaseNamespaceDirPathBytes());
+ }
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(4, getSourceHFileArchiveDirPathBytes());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -17103,6 +17317,21 @@ public final class AdminProtos {
boolean result = true;
result = result && getEntryList()
.equals(other.getEntryList());
+ result = result && (hasReplicationClusterId() == other.hasReplicationClusterId());
+ if (hasReplicationClusterId()) {
+ result = result && getReplicationClusterId()
+ .equals(other.getReplicationClusterId());
+ }
+ result = result && (hasSourceBaseNamespaceDirPath() == other.hasSourceBaseNamespaceDirPath());
+ if (hasSourceBaseNamespaceDirPath()) {
+ result = result && getSourceBaseNamespaceDirPath()
+ .equals(other.getSourceBaseNamespaceDirPath());
+ }
+ result = result && (hasSourceHFileArchiveDirPath() == other.hasSourceHFileArchiveDirPath());
+ if (hasSourceHFileArchiveDirPath()) {
+ result = result && getSourceHFileArchiveDirPath()
+ .equals(other.getSourceHFileArchiveDirPath());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -17120,6 +17349,18 @@ public final class AdminProtos {
hash = (37 * hash) + ENTRY_FIELD_NUMBER;
hash = (53 * hash) + getEntryList().hashCode();
}
+ if (hasReplicationClusterId()) {
+ hash = (37 * hash) + REPLICATIONCLUSTERID_FIELD_NUMBER;
+ hash = (53 * hash) + getReplicationClusterId().hashCode();
+ }
+ if (hasSourceBaseNamespaceDirPath()) {
+ hash = (37 * hash) + SOURCEBASENAMESPACEDIRPATH_FIELD_NUMBER;
+ hash = (53 * hash) + getSourceBaseNamespaceDirPath().hashCode();
+ }
+ if (hasSourceHFileArchiveDirPath()) {
+ hash = (37 * hash) + SOURCEHFILEARCHIVEDIRPATH_FIELD_NUMBER;
+ hash = (53 * hash) + getSourceHFileArchiveDirPath().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -17243,6 +17484,12 @@ public final class AdminProtos {
} else {
entryBuilder_.clear();
}
+ replicationClusterId_ = "";
+ bitField0_ = (bitField0_ & ~0x00000002);
+ sourceBaseNamespaceDirPath_ = "";
+ bitField0_ = (bitField0_ & ~0x00000004);
+ sourceHFileArchiveDirPath_ = "";
+ bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
@@ -17270,6 +17517,7 @@ public final class AdminProtos {
public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest buildPartial() {
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest result = new org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest(this);
int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
if (entryBuilder_ == null) {
if (((bitField0_ & 0x00000001) == 0x00000001)) {
entry_ = java.util.Collections.unmodifiableList(entry_);
@@ -17279,6 +17527,19 @@ public final class AdminProtos {
} else {
result.entry_ = entryBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.replicationClusterId_ = replicationClusterId_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.sourceBaseNamespaceDirPath_ = sourceBaseNamespaceDirPath_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.sourceHFileArchiveDirPath_ = sourceHFileArchiveDirPath_;
+ result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
@@ -17320,6 +17581,21 @@ public final class AdminProtos {
}
}
}
+ if (other.hasReplicationClusterId()) {
+ bitField0_ |= 0x00000002;
+ replicationClusterId_ = other.replicationClusterId_;
+ onChanged();
+ }
+ if (other.hasSourceBaseNamespaceDirPath()) {
+ bitField0_ |= 0x00000004;
+ sourceBaseNamespaceDirPath_ = other.sourceBaseNamespaceDirPath_;
+ onChanged();
+ }
+ if (other.hasSourceHFileArchiveDirPath()) {
+ bitField0_ |= 0x00000008;
+ sourceHFileArchiveDirPath_ = other.sourceHFileArchiveDirPath_;
+ onChanged();
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -17593,6 +17869,228 @@ public final class AdminProtos {
return entryBuilder_;
}
+ // optional string replicationClusterId = 2;
+ private java.lang.Object replicationClusterId_ = "";
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ public boolean hasReplicationClusterId() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ public java.lang.String getReplicationClusterId() {
+ java.lang.Object ref = replicationClusterId_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ replicationClusterId_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ public com.google.protobuf.ByteString
+ getReplicationClusterIdBytes() {
+ java.lang.Object ref = replicationClusterId_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ replicationClusterId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ public Builder setReplicationClusterId(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ replicationClusterId_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ public Builder clearReplicationClusterId() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ replicationClusterId_ = getDefaultInstance().getReplicationClusterId();
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string replicationClusterId = 2;</code>
+ */
+ public Builder setReplicationClusterIdBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ replicationClusterId_ = value;
+ onChanged();
+ return this;
+ }
+
+ // optional string sourceBaseNamespaceDirPath = 3;
+ private java.lang.Object sourceBaseNamespaceDirPath_ = "";
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ public boolean hasSourceBaseNamespaceDirPath() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ public java.lang.String getSourceBaseNamespaceDirPath() {
+ java.lang.Object ref = sourceBaseNamespaceDirPath_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ sourceBaseNamespaceDirPath_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ public com.google.protobuf.ByteString
+ getSourceBaseNamespaceDirPathBytes() {
+ java.lang.Object ref = sourceBaseNamespaceDirPath_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ sourceBaseNamespaceDirPath_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ public Builder setSourceBaseNamespaceDirPath(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000004;
+ sourceBaseNamespaceDirPath_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ public Builder clearSourceBaseNamespaceDirPath() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ sourceBaseNamespaceDirPath_ = getDefaultInstance().getSourceBaseNamespaceDirPath();
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+ */
+ public Builder setSourceBaseNamespaceDirPathBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000004;
+ sourceBaseNamespaceDirPath_ = value;
+ onChanged();
+ return this;
+ }
+
+ // optional string sourceHFileArchiveDirPath = 4;
+ private java.lang.Object sourceHFileArchiveDirPath_ = "";
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ public boolean hasSourceHFileArchiveDirPath() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ public java.lang.String getSourceHFileArchiveDirPath() {
+ java.lang.Object ref = sourceHFileArchiveDirPath_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ sourceHFileArchiveDirPath_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ public com.google.protobuf.ByteString
+ getSourceHFileArchiveDirPathBytes() {
+ java.lang.Object ref = sourceHFileArchiveDirPath_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ sourceHFileArchiveDirPath_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ public Builder setSourceHFileArchiveDirPath(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000008;
+ sourceHFileArchiveDirPath_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ public Builder clearSourceHFileArchiveDirPath() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ sourceHFileArchiveDirPath_ = getDefaultInstance().getSourceHFileArchiveDirPath();
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+ */
+ public Builder setSourceHFileArchiveDirPathBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000008;
+ sourceHFileArchiveDirPath_ = value;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:hbase.pb.ReplicateWALEntryRequest)
}
@@ -23539,56 +24037,58 @@ public final class AdminProtos {
"ster_system_time\030\004 \001(\004\"\026\n\024MergeRegionsRe" +
"sponse\"a\n\010WALEntry\022\035\n\003key\030\001 \002(\0132\020.hbase." +
"pb.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025as",
- "sociated_cell_count\030\003 \001(\005\"=\n\030ReplicateWA" +
- "LEntryRequest\022!\n\005entry\030\001 \003(\0132\022.hbase.pb." +
- "WALEntry\"\033\n\031ReplicateWALEntryResponse\"\026\n" +
- "\024RollWALWriterRequest\"0\n\025RollWALWriterRe" +
- "sponse\022\027\n\017region_to_flush\030\001 \003(\014\"#\n\021StopS" +
- "erverRequest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServ" +
- "erResponse\"\026\n\024GetServerInfoRequest\"K\n\nSe" +
- "rverInfo\022)\n\013server_name\030\001 \002(\0132\024.hbase.pb" +
- ".ServerName\022\022\n\nwebui_port\030\002 \001(\r\"B\n\025GetSe" +
- "rverInfoResponse\022)\n\013server_info\030\001 \002(\0132\024.",
- "hbase.pb.ServerInfo\"\034\n\032UpdateConfigurati" +
- "onRequest\"\035\n\033UpdateConfigurationResponse" +
- "2\207\013\n\014AdminService\022P\n\rGetRegionInfo\022\036.hba" +
- "se.pb.GetRegionInfoRequest\032\037.hbase.pb.Ge" +
- "tRegionInfoResponse\022M\n\014GetStoreFile\022\035.hb" +
- "ase.pb.GetStoreFileRequest\032\036.hbase.pb.Ge" +
- "tStoreFileResponse\022V\n\017GetOnlineRegion\022 ." +
- "hbase.pb.GetOnlineRegionRequest\032!.hbase." +
- "pb.GetOnlineRegionResponse\022G\n\nOpenRegion" +
- "\022\033.hbase.pb.OpenRegionRequest\032\034.hbase.pb",
- ".OpenRegionResponse\022M\n\014WarmupRegion\022\035.hb" +
- "ase.pb.WarmupRegionRequest\032\036.hbase.pb.Wa" +
- "rmupRegionResponse\022J\n\013CloseRegion\022\034.hbas" +
- "e.pb.CloseRegionRequest\032\035.hbase.pb.Close" +
- "RegionResponse\022J\n\013FlushRegion\022\034.hbase.pb" +
- ".FlushRegionRequest\032\035.hbase.pb.FlushRegi" +
- "onResponse\022J\n\013SplitRegion\022\034.hbase.pb.Spl" +
- "itRegionRequest\032\035.hbase.pb.SplitRegionRe" +
- "sponse\022P\n\rCompactRegion\022\036.hbase.pb.Compa" +
- "ctRegionRequest\032\037.hbase.pb.CompactRegion",
- "Response\022M\n\014MergeRegions\022\035.hbase.pb.Merg" +
- "eRegionsRequest\032\036.hbase.pb.MergeRegionsR" +
- "esponse\022\\\n\021ReplicateWALEntry\022\".hbase.pb." +
- "ReplicateWALEntryRequest\032#.hbase.pb.Repl" +
- "icateWALEntryResponse\022Q\n\006Replay\022\".hbase." +
- "pb.ReplicateWALEntryRequest\032#.hbase.pb.R" +
- "eplicateWALEntryResponse\022P\n\rRollWALWrite" +
- "r\022\036.hbase.pb.RollWALWriterRequest\032\037.hbas" +
- "e.pb.RollWALWriterResponse\022P\n\rGetServerI" +
- "nfo\022\036.hbase.pb.GetServerInfoRequest\032\037.hb",
- "ase.pb.GetServerInfoResponse\022G\n\nStopServ" +
- "er\022\033.hbase.pb.StopServerRequest\032\034.hbase." +
- "pb.StopServerResponse\022_\n\022UpdateFavoredNo" +
- "des\022#.hbase.pb.UpdateFavoredNodesRequest" +
- "\032$.hbase.pb.UpdateFavoredNodesResponse\022b" +
- "\n\023UpdateConfiguration\022$.hbase.pb.UpdateC" +
- "onfigurationRequest\032%.hbase.pb.UpdateCon" +
- "figurationResponseBA\n*org.apache.hadoop." +
- "hbase.protobuf.generatedB\013AdminProtosH\001\210" +
- "\001\001\240\001\001"
+ "sociated_cell_count\030\003 \001(\005\"\242\001\n\030ReplicateW" +
+ "ALEntryRequest\022!\n\005entry\030\001 \003(\0132\022.hbase.pb" +
+ ".WALEntry\022\034\n\024replicationClusterId\030\002 \001(\t\022" +
+ "\"\n\032sourceBaseNamespaceDirPath\030\003 \001(\t\022!\n\031s" +
+ "ourceHFileArchiveDirPath\030\004 \001(\t\"\033\n\031Replic" +
+ "ateWALEntryResponse\"\026\n\024RollWALWriterRequ" +
+ "est\"0\n\025RollWALWriterResponse\022\027\n\017region_t" +
+ "o_flush\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006re" +
+ "ason\030\001 \002(\t\"\024\n\022StopServerResponse\"\026\n\024GetS" +
+ "erverInfoRequest\"K\n\nServerInfo\022)\n\013server",
+ "_name\030\001 \002(\0132\024.hbase.pb.ServerName\022\022\n\nweb" +
+ "ui_port\030\002 \001(\r\"B\n\025GetServerInfoResponse\022)" +
+ "\n\013server_info\030\001 \002(\0132\024.hbase.pb.ServerInf" +
+ "o\"\034\n\032UpdateConfigurationRequest\"\035\n\033Updat" +
+ "eConfigurationResponse2\207\013\n\014AdminService\022" +
+ "P\n\rGetRegionInfo\022\036.hbase.pb.GetRegionInf" +
+ "oRequest\032\037.hbase.pb.GetRegionInfoRespons" +
+ "e\022M\n\014GetStoreFile\022\035.hbase.pb.GetStoreFil" +
+ "eRequest\032\036.hbase.pb.GetStoreFileResponse" +
+ "\022V\n\017GetOnlineRegion\022 .hbase.pb.GetOnline",
+ "RegionRequest\032!.hbase.pb.GetOnlineRegion" +
+ "Response\022G\n\nOpenRegion\022\033.hbase.pb.OpenRe" +
+ "gionRequest\032\034.hbase.pb.OpenRegionRespons" +
+ "e\022M\n\014WarmupRegion\022\035.hbase.pb.WarmupRegio" +
+ "nRequest\032\036.hbase.pb.WarmupRegionResponse" +
+ "\022J\n\013CloseRegion\022\034.hbase.pb.CloseRegionRe" +
+ "quest\032\035.hbase.pb.CloseRegionResponse\022J\n\013" +
+ "FlushRegion\022\034.hbase.pb.FlushRegionReques" +
+ "t\032\035.hbase.pb.FlushRegionResponse\022J\n\013Spli" +
+ "tRegion\022\034.hbase.pb.SplitRegionRequest\032\035.",
+ "hbase.pb.SplitRegionResponse\022P\n\rCompactR" +
+ "egion\022\036.hbase.pb.CompactRegionRequest\032\037." +
+ "hbase.pb.CompactRegionResponse\022M\n\014MergeR" +
+ "egions\022\035.hbase.pb.MergeRegionsRequest\032\036." +
+ "hbase.pb.MergeRegionsResponse\022\\\n\021Replica" +
+ "teWALEntry\022\".hbase.pb.ReplicateWALEntryR" +
+ "equest\032#.hbase.pb.ReplicateWALEntryRespo" +
+ "nse\022Q\n\006Replay\022\".hbase.pb.ReplicateWALEnt" +
+ "ryRequest\032#.hbase.pb.ReplicateWALEntryRe" +
+ "sponse\022P\n\rRollWALWriter\022\036.hbase.pb.RollW",
+ "ALWriterRequest\032\037.hbase.pb.RollWALWriter" +
+ "Response\022P\n\rGetServerInfo\022\036.hbase.pb.Get" +
+ "ServerInfoRequest\032\037.hbase.pb.GetServerIn" +
+ "foResponse\022G\n\nStopServer\022\033.hbase.pb.Stop" +
+ "ServerRequest\032\034.hbase.pb.StopServerRespo" +
+ "nse\022_\n\022UpdateFavoredNodes\022#.hbase.pb.Upd" +
+ "ateFavoredNodesRequest\032$.hbase.pb.Update" +
+ "FavoredNodesResponse\022b\n\023UpdateConfigurat" +
+ "ion\022$.hbase.pb.UpdateConfigurationReques" +
+ "t\032%.hbase.pb.UpdateConfigurationResponse",
+ "BA\n*org.apache.hadoop.hbase.protobuf.gen" +
+ "eratedB\013AdminProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -23750,7 +24250,7 @@ public final class AdminProtos {
internal_static_hbase_pb_ReplicateWALEntryRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_hbase_pb_ReplicateWALEntryRequest_descriptor,
- new java.lang.String[] { "Entry", });
+ new java.lang.String[] { "Entry", "ReplicationClusterId", "SourceBaseNamespaceDirPath", "SourceHFileArchiveDirPath", });
internal_static_hbase_pb_ReplicateWALEntryResponse_descriptor =
getDescriptor().getMessageTypes().get(24);
internal_static_hbase_pb_ReplicateWALEntryResponse_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-protocol/src/main/protobuf/Admin.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto
index f7787f5..a1905a4 100644
--- a/hbase-protocol/src/main/protobuf/Admin.proto
+++ b/hbase-protocol/src/main/protobuf/Admin.proto
@@ -211,6 +211,9 @@ message WALEntry {
*/
message ReplicateWALEntryRequest {
repeated WALEntry entry = 1;
+ optional string replicationClusterId = 2;
+ optional string sourceBaseNamespaceDirPath = 3;
+ optional string sourceHFileArchiveDirPath = 4;
}
message ReplicateWALEntryResponse {
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 44be2d3..369ae90 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
@@ -125,6 +126,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private FsDelegationToken fsDelegationToken;
private String bulkToken;
private UserProvider userProvider;
+ private int nrThreads;
private LoadIncrementalHFiles() {}
@@ -146,6 +148,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
this.fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
+ nrThreads = conf.getInt("hbase.loadincremental.threads.max",
+ Runtime.getRuntime().availableProcessors());
initalized = true;
}
@@ -246,7 +250,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* region boundary, and each part is added back into the queue.
* The import process finishes when the queue is empty.
*/
- static class LoadQueueItem {
+ public static class LoadQueueItem {
final byte[] family;
final Path hfilePath;
@@ -313,7 +317,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* @param table the table to load into
* @throws TableNotFoundException if table does not yet exist
*/
- @SuppressWarnings("deprecation")
public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
RegionLocator regionLocator) throws TableNotFoundException, IOException {
@@ -321,16 +324,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
throw new TableNotFoundException("Table " + table.getName() + "is not currently available.");
}
- // initialize thread pools
- int nrThreads = getConf().getInt("hbase.loadincremental.threads.max",
- Runtime.getRuntime().availableProcessors());
- ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
- builder.setNameFormat("LoadIncrementalHFiles-%1$d");
- ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
- 60, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- builder.build());
- ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
+ ExecutorService pool = createExecutorService();
// LQI queue does not need to be threadsafe -- all operations on this queue
// happen in this thread
@@ -347,30 +341,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
"option, consider removing the files and bulkload again without this option. " +
"See HBASE-13985");
}
- discoverLoadQueue(queue, hfofDir, validateHFile);
- // check whether there is invalid family name in HFiles to be bulkloaded
- Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
- ArrayList<String> familyNames = new ArrayList<String>(families.size());
- for (HColumnDescriptor family : families) {
- familyNames.add(family.getNameAsString());
- }
- ArrayList<String> unmatchedFamilies = new ArrayList<String>();
- Iterator<LoadQueueItem> queueIter = queue.iterator();
- while (queueIter.hasNext()) {
- LoadQueueItem lqi = queueIter.next();
- String familyNameInHFile = Bytes.toString(lqi.family);
- if (!familyNames.contains(familyNameInHFile)) {
- unmatchedFamilies.add(familyNameInHFile);
- }
- }
- if (unmatchedFamilies.size() > 0) {
- String msg =
- "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
- + unmatchedFamilies + "; valid family names of table "
- + table.getName() + " are: " + familyNames;
- LOG.error(msg);
- throw new IOException(msg);
- }
+ prepareHFileQueue(hfofDir, table, queue, validateHFile);
+
int count = 0;
if (queue.isEmpty()) {
@@ -397,7 +369,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
+ count + " with " + queue.size() + " files remaining to group or split");
}
- int maxRetries = getConf().getInt("hbase.bulkload.retries.number", 10);
+ int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
if (maxRetries != 0 && count >= maxRetries) {
throw new IOException("Retry attempted " + count +
@@ -447,6 +419,85 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
/**
+ * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
+ * passed directory and validates whether the prepared queue has all the valid table column
+ * families in it.
+ * @param hfilesDir directory containing list of hfiles to be loaded into the table
+ * @param table table to which hfiles should be loaded
+ * @param queue queue which needs to be loaded into the table
+ * @throws IOException If any I/O or network error occurred
+ */
+ public void prepareHFileQueue(Path hfofDir, Table table, Deque<LoadQueueItem> queue,
+ boolean validateHFile) throws IOException {
+ discoverLoadQueue(queue, hfofDir, validateHFile);
+ validateFamiliesInHFiles(table, queue);
+ }
+
+ // Initialize a thread pool
+ private ExecutorService createExecutorService() {
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+ builder.setNameFormat("LoadIncrementalHFiles-%1$d");
+ ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), builder.build());
+ ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
+ return pool;
+ }
+
+ /**
+ * Checks whether there is any invalid family name in HFiles to be bulk loaded.
+ */
+ private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue)
+ throws IOException {
+ Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
+ List<String> familyNames = new ArrayList<String>(families.size());
+ for (HColumnDescriptor family : families) {
+ familyNames.add(family.getNameAsString());
+ }
+ List<String> unmatchedFamilies = new ArrayList<String>();
+ Iterator<LoadQueueItem> queueIter = queue.iterator();
+ while (queueIter.hasNext()) {
+ LoadQueueItem lqi = queueIter.next();
+ String familyNameInHFile = Bytes.toString(lqi.family);
+ if (!familyNames.contains(familyNameInHFile)) {
+ unmatchedFamilies.add(familyNameInHFile);
+ }
+ }
+ if (unmatchedFamilies.size() > 0) {
+ String msg =
+ "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
+ + unmatchedFamilies + "; valid family names of table " + table.getName() + " are: "
+ + familyNames;
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+ }
+
+ /**
+ * Used by the replication sink to load the hfiles from the source cluster. It does the following,
+ * 1. {@link LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)} 2.
+ * {@link
+ * LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)}
+ * @param table Table to which these hfiles should be loaded to
+ * @param conn Connection to use
+ * @param queue {@link LoadQueueItem} has hfiles yet to be loaded
+ * @param startEndKeys starting and ending row keys of the region
+ */
+ public void loadHFileQueue(final Table table, final Connection conn, Deque<LoadQueueItem> queue,
+ Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+ ExecutorService pool = null;
+ try {
+ pool = createExecutorService();
+ Multimap<ByteBuffer, LoadQueueItem> regionGroups =
+ groupOrSplitPhase(table, pool, queue, startEndKeys);
+ bulkLoadPhase(table, conn, pool, queue, regionGroups);
+ } finally {
+ if (pool != null) {
+ pool.shutdown();
+ }
+ }
+ }
+
+ /**
* This takes the LQI's grouped by likely regions and attempts to bulk load
* them. Any failures are re-queued for another pass with the
* groupOrSplitPhase.
@@ -592,10 +643,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
String uniqueName = getUniqueName();
HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
+
Path botOut = new Path(tmpDir, uniqueName + ".bottom");
Path topOut = new Path(tmpDir, uniqueName + ".top");
- splitStoreFile(getConf(), hfilePath, familyDesc, splitKey,
- botOut, topOut);
+ splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
FileSystem fs = tmpDir.getFileSystem(getConf());
fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
@@ -626,6 +677,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
final Pair<byte[][], byte[][]> startEndKeys)
throws IOException {
final Path hfilePath = item.hfilePath;
+ // fs is the source filesystem
+ if (fs == null) {
+ fs = hfilePath.getFileSystem(getConf());
+ }
HFile.Reader hfr = HFile.createReader(fs, hfilePath,
new CacheConfig(getConf()), getConf());
final byte[] first, last;
@@ -712,7 +767,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* failure
*/
protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
- final TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
+ final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
throws IOException {
final List<Pair<byte[], String>> famPaths =
new ArrayList<Pair<byte[], String>>(lqis.size());
@@ -747,6 +802,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
//in user directory
if(secureClient != null && !success) {
FileSystem targetFs = FileSystem.get(getConf());
+ // fs is the source filesystem
+ if(fs == null) {
+ fs = lqis.iterator().next().hfilePath.getFileSystem(getConf());
+ }
// Check to see if the source and target filesystems are the same
// If they are the same filesystem, we will try move the files back
// because previously we moved them to the staging directory.
@@ -1000,4 +1059,17 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
System.exit(ret);
}
+ /**
+ * Called from replication sink, where it manages bulkToken(staging directory) by itself. This is
+ * used only when {@link SecureBulkLoadEndpoint} is configured in hbase.coprocessor.region.classes
+ * property. This directory is used as a temporary directory where all files are initially
+ * copied/moved from user given directory, set all the required file permissions and then from
+ * their it is finally loaded into a table. This should be set only when, one would like to manage
+ * the staging directory by itself. Otherwise this tool will handle this by itself.
+ * @param stagingDir staging directory path
+ */
+ public void setBulkToken(String stagingDir) {
+ this.bulkToken = stagingDir;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index d6a120b..91185af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -28,22 +28,23 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.SizedCellScanner;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALKey;
import com.google.protobuf.ServiceException;
@@ -51,15 +52,20 @@ import com.google.protobuf.ServiceException;
public class ReplicationProtbufUtil {
/**
* A helper to replicate a list of WAL entries using admin protocol.
- *
- * @param admin
- * @param entries
+ * @param admin Admin service
+ * @param entries Array of WAL entries to be replicated
+ * @param replicationClusterId Id which will uniquely identify source cluster FS client
+ * configurations in the replication configuration directory
+ * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
+ * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
* @throws java.io.IOException
*/
public static void replicateWALEntry(final AdminService.BlockingInterface admin,
- final Entry[] entries) throws IOException {
+ final Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir,
+ Path sourceHFileArchiveDir) throws IOException {
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
- buildReplicateWALEntryRequest(entries, null);
+ buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir,
+ sourceHFileArchiveDir);
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
try {
admin.replicateWALEntry(controller, p.getFirst());
@@ -77,19 +83,22 @@ public class ReplicationProtbufUtil {
*/
public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
buildReplicateWALEntryRequest(final Entry[] entries) {
- return buildReplicateWALEntryRequest(entries, null);
+ return buildReplicateWALEntryRequest(entries, null, null, null, null);
}
/**
* Create a new ReplicateWALEntryRequest from a list of WAL entries
- *
* @param entries the WAL entries to be replicated
* @param encodedRegionName alternative region name to use if not null
- * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
- * found.
+ * @param replicationClusterId Id which will uniquely identify source cluster FS client
+ * configurations in the replication configuration directory
+ * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
+ * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
+ * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found.
*/
public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
- buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName) {
+ buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName,
+ String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) {
// Accumulate all the Cells seen in here.
List<List<? extends Cell>> allCells = new ArrayList<List<? extends Cell>>(entries.length);
int size = 0;
@@ -146,6 +155,17 @@ public class ReplicationProtbufUtil {
entryBuilder.setAssociatedCellCount(cells.size());
builder.addEntry(entryBuilder.build());
}
+
+ if (replicationClusterId != null) {
+ builder.setReplicationClusterId(replicationClusterId);
+ }
+ if (sourceBaseNamespaceDir != null) {
+ builder.setSourceBaseNamespaceDirPath(sourceBaseNamespaceDir.toString());
+ }
+ if (sourceHFileArchiveDir != null) {
+ builder.setSourceHFileArchiveDirPath(sourceHFileArchiveDir.toString());
+ }
+
return new Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>(builder.build(),
getCellScanner(allCells, size));
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index d94e11c..0c9b0e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1800,7 +1800,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
List<WALEntry> entries = request.getEntryList();
CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
- regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner);
+ regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner,
+ request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
+ request.getSourceHFileArchiveDirPath());
regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
return ReplicateWALEntryResponse.newBuilder().build();
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
index 5f96bf7..836d3aa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
@@ -36,7 +36,13 @@ public interface ReplicationSinkService extends ReplicationService {
* Carry on the list of log entries down to the sink
* @param entries list of WALEntries to replicate
* @param cells Cells that the WALEntries refer to (if cells is non-null)
+ * @param replicationClusterId Id which will uniquely identify source cluster FS client
+ * configurations in the replication configuration directory
+ * @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
+ * directory required for replicating hfiles
+ * @param sourceHFileArchiveDirPath Path that point to the source cluster hfile archive directory
* @throws IOException
*/
- void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException;
+ void replicateLogEntries(List<WALEntry> entries, CellScanner cells, String replicationClusterId,
+ String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath) throws IOException;
}