You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/23 21:42:43 UTC
[2/4] hbase git commit: HBASE-17442 Move most of the replication
related classes from hbase-client to new hbase-replication package. (Guanghao
Zhang).
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-replication/pom.xml b/hbase-replication/pom.xml
new file mode 100644
index 0000000..858e9fc
--- /dev/null
+++ b/hbase-replication/pom.xml
@@ -0,0 +1,264 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!--
+  /**
+   * Licensed to the Apache Software Foundation (ASF) under one
+   * or more contributor license agreements. See the NOTICE file
+   * distributed with this work for additional information
+   * regarding copyright ownership. The ASF licenses this file
+   * to you under the Apache License, Version 2.0 (the
+   * "License"); you may not use this file except in compliance
+   * with the License. You may obtain a copy of the License at
+   *
+   * http://www.apache.org/licenses/LICENSE-2.0
+   *
+   * Unless required by applicable law or agreed to in writing, software
+   * distributed under the License is distributed on an "AS IS" BASIS,
+   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   * See the License for the specific language governing permissions and
+   * limitations under the License.
+   */
+  -->
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>hbase-build-configuration</artifactId>
+    <groupId>org.apache.hbase</groupId>
+    <version>3.0.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <artifactId>hbase-replication</artifactId>
+  <name>Apache HBase - Replication</name>
+  <description>HBase Replication Support</description>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-site-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <!--Make it so assembly:single does nothing in here-->
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <skipAssembly>true</skipAssembly>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <!-- Always skip the second part executions, since we only run
+          simple unit tests in this module -->
+        <executions>
+          <execution>
+            <id>secondPartTestsExecution</id>
+            <phase>test</phase>
+            <goals>
+              <goal>test</goal>
+            </goals>
+            <configuration>
+              <skip>true</skip>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- Make a jar and put the sources in the jar -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+    </plugins>
+    <pluginManagement>
+      <plugins>
+        <!--This plugin's configuration is used to store Eclipse m2e settings
+          only. It has no influence on the Maven build itself.-->
+        <plugin>
+          <groupId>org.eclipse.m2e</groupId>
+          <artifactId>lifecycle-mapping</artifactId>
+          <version>1.0.0</version>
+          <configuration>
+            <lifecycleMappingMetadata>
+              <pluginExecutions>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                    <versionRange>[3.2,)</versionRange>
+                    <goals>
+                      <goal>compile</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+              </pluginExecutions>
+            </lifecycleMappingMetadata>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+
+  <dependencies>
+    <!-- Intra-project dependencies -->
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-annotations</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>jdk.tools</groupId>
+          <artifactId>jdk.tools</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-annotations</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <type>test-jar</type>
+      <!-- Test utilities only: keep them off this module's compile/runtime classpath,
+        matching the hbase-annotations test-jar above. -->
+      <scope>test</scope>
+    </dependency>
+    <!-- General dependencies -->
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <!-- profile against Hadoop 2.x: This is the default. -->
+    <profile>
+      <id>hadoop-2.0</id>
+      <activation>
+        <property>
+          <!--Below formatting for dev-support/generate-hadoopX-poms.sh-->
+          <!--h2--><name>!hadoop.profile</name>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>com.github.stephenc.findbugs</groupId>
+          <artifactId>findbugs-annotations</artifactId>
+          <optional>true</optional>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-auth</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+          <exclusions>
+            <exclusion>
+              <groupId>net.java.dev.jets3t</groupId>
+              <artifactId>jets3t</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>javax.servlet.jsp</groupId>
+              <artifactId>jsp-api</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.mortbay.jetty</groupId>
+              <artifactId>jetty</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-server</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-core</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-json</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>javax.servlet</groupId>
+              <artifactId>servlet-api</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>tomcat</groupId>
+              <artifactId>jasper-compiler</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>tomcat</groupId>
+              <artifactId>jasper-runtime</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.google.code.findbugs</groupId>
+              <artifactId>jsr305</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+      </dependencies>
+    </profile>
+
+    <!--
+      profile for building against Hadoop 3.0.x. Activate using:
+       mvn -Dhadoop.profile=3.0
+    -->
+    <profile>
+      <id>hadoop-3.0</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>3.0</value>
+        </property>
+      </activation>
+      <properties>
+        <hadoop.version>3.0-SNAPSHOT</hadoop.version>
+      </properties>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-auth</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+</project>
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
new file mode 100644
index 0000000..8506cbb
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.lang.reflect.ConstructorUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * A factory class for instantiating replication objects that deal with replication state.
+ * <p>
+ * The {@link ReplicationQueues} and {@link ReplicationQueuesClient} implementations are
+ * pluggable: the concrete class is read from the configuration carried by the arguments object
+ * and instantiated reflectively with that arguments object as the constructor argument. Peers
+ * and the tracker are always the ZooKeeper-based implementations.
+ */
+@InterfaceAudience.Private
+public class ReplicationFactory {
+
+  /** Configuration key naming the {@link ReplicationQueues} implementation to build. */
+  private static final String REPLICATION_QUEUES_CLASS_KEY =
+      "hbase.region.replica.replication.replicationQueues.class";
+
+  /** Configuration key naming the {@link ReplicationQueuesClient} implementation to build. */
+  private static final String REPLICATION_QUEUES_CLIENT_CLASS_KEY =
+      "hbase.region.replica.replication.replicationQueuesClient.class";
+
+  /** {@link ReplicationQueues} implementation used when none is configured. */
+  public static final Class<? extends ReplicationQueues> defaultReplicationQueueClass =
+      ReplicationQueuesZKImpl.class;
+
+  /**
+   * Builds the configured {@link ReplicationQueues} implementation, falling back to
+   * {@link #defaultReplicationQueueClass}.
+   * @param args constructor argument for the implementation; its configuration selects the class
+   * @return the newly constructed queues instance
+   * @throws Exception if the selected class cannot be constructed with {@code args}
+   */
+  public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args)
+      throws Exception {
+    Class<?> classToBuild =
+        args.getConf().getClass(REPLICATION_QUEUES_CLASS_KEY, defaultReplicationQueueClass);
+    return (ReplicationQueues) ConstructorUtils.invokeConstructor(classToBuild, args);
+  }
+
+  /**
+   * Builds the configured {@link ReplicationQueuesClient} implementation, falling back to
+   * {@link ReplicationQueuesClientZKImpl}.
+   * @param args constructor argument for the implementation; its configuration selects the class
+   * @return the newly constructed queues client
+   * @throws Exception if the selected class cannot be constructed with {@code args}
+   */
+  public static ReplicationQueuesClient getReplicationQueuesClient(
+      ReplicationQueuesClientArguments args) throws Exception {
+    Class<?> classToBuild = args.getConf().getClass(REPLICATION_QUEUES_CLIENT_CLASS_KEY,
+        ReplicationQueuesClientZKImpl.class);
+    return (ReplicationQueuesClient) ConstructorUtils.invokeConstructor(classToBuild, args);
+  }
+
+  /**
+   * Builds a ZooKeeper-backed {@link ReplicationPeers} without a queues client.
+   * @param zk ZooKeeper watcher
+   * @param conf configuration
+   * @param abortable abortable handed to the implementation
+   * @return a new {@link ReplicationPeersZKImpl}
+   */
+  public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
+      Abortable abortable) {
+    return getReplicationPeers(zk, conf, null, abortable);
+  }
+
+  /**
+   * Builds a ZooKeeper-backed {@link ReplicationPeers}.
+   * @param zk ZooKeeper watcher
+   * @param conf configuration
+   * @param queuesClient queues client handed to the implementation; may be {@code null}
+   * @param abortable abortable handed to the implementation
+   * @return a new {@link ReplicationPeersZKImpl}
+   */
+  public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
+      final ReplicationQueuesClient queuesClient, Abortable abortable) {
+    return new ReplicationPeersZKImpl(zk, conf, queuesClient, abortable);
+  }
+
+  /**
+   * Builds a ZooKeeper-backed {@link ReplicationTracker}.
+   * @param zookeeper ZooKeeper watcher
+   * @param replicationPeers peers handed to the tracker
+   * @param conf configuration
+   * @param abortable abortable handed to the tracker
+   * @param stopper stoppable handed to the tracker
+   * @return a new {@link ReplicationTrackerZKImpl}
+   */
+  public static ReplicationTracker getReplicationTracker(ZooKeeperWatcher zookeeper,
+      final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable,
+      Stoppable stopper) {
+    return new ReplicationTrackerZKImpl(zookeeper, replicationPeers, conf, abortable, stopper);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
new file mode 100644
index 0000000..dfb5fdc
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Callback interface for classes that want to be told about events raised by the
+ * ReplicationTracker, for example a peer cluster being added or deleted, or a local region
+ * server failing. An implementation only receives these events after registering itself with a
+ * Replication Tracker.
+ */
+@InterfaceAudience.Private
+public interface ReplicationListener {
+
+  /**
+   * Invoked when a region server has been removed from the local cluster.
+   * @param regionServer the region server that went away
+   */
+  void regionServerRemoved(String regionServer);
+
+  /**
+   * Invoked when a peer cluster has been removed (unregistered) from replication.
+   * @param peerId id of the peer cluster that was removed
+   */
+  void peerRemoved(String peerId);
+
+  /**
+   * Invoked when the list of registered peer clusters has changed.
+   * @param peerIds ids of all peer clusters that are currently registered
+   */
+  void peerListChanged(List<String> peerIds);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
new file mode 100644
index 0000000..4f18048
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+
+/**
+ * ReplicationPeer manages enabled / disabled state for the peer.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface ReplicationPeer {
+
+  /**
+   * State of the peer, whether it is enabled or not
+   */
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+  enum PeerState {
+    ENABLED,
+    DISABLED
+  }
+
+  /**
+   * Get the identifier of this peer
+   * @return string representation of the id
+   */
+  String getId();
+
+  /**
+   * Get the peer config object
+   * @return the ReplicationPeerConfig for this peer
+   */
+  ReplicationPeerConfig getPeerConfig();
+
+  /**
+   * Returns the state of the peer
+   * @return the enabled state
+   */
+  PeerState getPeerState();
+
+  /**
+   * Get the configuration object required to communicate with this peer
+   * @return configuration object
+   */
+  Configuration getConfiguration();
+
+  /**
+   * Get replicable (table, cf-list) map of this peer
+   * @return the replicable (table, cf-list) map
+   */
+  Map<TableName, List<String>> getTableCFs();
+
+  /**
+   * Get replicable namespace set of this peer
+   * @return the replicable namespaces set
+   */
+  Set<String> getNamespaces();
+
+  /**
+   * Get the per node bandwidth upper limit for this peer
+   * @return the bandwidth upper limit
+   */
+  long getPeerBandwidth();
+
+  /**
+   * Registers a listener to be called back when the {@link ReplicationPeerConfig} of this peer
+   * is updated.
+   * @param listener the listener to notify of peer config updates
+   */
+  void trackPeerConfigChanges(ReplicationPeerConfigListener listener);
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
new file mode 100644
index 0000000..4e04186
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Receives notifications about changes to a replication peer's {@link ReplicationPeerConfig}.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface ReplicationPeerConfigListener {
+
+  /**
+   * Called when users update the ReplicationPeerConfig for this peer.
+   * @param rpc the updated ReplicationPeerConfig
+   */
+  void peerConfigUpdated(ReplicationPeerConfig rpc);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
new file mode 100644
index 0000000..3973be9
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
@@ -0,0 +1,318 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+
+@InterfaceAudience.Private
+public class ReplicationPeerZKImpl extends ReplicationStateZKBase
+ implements ReplicationPeer, Abortable, Closeable {
+ private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class);
+
+ private ReplicationPeerConfig peerConfig;
+ private final String id;
+ private volatile PeerState peerState;
+ private volatile Map<TableName, List<String>> tableCFs = new HashMap<>();
+ private final Configuration conf;
+ private PeerStateTracker peerStateTracker;
+ private PeerConfigTracker peerConfigTracker;
+
+
+ /**
+ * Constructor that takes all the objects required to communicate with the specified peer, except
+ * for the region server addresses.
+ * @param conf configuration object to this peer
+ * @param id string representation of this peer's identifier
+ * @param peerConfig configuration for the replication peer
+ */
+ public ReplicationPeerZKImpl(ZooKeeperWatcher zkWatcher, Configuration conf,
+ String id, ReplicationPeerConfig peerConfig,
+ Abortable abortable)
+ throws ReplicationException {
+ super(zkWatcher, conf, abortable);
+ this.conf = conf;
+ this.peerConfig = peerConfig;
+ this.id = id;
+ }
+
+ /**
+ * start a state tracker to check whether this peer is enabled or not
+ *
+ * @param peerStateNode path to zk node which stores peer state
+ * @throws KeeperException
+ */
+ public void startStateTracker(String peerStateNode)
+ throws KeeperException {
+ ensurePeerEnabled(peerStateNode);
+ this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
+ this.peerStateTracker.start();
+ try {
+ this.readPeerStateZnode();
+ } catch (DeserializationException e) {
+ throw ZKUtil.convert(e);
+ }
+ }
+
+ private void readPeerStateZnode() throws DeserializationException {
+ this.peerState =
+ isStateEnabled(this.peerStateTracker.getData(false))
+ ? PeerState.ENABLED
+ : PeerState.DISABLED;
+ }
+
+ /**
+ * start a table-cfs tracker to listen the (table, cf-list) map change
+ * @param peerConfigNode path to zk node which stores table-cfs
+ * @throws KeeperException
+ */
+ public void startPeerConfigTracker(String peerConfigNode)
+ throws KeeperException {
+ this.peerConfigTracker = new PeerConfigTracker(peerConfigNode, zookeeper,
+ this);
+ this.peerConfigTracker.start();
+ this.readPeerConfig();
+ }
+
+ private ReplicationPeerConfig readPeerConfig() {
+ try {
+ byte[] data = peerConfigTracker.getData(false);
+ if (data != null) {
+ this.peerConfig = ReplicationSerDeHelper.parsePeerFrom(data);
+ }
+ } catch (DeserializationException e) {
+ LOG.error("", e);
+ }
+ return this.peerConfig;
+ }
+
+ @Override
+ public PeerState getPeerState() {
+ return peerState;
+ }
+
+ /**
+ * Get the identifier of this peer
+ * @return string representation of the id (short)
+ */
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * Get the peer config object
+ * @return the ReplicationPeerConfig for this peer
+ */
+ @Override
+ public ReplicationPeerConfig getPeerConfig() {
+ return peerConfig;
+ }
+
+ /**
+ * Get the configuration object required to communicate with this peer
+ * @return configuration object
+ */
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ /**
+ * Get replicable (table, cf-list) map of this peer
+ * @return the replicable (table, cf-list) map
+ */
+ @Override
+ public Map<TableName, List<String>> getTableCFs() {
+ this.tableCFs = peerConfig.getTableCFsMap();
+ return this.tableCFs;
+ }
+
+ /**
+ * Get replicable namespace set of this peer
+ * @return the replicable namespaces set
+ */
+ @Override
+ public Set<String> getNamespaces() {
+ return this.peerConfig.getNamespaces();
+ }
+
+ @Override
+ public long getPeerBandwidth() {
+ return this.peerConfig.getBandwidth();
+ }
+
+ @Override
+ public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
+ if (this.peerConfigTracker != null){
+ this.peerConfigTracker.setListener(listener);
+ }
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+ LOG.fatal("The ReplicationPeer corresponding to peer " + peerConfig
+ + " was aborted for the following reason(s):" + why, e);
+ }
+
+ @Override
+ public boolean isAborted() {
+ // Currently the replication peer is never "Aborted", we just log when the
+ // abort method is called.
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // TODO: stop zkw?
+ }
+
+ /**
+ * Parse the raw data from ZK to get a peer's state
+ * @param bytes raw ZK data
+ * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
+ * @throws DeserializationException
+ */
+ public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
+ ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes);
+ return ReplicationProtos.ReplicationState.State.ENABLED == state;
+ }
+
+ /**
+ * @param bytes Content of a state znode.
+ * @return State parsed from the passed bytes.
+ * @throws DeserializationException
+ */
+ private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
+ throws DeserializationException {
+ ProtobufUtil.expectPBMagicPrefix(bytes);
+ int pblen = ProtobufUtil.lengthOfPBMagic();
+ ReplicationProtos.ReplicationState.Builder builder =
+ ReplicationProtos.ReplicationState.newBuilder();
+ ReplicationProtos.ReplicationState state;
+ try {
+ ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
+ state = builder.build();
+ return state.getState();
+ } catch (IOException e) {
+ throw new DeserializationException(e);
+ }
+ }
+
+ /**
+ * Utility method to ensure an ENABLED znode is in place; if not present, we create it.
+ * @param path Path to znode to check
+ * @return True if we created the znode.
+ * @throws NodeExistsException
+ * @throws KeeperException
+ */
+ private boolean ensurePeerEnabled(final String path)
+ throws NodeExistsException, KeeperException {
+ if (ZKUtil.checkExists(zookeeper, path) == -1) {
+ // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
+ // peer-state znode. This happens while adding a peer.
+ // The peer state data is set as "ENABLED" by default.
+ ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
+ ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Tracker for state of this peer
+ */
+ public class PeerStateTracker extends ZooKeeperNodeTracker {
+
+ public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
+ Abortable abortable) {
+ super(watcher, peerStateZNode, abortable);
+ }
+
+ @Override
+ public synchronized void nodeDataChanged(String path) {
+ if (path.equals(node)) {
+ super.nodeDataChanged(path);
+ try {
+ readPeerStateZnode();
+ } catch (DeserializationException e) {
+ LOG.warn("Failed deserializing the content of " + path, e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Tracker for PeerConfigNode of this peer
+ */
+ public class PeerConfigTracker extends ZooKeeperNodeTracker {
+
+ ReplicationPeerConfigListener listener;
+
+ public PeerConfigTracker(String peerConfigNode, ZooKeeperWatcher watcher,
+ Abortable abortable) {
+ super(watcher, peerConfigNode, abortable);
+ }
+
+ public synchronized void setListener(ReplicationPeerConfigListener listener){
+ this.listener = listener;
+ }
+
+ @Override
+ public synchronized void nodeCreated(String path) {
+ if (path.equals(node)) {
+ super.nodeCreated(path);
+ ReplicationPeerConfig config = readPeerConfig();
+ if (listener != null){
+ listener.peerConfigUpdated(config);
+ }
+ }
+ }
+
+ @Override
+ public synchronized void nodeDataChanged(String path) {
+ //superclass calls nodeCreated
+ if (path.equals(node)) {
+ super.nodeDataChanged(path);
+ }
+
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
new file mode 100644
index 0000000..2a7963a
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -0,0 +1,177 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * This provides an interface for maintaining a set of peer clusters. These peers are remote slave
+ * clusters that data is replicated to. A peer cluster can be in three different states:
+ *
+ * 1. Not-Registered - There is no notion of the peer cluster.
+ * 2. Registered - The peer has an id and is being tracked but there is no connection.
+ * 3. Connected - There is an active connection to the remote peer.
+ *
+ * In the registered or connected state, a peer cluster can either be enabled or disabled.
+ */
+@InterfaceAudience.Private
+public interface ReplicationPeers {
+
+ /**
+ * Initialize the ReplicationPeers interface.
+ */
+ void init() throws ReplicationException;
+
+ /**
+ * Add a new remote slave cluster for replication.
+ * @param peerId a short that identifies the cluster
+ * @param peerConfig configuration for the replication slave cluster
+ */
+ void registerPeer(String peerId, ReplicationPeerConfig peerConfig)
+ throws ReplicationException;
+
+ /**
+ * Removes a remote slave cluster and stops the replication to it.
+ * @param peerId a short that identifies the cluster
+ */
+ void unregisterPeer(String peerId) throws ReplicationException;
+
+ /**
+ * Method called after a peer has been connected. It will create a ReplicationPeer to track the
+ * newly connected cluster.
+ * @param peerId a short that identifies the cluster
+ * @return whether a ReplicationPeer was successfully created
+ * @throws ReplicationException
+ */
+ boolean peerConnected(String peerId) throws ReplicationException;
+
+ /**
+ * Method called after a peer has been disconnected. It will remove the ReplicationPeer that
+ * tracked the disconnected cluster.
+ * @param peerId a short that identifies the cluster
+ */
+ void peerDisconnected(String peerId);
+
+ /**
+ * Restart the replication to the specified remote slave cluster.
+ * @param peerId a short that identifies the cluster
+ */
+ void enablePeer(String peerId) throws ReplicationException;
+
+ /**
+ * Stop the replication to the specified remote slave cluster.
+ * @param peerId a short that identifies the cluster
+ */
+ void disablePeer(String peerId) throws ReplicationException;
+
+ /**
+ * Get the table and column-family list string of the peer from the underlying storage.
+ * @param peerId a short that identifies the cluster
+ */
+ public Map<TableName, List<String>> getPeerTableCFsConfig(String peerId)
+ throws ReplicationException;
+
+ /**
+ * Set the table and column-family list string of the peer to the underlying storage.
+ * @param peerId a short that identifies the cluster
+ * @param tableCFs the table and column-family list which will be replicated for this peer
+ */
+ public void setPeerTableCFsConfig(String peerId,
+ Map<TableName, ? extends Collection<String>> tableCFs)
+ throws ReplicationException;
+
+ /**
+ * Returns the ReplicationPeer for the specified connected peer. This ReplicationPeer will
+ * continue to track changes to the Peer's state and config. This method returns null if no
+ * peer has been connected with the given peerId.
+ * @param peerId id for the peer
+ * @return ReplicationPeer object
+ */
+ ReplicationPeer getConnectedPeer(String peerId);
+
+ /**
+ * Returns the set of peerIds of the clusters that have been connected and have an underlying
+ * ReplicationPeer.
+ * @return a Set of Strings for peerIds
+ */
+ public Set<String> getConnectedPeerIds();
+
+ /**
+ * Get the replication status for the specified connected remote slave cluster.
+ * The value might be read from cache, so it is recommended to
+ * use {@link #getStatusOfPeerFromBackingStore(String)}
+ * if reading the state after enabling or disabling it.
+ * @param peerId a short that identifies the cluster
+ * @return true if replication is enabled, false otherwise.
+ */
+ boolean getStatusOfPeer(String peerId);
+
+ /**
+ * Get the replication status for the specified remote slave cluster, which doesn't
+ * have to be connected. The state is read directly from the backing store.
+ * @param peerId a short that identifies the cluster
+ * @return true if replication is enabled, false otherwise.
+ * @throws ReplicationException thrown if there's an error contacting the store
+ */
+ boolean getStatusOfPeerFromBackingStore(String peerId) throws ReplicationException;
+
+ /**
+ * List the cluster replication configs of all remote slave clusters (whether they are
+ * enabled/disabled or connected/disconnected).
+ * @return A map of peer ids to peer cluster keys
+ */
+ Map<String, ReplicationPeerConfig> getAllPeerConfigs();
+
+ /**
+ * List the peer ids of all remote slave clusters (whether they are enabled/disabled or
+ * connected/disconnected).
+ * @return A list of peer ids
+ */
+ List<String> getAllPeerIds();
+
+ /**
+ * Returns the configured ReplicationPeerConfig for this peerId
+ * @param peerId a short name that identifies the cluster
+ * @return ReplicationPeerConfig for the peer
+ */
+ ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException;
+
+ /**
+ * Returns the configuration needed to talk to the remote slave cluster.
+ * @param peerId a short that identifies the cluster
+ * @return the configuration for the peer cluster, null if it was unable to get the configuration
+ */
+ Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) throws ReplicationException;
+
+ /**
+ * Update the peerConfig for the a given peer cluster
+ * @param id a short that identifies the cluster
+ * @param peerConfig new config for the peer cluster
+ * @throws ReplicationException
+ */
+ void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException;
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
new file mode 100644
index 0000000..751e454
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -0,0 +1,546 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.CompoundConfiguration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
+import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This class provides an implementation of the ReplicationPeers interface using ZooKeeper. The
+ * peers znode contains a list of all peer replication clusters and the current replication state of
+ * those clusters. It has one child peer znode for each peer cluster. The peer znode is named with
+ * the cluster id provided by the user in the HBase shell. The value of the peer znode contains the
+ * peers cluster key provided by the user in the HBase Shell. The cluster key contains a list of
+ * zookeeper quorum peers, the client port for the zookeeper quorum, and the base znode for HBase.
+ * For example:
+ *
+ * /hbase/replication/peers/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
+ * /hbase/replication/peers/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
+ *
+ * Each of these peer znodes has a child znode that indicates whether or not replication is enabled
+ * on that peer cluster. These peer-state znodes do not have child znodes and simply contain a
+ * boolean value (i.e. ENABLED or DISABLED). This value is read/maintained by the
+ * ReplicationPeer.PeerStateTracker class. For example:
+ *
+ * /hbase/replication/peers/1/peer-state [Value: ENABLED]
+ *
+ * Each of these peer znodes has a child znode that indicates which data will be replicated
+ * to the peer cluster. These peer-tableCFs znodes do not have child znodes and only have a
+ * table/cf list config. This value is read/maintained by the ReplicationPeer.TableCFsTracker
+ * class. For example:
+ *
+ * /hbase/replication/peers/1/tableCFs [Value: "table1; table2:cf1,cf3; table3:cfx,cfy"]
+ */
+@InterfaceAudience.Private
+public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
+
+ // Map of peer clusters keyed by their id
+ private Map<String, ReplicationPeerZKImpl> peerClusters;
+ private final ReplicationQueuesClient queuesClient;
+ private Abortable abortable;
+
+ private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
+
+ public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
+ final ReplicationQueuesClient queuesClient, Abortable abortable) {
+ super(zk, conf, abortable);
+ this.abortable = abortable;
+ this.peerClusters = new ConcurrentHashMap<>();
+ this.queuesClient = queuesClient;
+ }
+
+ @Override
+ public void init() throws ReplicationException {
+ try {
+ if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
+ ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+ }
+ } catch (KeeperException e) {
+ throw new ReplicationException("Could not initialize replication peers", e);
+ }
+ addExistingPeers();
+ }
+
+ @Override
+ public void registerPeer(String id, ReplicationPeerConfig peerConfig)
+ throws ReplicationException {
+ try {
+ if (peerExists(id)) {
+ throw new IllegalArgumentException("Cannot add a peer with id=" + id
+ + " because that id already exists.");
+ }
+
+ if(id.contains("-")){
+ throw new IllegalArgumentException("Found invalid peer name:" + id);
+ }
+
+ if (peerConfig.getClusterKey() != null) {
+ try {
+ ZKConfig.validateClusterKey(peerConfig.getClusterKey());
+ } catch (IOException ioe) {
+ throw new IllegalArgumentException(ioe.getMessage());
+ }
+ }
+
+ checkQueuesDeleted(id);
+
+ ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+
+ List<ZKUtilOp> listOfOps = new ArrayList<>(2);
+ ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id),
+ ReplicationSerDeHelper.toByteArray(peerConfig));
+ // b/w PeerWatcher and ReplicationZookeeper#add method to create the
+ // peer-state znode. This happens while adding a peer
+ // The peer state data is set as "ENABLED" by default.
+ ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES);
+ listOfOps.add(op1);
+ listOfOps.add(op2);
+ ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
+ // A peer is enabled by default
+ } catch (KeeperException e) {
+ throw new ReplicationException("Could not add peer with id=" + id
+ + ", peerConfif=>" + peerConfig, e);
+ }
+ }
+
+ @Override
+ public void unregisterPeer(String id) throws ReplicationException {
+ try {
+ if (!peerExists(id)) {
+ throw new IllegalArgumentException("Cannot remove peer with id=" + id
+ + " because that id does not exist.");
+ }
+ ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+ } catch (KeeperException e) {
+ throw new ReplicationException("Could not remove peer with id=" + id, e);
+ }
+ }
+
+ @Override
+ public void enablePeer(String id) throws ReplicationException {
+ changePeerState(id, ReplicationProtos.ReplicationState.State.ENABLED);
+ LOG.info("peer " + id + " is enabled");
+ }
+
+ @Override
+ public void disablePeer(String id) throws ReplicationException {
+ changePeerState(id, ReplicationProtos.ReplicationState.State.DISABLED);
+ LOG.info("peer " + id + " is disabled");
+ }
+
+ @Override
+ public Map<TableName, List<String>> getPeerTableCFsConfig(String id) throws ReplicationException {
+ try {
+ if (!peerExists(id)) {
+ throw new IllegalArgumentException("peer " + id + " doesn't exist");
+ }
+ try {
+ ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
+ if (rpc == null) {
+ throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
+ }
+ return rpc.getTableCFsMap();
+ } catch (Exception e) {
+ throw new ReplicationException(e);
+ }
+ } catch (KeeperException e) {
+ throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e);
+ }
+ }
+
+ @Override
+ public void setPeerTableCFsConfig(String id,
+ Map<TableName, ? extends Collection<String>> tableCFs)
+ throws ReplicationException {
+ try {
+ if (!peerExists(id)) {
+ throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
+ + " does not exist.");
+ }
+ ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
+ if (rpc == null) {
+ throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
+ }
+ rpc.setTableCFsMap(tableCFs);
+ ZKUtil.setData(this.zookeeper, getPeerNode(id),
+ ReplicationSerDeHelper.toByteArray(rpc));
+ LOG.info("Peer tableCFs with id= " + id + " is now " +
+ ReplicationSerDeHelper.convertToString(tableCFs));
+ } catch (KeeperException e) {
+ throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
+ }
+ }
+
+ @Override
+ public boolean getStatusOfPeer(String id) {
+ ReplicationPeer replicationPeer = this.peerClusters.get(id);
+ if (replicationPeer == null) {
+ throw new IllegalArgumentException("Peer with id= " + id + " is not cached");
+ }
+ return replicationPeer.getPeerState() == PeerState.ENABLED;
+ }
+
+ @Override
+ public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
+ try {
+ if (!peerExists(id)) {
+ throw new IllegalArgumentException("peer " + id + " doesn't exist");
+ }
+ String peerStateZNode = getPeerStateNode(id);
+ try {
+ return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
+ } catch (KeeperException e) {
+ throw new ReplicationException(e);
+ } catch (DeserializationException e) {
+ throw new ReplicationException(e);
+ }
+ } catch (KeeperException e) {
+ throw new ReplicationException("Unable to get status of the peer with id=" + id +
+ " from backing store", e);
+ } catch (InterruptedException e) {
+ throw new ReplicationException(e);
+ }
+ }
+
+ @Override
+ public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
+ Map<String, ReplicationPeerConfig> peers = new TreeMap<>();
+ List<String> ids = null;
+ try {
+ ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+ for (String id : ids) {
+ ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
+ if (peerConfig == null) {
+ LOG.warn("Failed to get replication peer configuration of clusterid=" + id
+ + " znode content, continuing.");
+ continue;
+ }
+ peers.put(id, peerConfig);
+ }
+ } catch (KeeperException e) {
+ this.abortable.abort("Cannot get the list of peers ", e);
+ } catch (ReplicationException e) {
+ this.abortable.abort("Cannot get the list of peers ", e);
+ }
+ return peers;
+ }
+
+ @Override
+ public ReplicationPeer getConnectedPeer(String peerId) {
+ return peerClusters.get(peerId);
+ }
+
+ @Override
+ public Set<String> getConnectedPeerIds() {
+ return peerClusters.keySet(); // this is not thread-safe
+ }
+
+ /**
+ * Returns a ReplicationPeerConfig from the znode or null for the given peerId.
+ */
+ @Override
+ public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
+ throws ReplicationException {
+ String znode = getPeerNode(peerId);
+ byte[] data = null;
+ try {
+ data = ZKUtil.getData(this.zookeeper, znode);
+ } catch (InterruptedException e) {
+ LOG.warn("Could not get configuration for peer because the thread " +
+ "was interrupted. peerId=" + peerId);
+ Thread.currentThread().interrupt();
+ return null;
+ } catch (KeeperException e) {
+ throw new ReplicationException("Error getting configuration for peer with id="
+ + peerId, e);
+ }
+ if (data == null) {
+ LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
+ return null;
+ }
+
+ try {
+ return ReplicationSerDeHelper.parsePeerFrom(data);
+ } catch (DeserializationException e) {
+ LOG.warn("Failed to parse cluster key from peerId=" + peerId
+ + ", specifically the content from the following znode: " + znode);
+ return null;
+ }
+ }
+
+ @Override
+ public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId)
+ throws ReplicationException {
+ ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);
+
+ if (peerConfig == null) {
+ return null;
+ }
+
+ Configuration otherConf;
+ try {
+ otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
+ } catch (IOException e) {
+ LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
+ return null;
+ }
+
+ if (!peerConfig.getConfiguration().isEmpty()) {
+ CompoundConfiguration compound = new CompoundConfiguration();
+ compound.add(otherConf);
+ compound.addStringMap(peerConfig.getConfiguration());
+ return new Pair<>(peerConfig, compound);
+ }
+
+ return new Pair<>(peerConfig, otherConf);
+ }
+
+ @Override
+ public void updatePeerConfig(String id, ReplicationPeerConfig newConfig)
+ throws ReplicationException {
+ ReplicationPeer peer = getConnectedPeer(id);
+ if (peer == null){
+ throw new ReplicationException("Could not find peer Id " + id + " in connected peers");
+ }
+ ReplicationPeerConfig existingConfig = peer.getPeerConfig();
+ if (newConfig.getClusterKey() != null && !newConfig.getClusterKey().isEmpty() &&
+ !newConfig.getClusterKey().equals(existingConfig.getClusterKey())){
+ throw new ReplicationException("Changing the cluster key on an existing peer is not allowed."
+ + " Existing key '" + existingConfig.getClusterKey() + "' does not match new key '"
+ + newConfig.getClusterKey() +
+ "'");
+ }
+ String existingEndpointImpl = existingConfig.getReplicationEndpointImpl();
+ if (newConfig.getReplicationEndpointImpl() != null &&
+ !newConfig.getReplicationEndpointImpl().isEmpty() &&
+ !newConfig.getReplicationEndpointImpl().equals(existingEndpointImpl)){
+ throw new ReplicationException("Changing the replication endpoint implementation class " +
+ "on an existing peer is not allowed. Existing class '"
+ + existingConfig.getReplicationEndpointImpl()
+ + "' does not match new class '" + newConfig.getReplicationEndpointImpl() + "'");
+ }
+ //Update existingConfig's peer config and peer data with the new values, but don't touch config
+ // or data that weren't explicitly changed
+ existingConfig.getConfiguration().putAll(newConfig.getConfiguration());
+ existingConfig.getPeerData().putAll(newConfig.getPeerData());
+ existingConfig.setTableCFsMap(newConfig.getTableCFsMap());
+ existingConfig.setNamespaces(newConfig.getNamespaces());
+ existingConfig.setBandwidth(newConfig.getBandwidth());
+
+ try {
+ ZKUtil.setData(this.zookeeper, getPeerNode(id),
+ ReplicationSerDeHelper.toByteArray(existingConfig));
+ }
+ catch(KeeperException ke){
+ throw new ReplicationException("There was a problem trying to save changes to the " +
+ "replication peer " + id, ke);
+ }
+ }
+
+ /**
+ * List all registered peer clusters and set a watch on their znodes.
+ */
+ @Override
+ public List<String> getAllPeerIds() {
+ List<String> ids = null;
+ try {
+ ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
+ } catch (KeeperException e) {
+ this.abortable.abort("Cannot get the list of peers ", e);
+ }
+ return ids;
+ }
+
+ /**
+ * A private method used during initialization. This method attempts to add all registered
+ * peer clusters. This method does not set a watch on the peer cluster znodes.
+ */
+ private void addExistingPeers() throws ReplicationException {
+ List<String> znodes = null;
+ try {
+ znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+ } catch (KeeperException e) {
+ throw new ReplicationException("Error getting the list of peer clusters.", e);
+ }
+ if (znodes != null) {
+ for (String z : znodes) {
+ createAndAddPeer(z);
+ }
+ }
+ }
+
+ @Override
+ public boolean peerConnected(String peerId) throws ReplicationException {
+ return createAndAddPeer(peerId);
+ }
+
+ @Override
+ public void peerDisconnected(String peerId) {
+ ReplicationPeer rp = this.peerClusters.get(peerId);
+ if (rp != null) {
+ ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).remove(peerId, rp);
+ }
+ }
+
+ /**
+ * Attempt to connect to a new remote slave cluster.
+ * @param peerId a short that identifies the cluster
+ * @return true if a new connection was made, false if no new connection was made.
+ */
+ public boolean createAndAddPeer(String peerId) throws ReplicationException {
+ if (peerClusters == null) {
+ return false;
+ }
+ if (this.peerClusters.containsKey(peerId)) {
+ return false;
+ }
+
+ ReplicationPeerZKImpl peer = null;
+ try {
+ peer = createPeer(peerId);
+ } catch (Exception e) {
+ throw new ReplicationException("Error adding peer with id=" + peerId, e);
+ }
+ if (peer == null) {
+ return false;
+ }
+ ReplicationPeerZKImpl previous =
+ ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).putIfAbsent(peerId, peer);
+ if (previous == null) {
+ LOG.info("Added new peer cluster=" + peer.getPeerConfig().getClusterKey());
+ } else {
+ LOG.info("Peer already present, " + previous.getPeerConfig().getClusterKey() +
+ ", new cluster=" + peer.getPeerConfig().getClusterKey());
+ }
+ return true;
+ }
+
+ /**
+ * Update the state znode of a peer cluster.
+ * @param id
+ * @param state
+ */
+ private void changePeerState(String id, ReplicationProtos.ReplicationState.State state)
+ throws ReplicationException {
+ try {
+ if (!peerExists(id)) {
+ throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id
+ + " does not exist.");
+ }
+ String peerStateZNode = getPeerStateNode(id);
+ byte[] stateBytes =
+ (state == ReplicationProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
+ : DISABLED_ZNODE_BYTES;
+ if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
+ ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
+ } else {
+ ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
+ }
+ LOG.info("Peer with id= " + id + " is now " + state.name());
+ } catch (KeeperException e) {
+ throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
+ }
+ }
+
+ /**
+ * Helper method to connect to a peer
+ * @param peerId peer's identifier
+ * @return object representing the peer
+ * @throws ReplicationException
+ */
+ private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException {
+ Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
+ if (pair == null) {
+ return null;
+ }
+ Configuration peerConf = pair.getSecond();
+
+ ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(zookeeper,
+ peerConf, peerId, pair.getFirst(), abortable);
+ try {
+ peer.startStateTracker(this.getPeerStateNode(peerId));
+ } catch (KeeperException e) {
+ throw new ReplicationException("Error starting the peer state tracker for peerId=" +
+ peerId, e);
+ }
+
+ try {
+ peer.startPeerConfigTracker(this.getPeerNode(peerId));
+ } catch (KeeperException e) {
+ throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" +
+ peerId, e);
+ }
+
+ return peer;
+ }
+
+ private void checkQueuesDeleted(String peerId) throws ReplicationException {
+ if (queuesClient == null) return;
+ try {
+ List<String> replicators = queuesClient.getListOfReplicators();
+ if (replicators == null || replicators.isEmpty()) {
+ return;
+ }
+ for (String replicator : replicators) {
+ List<String> queueIds = queuesClient.getAllQueues(replicator);
+ for (String queueId : queueIds) {
+ ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+ if (queueInfo.getPeerId().equals(peerId)) {
+ throw new ReplicationException("undeleted queue for peerId: " + peerId
+ + ", replicator: " + replicator + ", queueId: " + queueId);
+ }
+ }
+ }
+ // Check for hfile-refs queue
+ if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
+ && queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) {
+ throw new ReplicationException("Undeleted queue for peerId: " + peerId
+ + ", found in hfile-refs node path " + hfileRefsZNode);
+ }
+ } catch (KeeperException e) {
+ throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
new file mode 100644
index 0000000..1403f6d
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ServerName;
+
+/**
+ * This class is responsible for the parsing logic for a znode representing a queue.
+ * It will extract the peerId if it's recovered as well as the dead region servers
+ * that were part of the queue's history.
+ */
+@InterfaceAudience.Private
+public class ReplicationQueueInfo {
+ private static final Log LOG = LogFactory.getLog(ReplicationQueueInfo.class);
+
+ private final String peerId;
+ private final String peerClusterZnode;
+ private boolean queueRecovered;
+ // List of all the dead region servers that had this queue (if recovered)
+ private List<String> deadRegionServers = new ArrayList<>();
+
+ /**
+ * The passed znode will be either the id of the peer cluster or
+ * the handling story of that queue in the form of id-servername-*
+ */
+ public ReplicationQueueInfo(String znode) {
+ this.peerClusterZnode = znode;
+ String[] parts = znode.split("-", 2);
+ this.queueRecovered = parts.length != 1;
+ this.peerId = this.queueRecovered ?
+ parts[0] : peerClusterZnode;
+ if (parts.length >= 2) {
+ // extract dead servers
+ extractDeadServersFromZNodeString(parts[1], this.deadRegionServers);
+ }
+ }
+
+ /**
+ * Parse dead server names from znode string servername can contain "-" such as
+ * "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following
+ * cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-<server name>-...
+ */
+ private static void
+ extractDeadServersFromZNodeString(String deadServerListStr, List<String> result) {
+
+ if(deadServerListStr == null || result == null || deadServerListStr.isEmpty()) return;
+
+ // valid server name delimiter "-" has to be after "," in a server name
+ int seenCommaCnt = 0;
+ int startIndex = 0;
+ int len = deadServerListStr.length();
+
+ for (int i = 0; i < len; i++) {
+ switch (deadServerListStr.charAt(i)) {
+ case ',':
+ seenCommaCnt += 1;
+ break;
+ case '-':
+ if(seenCommaCnt>=2) {
+ if (i > startIndex) {
+ String serverName = deadServerListStr.substring(startIndex, i);
+ if(ServerName.isFullServerName(serverName)){
+ result.add(serverName);
+ } else {
+ LOG.error("Found invalid server name:" + serverName);
+ }
+ startIndex = i + 1;
+ }
+ seenCommaCnt = 0;
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ // add tail
+ if(startIndex < len - 1){
+ String serverName = deadServerListStr.substring(startIndex, len);
+ if(ServerName.isFullServerName(serverName)){
+ result.add(serverName);
+ } else {
+ LOG.error("Found invalid server name at the end:" + serverName);
+ }
+ }
+
+ LOG.debug("Found dead servers:" + result);
+ }
+
+ public List<String> getDeadRegionServers() {
+ return Collections.unmodifiableList(this.deadRegionServers);
+ }
+
+ public String getPeerId() {
+ return this.peerId;
+ }
+
+ public String getPeerClusterZnode() {
+ return this.peerClusterZnode;
+ }
+
+ public boolean isQueueRecovered() {
+ return queueRecovered;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
new file mode 100644
index 0000000..be5a590
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -0,0 +1,160 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.List;
+import java.util.SortedSet;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * This provides an interface for maintaining a region server's replication queues. These queues
+ * keep track of the WALs and HFile references (the latter only when
+ * {@code hbase.replication.bulkload.enabled} is enabled) that still need to be replicated to
+ * remote clusters.
+ */
+@InterfaceAudience.Private
+public interface ReplicationQueues {
+
+ /**
+ * Initialize the region server replication queue interface.
+ * @param serverName The server name of the region server that owns the replication queues this
+ * interface manages.
+ * @throws ReplicationException if the replication queues cannot be initialized
+ */
+ void init(String serverName) throws ReplicationException;
+
+ /**
+ * Remove a replication queue.
+ * @param queueId a String that identifies the queue.
+ */
+ void removeQueue(String queueId);
+
+ /**
+ * Add a new WAL file to the given queue. If the queue does not exist it is created.
+ * @param queueId a String that identifies the queue.
+ * @param filename name of the WAL
+ * @throws ReplicationException if the WAL cannot be added to the queue
+ */
+ void addLog(String queueId, String filename) throws ReplicationException;
+
+ /**
+ * Remove a WAL file from the given queue.
+ * @param queueId a String that identifies the queue.
+ * @param filename name of the WAL
+ */
+ void removeLog(String queueId, String filename);
+
+ /**
+ * Set the current position for a specific WAL in a given queue.
+ * @param queueId a String that identifies the queue
+ * @param filename name of the WAL
+ * @param position the current position in the file
+ */
+ void setLogPosition(String queueId, String filename, long position);
+
+ /**
+ * Get the current position for a specific WAL in a given queue.
+ * @param queueId a String that identifies the queue
+ * @param filename name of the WAL
+ * @return the current position in the file
+ * @throws ReplicationException if the position cannot be read
+ */
+ long getLogPosition(String queueId, String filename) throws ReplicationException;
+
+ /**
+ * Remove all replication queues for this region server.
+ */
+ void removeAllQueues();
+
+ /**
+ * Get a list of all WALs in the given queue.
+ * @param queueId a String that identifies the queue
+ * @return a list of WALs, null if no such queue exists for this server
+ */
+ List<String> getLogsInQueue(String queueId);
+
+ /**
+ * Get a list of all queues for this region server.
+ * @return a list of queueIds, or an empty list if this region server is dead and has no
+ * outstanding queues
+ */
+ List<String> getAllQueues();
+
+ /**
+ * Get the queueIds of a dead region server whose queues have not yet been claimed by other
+ * region servers.
+ * @param regionserver the id of the dead region server
+ * @return the unclaimed queueIds; empty if the region server's queue node exists but has no
+ * children, null if the queue node does not exist.
+ */
+ List<String> getUnClaimedQueueIds(String regionserver);
+
+ /**
+ * Take ownership of the queue that is identified by queueId and belongs to a dead region server.
+ * @param regionserver the id of the dead region server
+ * @param queueId the id of the queue
+ * @return a pair of the new PeerId and a SortedSet of the WALs in its queue, or null if there is
+ * no unclaimed queue.
+ */
+ Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId);
+
+ /**
+ * Remove the znode of a region server if its queue is empty.
+ * @param regionserver the id of the region server whose znode may be removed
+ */
+ void removeReplicatorIfQueueIsEmpty(String regionserver);
+
+ /**
+ * Get a list of all region servers that have outstanding replication queues. These servers could
+ * be alive, dead or from a previous run of the cluster.
+ * @return a list of server names
+ */
+ List<String> getListOfReplicators();
+
+ /**
+ * Checks if the provided znode is the same as this region server's.
+ * @param regionserver the id of the region server
+ * @return true if the given id is this region server's znode
+ */
+ boolean isThisOurRegionServer(String regionserver);
+
+ /**
+ * Add a peer to the hfile reference queue if the peer does not exist.
+ * @param peerId peer cluster id to be added
+ * @throws ReplicationException if fails to add a peer id to hfile reference queue
+ */
+ void addPeerToHFileRefs(String peerId) throws ReplicationException;
+
+ /**
+ * Remove a peer from the hfile reference queue.
+ * @param peerId peer cluster id to be removed
+ */
+ void removePeerFromHFileRefs(String peerId);
+
+ /**
+ * Add new hfile references to the queue.
+ * @param peerId peer cluster id to which the hfiles need to be replicated
+ * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
+ * will be added in the queue }
+ * @throws ReplicationException if fails to add a hfile reference
+ */
+ void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException;
+
+ /**
+ * Remove hfile references from the queue.
+ * @param peerId peer cluster id from which these hfile references need to be removed
+ * @param files list of hfile references to be removed
+ */
+ void removeHFileRefs(String peerId, List<String> files);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
new file mode 100644
index 0000000..12fc6a1
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * Wrapper around common arguments used to construct ReplicationQueues. Used to construct various
+ * ReplicationQueues implementations with different constructor arguments by reflection.
+ */
+@InterfaceAudience.Private
+public class ReplicationQueuesArguments {
+
+ private ZooKeeperWatcher zk;
+ private Configuration conf;
+ private Abortable abort;
+
+ /**
+ * Creates an argument bundle without a ZooKeeper watcher; {@link #getZk()} returns null until
+ * {@link #setZk(ZooKeeperWatcher)} is called.
+ * @param conf the configuration to hand to the constructed implementation
+ * @param abort the abortable to hand to the constructed implementation
+ */
+ public ReplicationQueuesArguments(Configuration conf, Abortable abort) {
+ this(conf, abort, null);
+ }
+
+ /**
+ * Creates an argument bundle with all three arguments set.
+ * @param conf the configuration to hand to the constructed implementation
+ * @param abort the abortable to hand to the constructed implementation
+ * @param zk the ZooKeeper watcher to hand to the constructed implementation (may be null)
+ */
+ public ReplicationQueuesArguments(Configuration conf, Abortable abort, ZooKeeperWatcher zk) {
+ // Assign fields directly instead of going through setZk(): this class is subclassed, and
+ // calling an overridable method from a constructor would let a subclass override observe a
+ // partially constructed object.
+ this.conf = conf;
+ this.abort = abort;
+ this.zk = zk;
+ }
+
+ /** @return the ZooKeeper watcher, or null if none was supplied */
+ public ZooKeeperWatcher getZk() {
+ return zk;
+ }
+
+ /** @param zk the ZooKeeper watcher to store */
+ public void setZk(ZooKeeperWatcher zk) {
+ this.zk = zk;
+ }
+
+ /** @return the configuration */
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /** @param conf the configuration to store */
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /** @return the abortable */
+ public Abortable getAbortable() {
+ return abort;
+ }
+
+ /** @param abort the abortable to store */
+ public void setAbortable(Abortable abort) {
+ this.abort = abort;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
new file mode 100644
index 0000000..6d8900e
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
@@ -0,0 +1,93 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This provides an interface for clients of replication to view replication queues. These queues
+ * keep track of the sources (WALs/HFile references) that still need to be replicated to remote
+ * clusters.
+ */
+@InterfaceAudience.Private
+public interface ReplicationQueuesClient {
+
+ /**
+ * Initialize the replication queue client interface.
+ * @throws ReplicationException if the client cannot be initialized
+ */
+ void init() throws ReplicationException;
+
+ /**
+ * Get a list of all region servers that have outstanding replication queues. These servers could
+ * be alive, dead or from a previous run of the cluster.
+ * @return a list of server names
+ * @throws KeeperException zookeeper exception
+ */
+ List<String> getListOfReplicators() throws KeeperException;
+
+ /**
+ * Get a list of all WALs in the given queue on the given region server.
+ * @param serverName the server name of the region server that owns the queue
+ * @param queueId a String that identifies the queue
+ * @return a list of WALs, null if this region server is dead and has no outstanding queues
+ * @throws KeeperException zookeeper exception
+ */
+ List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException;
+
+ /**
+ * Get a list of all queues for the specified region server.
+ * @param serverName the server name of the region server that owns the set of queues
+ * @return a list of queueIds, null if this region server is not a replicator.
+ * @throws KeeperException zookeeper exception
+ */
+ List<String> getAllQueues(String serverName) throws KeeperException;
+
+ /**
+ * Load all WALs in all replication queues from ZK. This method guarantees to return a
+ * snapshot which contains all WALs in the zookeeper at the start of this call even if there
+ * is concurrent queue failover. However, some WALs newly created during the call may
+ * not be included.
+ * @return the set of WAL names referenced by the replication queues
+ * @throws KeeperException zookeeper exception
+ */
+ Set<String> getAllWALs() throws KeeperException;
+
+ /**
+ * Get the change version number of replication hfile references node. This can be used as
+ * optimistic locking to get a consistent snapshot of the replication queues of hfile references.
+ * @return change version number of hfile references node
+ * @throws KeeperException zookeeper exception
+ */
+ int getHFileRefsNodeChangeVersion() throws KeeperException;
+
+ /**
+ * Get list of all peers from hfile reference queue.
+ * @return a list of peer ids
+ * @throws KeeperException zookeeper exception
+ */
+ List<String> getAllPeersFromHFileRefsQueue() throws KeeperException;
+
+ /**
+ * Get a list of all hfile references in the given peer.
+ * @param peerId a String that identifies the peer
+ * @return a list of hfile references, null if not found any
+ * @throws KeeperException zookeeper exception
+ */
+ List<String> getReplicableHFiles(String peerId) throws KeeperException;
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
new file mode 100644
index 0000000..834f831
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * Argument bundle used to construct ReplicationQueuesClient implementations by reflection. It
+ * adds no state to {@link ReplicationQueuesArguments}; presumably the distinct type exists so that
+ * implementations can expose a constructor typed on client arguments specifically.
+ */
+@InterfaceAudience.Private
+public class ReplicationQueuesClientArguments extends ReplicationQueuesArguments {
+
+ /**
+ * Creates client arguments without a ZooKeeper watcher.
+ * @param conf the configuration to hand to the constructed client
+ * @param abort the abortable to hand to the constructed client
+ */
+ public ReplicationQueuesClientArguments(Configuration conf, Abortable abort) {
+ super(conf, abort);
+ }
+
+ /**
+ * Creates client arguments with a ZooKeeper watcher.
+ * @param conf the configuration to hand to the constructed client
+ * @param abort the abortable to hand to the constructed client
+ * @param zk the ZooKeeper watcher to hand to the constructed client
+ */
+ public ReplicationQueuesClientArguments(Configuration conf, Abortable abort,
+ ZooKeeperWatcher zk) {
+ super(conf, abort, zk);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
new file mode 100644
index 0000000..1981131
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -0,0 +1,175 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * ZooKeeper-backed {@link ReplicationQueuesClient}. Reads the replication queues znode and the
+ * hfile references znode, whose paths are inherited from {@link ReplicationStateZKBase}. Every
+ * ZooKeeper failure is first reported to the configured {@link Abortable} and then rethrown.
+ */
+@InterfaceAudience.Private
+public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements
+ ReplicationQueuesClient {
+
+ // One logger per class rather than per instance. Package-private visibility is kept so any
+ // same-package code that referenced the former instance field still compiles.
+ static final Log LOG = LogFactory.getLog(ReplicationQueuesClientZKImpl.class);
+
+ /**
+ * Creates a client from a bundled argument object.
+ * @param args supplies the ZooKeeper watcher, configuration and abortable
+ */
+ public ReplicationQueuesClientZKImpl(ReplicationQueuesClientArguments args) {
+ this(args.getZk(), args.getConf(), args.getAbortable());
+ }
+
+ /**
+ * @param zk the ZooKeeper watcher used for all reads
+ * @param conf the configuration
+ * @param abortable notified whenever a ZooKeeper operation fails
+ */
+ public ReplicationQueuesClientZKImpl(final ZooKeeperWatcher zk, Configuration conf,
+ Abortable abortable) {
+ super(zk, conf, abortable);
+ }
+
+ /**
+ * Ensures the replication queues znode exists, creating it and any missing parents if needed.
+ * @throws ReplicationException wrapping any ZooKeeper failure
+ */
+ @Override
+ public void init() throws ReplicationException {
+ try {
+ // A negative result from checkExists is treated as "znode absent".
+ if (ZKUtil.checkExists(this.zookeeper, this.queuesZNode) < 0) {
+ ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
+ }
+ } catch (KeeperException e) {
+ throw new ReplicationException("Internal error while initializing a queues client", e);
+ }
+ }
+
+ @Override
+ public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException {
+ String znode = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, serverName), queueId);
+ return listChildrenOrAbort(znode, "Failed to get list of wals for queueId=" + queueId
+ + " and serverName=" + serverName);
+ }
+
+ @Override
+ public List<String> getAllQueues(String serverName) throws KeeperException {
+ String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
+ return listChildrenOrAbort(znode, "Failed to get list of queues for serverName=" + serverName);
+ }
+
+ /**
+ * Load all WALs in all replication queues from ZK. This method guarantees to return a
+ * snapshot which contains all WALs in the zookeeper at the start of this call even if there
+ * is concurrent queue failover. However, some WALs newly created during the call may
+ * not be included.
+ * <p>
+ * The cversion of the queues znode is read before and after the scan; the scan is repeated
+ * until both reads agree. NOTE(review): there is no retry limit or backoff, so continuous
+ * churn under the queues znode keeps this method looping.
+ * @return the set of WAL names referenced by any replication queue
+ * @throws KeeperException on ZooKeeper failure (the abortable is notified first)
+ */
+ @Override
+ public Set<String> getAllWALs() throws KeeperException {
+ for (int retry = 0; ; retry++) {
+ int v0 = getQueuesZNodeCversion();
+ List<String> rss = getListOfReplicators();
+ if (rss == null || rss.isEmpty()) {
+ LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
+ return ImmutableSet.of();
+ }
+ Set<String> wals = Sets.newHashSet();
+ for (String rs : rss) {
+ List<String> listOfPeers = getAllQueues(rs);
+ // if rs just died, this will be null
+ if (listOfPeers == null) {
+ continue;
+ }
+ for (String id : listOfPeers) {
+ List<String> peersWals = getLogsInQueue(rs, id);
+ if (peersWals != null) {
+ wals.addAll(peersWals);
+ }
+ }
+ }
+ int v1 = getQueuesZNodeCversion();
+ if (v0 == v1) {
+ return wals;
+ }
+ LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
+ v0, v1, retry));
+ }
+ }
+
+ /**
+ * Returns the cversion (number of changes to the children) of the replication queues znode.
+ * @return the cversion of the queues znode
+ * @throws KeeperException if the znode stat cannot be read (the abortable is notified first)
+ */
+ public int getQueuesZNodeCversion() throws KeeperException {
+ return getCversionOrAbort(this.queuesZNode, "Failed to get stat of replication rs node");
+ }
+
+ @Override
+ public int getHFileRefsNodeChangeVersion() throws KeeperException {
+ return getCversionOrAbort(this.hfileRefsZNode,
+ "Failed to get stat of replication hfile references node.");
+ }
+
+ @Override
+ public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException {
+ return listChildrenOrAbort(this.hfileRefsZNode,
+ "Failed to get list of all peers in hfile references node.");
+ }
+
+ @Override
+ public List<String> getReplicableHFiles(String peerId) throws KeeperException {
+ String znode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+ return listChildrenOrAbort(znode, "Failed to get list of hfile references for peerId=" + peerId);
+ }
+
+ /**
+ * Lists the children of a znode without setting a watch. On failure, reports to the abortable
+ * and rethrows.
+ * @param znode the znode whose children are listed
+ * @param errorMessage message passed to the abortable if the listing fails
+ * @return the children as returned by {@code ZKUtil.listChildrenNoWatch}; callers in this class
+ * treat a null result as an absent znode
+ * @throws KeeperException the original ZooKeeper failure, after the abortable is notified
+ */
+ private List<String> listChildrenOrAbort(String znode, String errorMessage)
+ throws KeeperException {
+ try {
+ return ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+ } catch (KeeperException e) {
+ this.abortable.abort(errorMessage, e);
+ throw e;
+ }
+ }
+
+ /**
+ * Reads a znode's stat without setting a watch and returns its cversion. On failure, reports to
+ * the abortable and rethrows.
+ * @param znode the znode whose stat is read
+ * @param errorMessage message passed to the abortable if the read fails
+ * @return the znode's cversion
+ * @throws KeeperException the original ZooKeeper failure, after the abortable is notified
+ */
+ private int getCversionOrAbort(String znode, String errorMessage) throws KeeperException {
+ Stat stat = new Stat();
+ try {
+ ZKUtil.getDataNoWatch(this.zookeeper, znode, stat);
+ } catch (KeeperException e) {
+ this.abortable.abort(errorMessage, e);
+ throw e;
+ }
+ return stat.getCversion();
+ }
+}