You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2014/07/15 01:23:51 UTC
[1/3] HBASE-11367 Pluggable replication endpoint
Repository: hbase
Updated Branches:
refs/heads/master 4824b0dea -> 463d52d8c
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
new file mode 100644
index 0000000..e4ec0bc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * A {@link ReplicationEndpoint} implementation for replicating to another HBase cluster.
+ * For the slave cluster it selects a random number of peers
+ * using a replication ratio. For example, if replication ration = 0.1
+ * and slave cluster has 100 region servers, 10 will be selected.
+ * <p/>
+ * A stream is considered down when we cannot contact a region server on the
+ * peer cluster for more than 55 seconds by default.
+ */
+@InterfaceAudience.Private
+public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
+
+ private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);
+ private HConnection conn;
+
+ private Configuration conf;
+
+ // How long should we sleep for each retry
+ private long sleepForRetries;
+
+ // Maximum number of retries before taking bold actions
+ private int maxRetriesMultiplier;
+ // Socket timeouts require even bolder actions since we don't want to DDOS
+ private int socketTimeoutMultiplier;
+ //Metrics for this source
+ private MetricsSource metrics;
+ // Handles connecting to peer region servers
+ private ReplicationSinkManager replicationSinkMgr;
+ private boolean peersSelected = false;
+
+ @Override
+ public void init(Context context) throws IOException {
+ super.init(context);
+ this.conf = HBaseConfiguration.create(ctx.getConfiguration());
+ decorateConf();
+ this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
+ this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
+ maxRetriesMultiplier * maxRetriesMultiplier);
+ // TODO: This connection is replication specific or we should make it particular to
+ // replication and make replication specific settings such as compression or codec to use
+ // passing Cells.
+ this.conn = HConnectionManager.createConnection(this.conf);
+ this.sleepForRetries =
+ this.conf.getLong("replication.source.sleepforretries", 1000);
+ this.metrics = context.getMetrics();
+ // ReplicationQueueInfo parses the peerId out of the znode for us
+ this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
+ }
+
+ private void decorateConf() {
+ String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
+ if (StringUtils.isNotEmpty(replicationCodec)) {
+ this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
+ }
+ }
+
+ private void connectToPeers() {
+ getRegionServers();
+
+ int sleepMultiplier = 1;
+
+ // Connect to peer cluster first, unless we have to stop
+ while (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
+ replicationSinkMgr.chooseSinks();
+ if (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
+ if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ }
+ }
+ }
+
+ /**
+ * Do the sleeping logic
+ * @param msg Why we sleep
+ * @param sleepMultiplier by how many times the default sleeping time is augmented
+ * @return True if <code>sleepMultiplier</code> is < <code>maxRetriesMultiplier</code>
+ */
+ protected boolean sleepForRetries(String msg, int sleepMultiplier) {
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+ }
+ Thread.sleep(this.sleepForRetries * sleepMultiplier);
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted while sleeping between retries");
+ }
+ return sleepMultiplier < maxRetriesMultiplier;
+ }
+
+ /**
+ * Do the shipping logic
+ */
+ @Override
+ public boolean replicate(ReplicateContext replicateContext) {
+ List<HLog.Entry> entries = replicateContext.getEntries();
+ int sleepMultiplier = 1;
+ while (this.isRunning()) {
+ if (!peersSelected) {
+ connectToPeers();
+ peersSelected = true;
+ }
+
+ if (!isPeerEnabled()) {
+ if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ continue;
+ }
+ SinkPeer sinkPeer = null;
+ try {
+ sinkPeer = replicationSinkMgr.getReplicationSink();
+ BlockingInterface rrs = sinkPeer.getRegionServer();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Replicating " + entries.size() +
+ " entries of total size " + replicateContext.getSize());
+ }
+ ReplicationProtbufUtil.replicateWALEntry(rrs,
+ entries.toArray(new HLog.Entry[entries.size()]));
+
+ // update metrics
+ this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
+ return true;
+
+ } catch (IOException ioe) {
+ // Didn't ship anything, but must still age the last time we did
+ this.metrics.refreshAgeOfLastShippedOp();
+ if (ioe instanceof RemoteException) {
+ ioe = ((RemoteException) ioe).unwrapRemoteException();
+ LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
+ if (ioe instanceof TableNotFoundException) {
+ if (sleepForRetries("A table is missing in the peer cluster. "
+ + "Replication cannot proceed without losing data.", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ }
+ } else {
+ if (ioe instanceof SocketTimeoutException) {
+ // This exception means we waited for more than 60s and nothing
+ // happened, the cluster is alive and calling it right away
+ // even for a test just makes things worse.
+ sleepForRetries("Encountered a SocketTimeoutException. Since the " +
+ "call to the remote cluster timed out, which is usually " +
+ "caused by a machine failure or a massive slowdown",
+ this.socketTimeoutMultiplier);
+ } else if (ioe instanceof ConnectException) {
+ LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
+ replicationSinkMgr.chooseSinks();
+ } else {
+ LOG.warn("Can't replicate because of a local or network error: ", ioe);
+ }
+ }
+
+ if (sinkPeer != null) {
+ replicationSinkMgr.reportBadSink(sinkPeer);
+ }
+ if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ }
+ }
+ return false; // in case we exited before replicating
+ }
+
+ protected boolean isPeerEnabled() {
+ return ctx.getReplicationPeer().getPeerState() == PeerState.ENABLED;
+ }
+
+ @Override
+ protected void doStop() {
+ disconnect(); //don't call super.doStop()
+ if (this.conn != null) {
+ try {
+ this.conn.close();
+ this.conn = null;
+ } catch (IOException e) {
+ LOG.warn("Failed to close the connection");
+ }
+ }
+ notifyStopped();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index b38a0c8..94dec7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -22,13 +22,14 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* This class is for maintaining the various replication statistics for a source and publishing them
* through the metrics interfaces.
*/
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public class MetricsSource {
public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
@@ -152,7 +153,7 @@ public class MetricsSource {
rms.incCounters(shippedKBsKey, sizeInKB);
rms.incCounters(SOURCE_SHIPPED_KBS, sizeInKB);
}
-
+
/** increase the byte number read by source from log file */
public void incrLogReadInBytes(long readInBytes) {
rms.incCounters(logReadInBytesKey, readInBytes);
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
index 839db9b..2104268 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
@@ -29,8 +29,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
-
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -61,7 +60,7 @@ public class ReplicationSinkManager {
private final String peerClusterId;
- private final ReplicationPeers replicationPeers;
+ private final HBaseReplicationEndpoint endpoint;
// Count of "bad replication sink" reports per peer sink
private final Map<ServerName, Integer> badReportCounts;
@@ -85,15 +84,15 @@ public class ReplicationSinkManager {
* Instantiate for a single replication peer cluster.
* @param conn connection to the peer cluster
* @param peerClusterId identifier of the peer cluster
- * @param replicationPeers manages peer clusters being replicated to
+ * @param endpoint replication endpoint for inter cluster replication
* @param conf HBase configuration, used for determining replication source ratio and bad peer
* threshold
*/
public ReplicationSinkManager(HConnection conn, String peerClusterId,
- ReplicationPeers replicationPeers, Configuration conf) {
+ HBaseReplicationEndpoint endpoint, Configuration conf) {
this.conn = conn;
this.peerClusterId = peerClusterId;
- this.replicationPeers = replicationPeers;
+ this.endpoint = endpoint;
this.badReportCounts = Maps.newHashMap();
this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
@@ -107,8 +106,7 @@ public class ReplicationSinkManager {
* @return a replication sink to replicate to
*/
public SinkPeer getReplicationSink() throws IOException {
- if (replicationPeers.getTimestampOfLastChangeToPeer(peerClusterId)
- > this.lastUpdateToPeers) {
+ if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers) {
LOG.info("Current list of sinks is out of date, updating");
chooseSinks();
}
@@ -143,8 +141,7 @@ public class ReplicationSinkManager {
}
void chooseSinks() {
- List<ServerName> slaveAddresses =
- replicationPeers.getRegionServersOfConnectedPeer(peerClusterId);
+ List<ServerName> slaveAddresses = endpoint.getRegionServers();
Collections.shuffle(slaveAddresses, random);
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
sinks = slaveAddresses.subList(0, numSinks);
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 4e2106d..87cbcc6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -21,13 +21,9 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.net.ConnectException;
-import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -41,27 +37,25 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.ipc.RemoteException;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
/**
* Class that handles the source of a replication stream.
@@ -82,9 +76,9 @@ public class ReplicationSource extends Thread
public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
// Queue of logs to process
private PriorityBlockingQueue<Path> queue;
- private HConnection conn;
private ReplicationQueues replicationQueues;
private ReplicationPeers replicationPeers;
+
private Configuration conf;
private ReplicationQueueInfo replicationQueueInfo;
// id of the peer cluster this source replicates to
@@ -118,8 +112,6 @@ public class ReplicationSource extends Thread
private String peerClusterZnode;
// Maximum number of retries before taking bold actions
private int maxRetriesMultiplier;
- // Socket timeouts require even bolder actions since we don't want to DDOS
- private int socketTimeoutMultiplier;
// Current number of operations (Put/Delete) that we need to replicate
private int currentNbOperations = 0;
// Current size of data we need to replicate
@@ -130,10 +122,14 @@ public class ReplicationSource extends Thread
private MetricsSource metrics;
// Handle on the log reader helper
private ReplicationHLogReaderManager repLogReader;
- // Handles connecting to peer region servers
- private ReplicationSinkManager replicationSinkMgr;
//WARN threshold for the number of queued logs, defaults to 2
private int logQueueWarnThreshold;
+ // ReplicationEndpoint which will handle the actual replication
+ private ReplicationEndpoint replicationEndpoint;
+ // A filter (or a chain of filters) for the WAL entries.
+ private WALEntryFilter walEntryFilter;
+ // Context for ReplicationEndpoint#replicate()
+ private ReplicationEndpoint.ReplicateContext replicateContext;
// throttler
private ReplicationThrottler throttler;
@@ -145,30 +141,30 @@ public class ReplicationSource extends Thread
* @param manager replication manager to ping to
* @param stopper the atomic boolean to use to stop the regionserver
* @param peerClusterZnode the name of our znode
+ * @param clusterId unique UUID for the cluster
+ * @param replicationEndpoint the replication endpoint implementation
+ * @param metrics metrics for replication source
* @throws IOException
*/
+ @Override
public void init(final Configuration conf, final FileSystem fs,
final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final Stoppable stopper,
- final String peerClusterZnode, final UUID clusterId) throws IOException {
+ final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
+ final MetricsSource metrics)
+ throws IOException {
this.stopper = stopper;
- this.conf = HBaseConfiguration.create(conf);
+ this.conf = conf;
decorateConf();
this.replicationQueueSizeCapacity =
this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
this.replicationQueueNbCapacity =
this.conf.getInt("replication.source.nb.capacity", 25000);
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
- this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
- maxRetriesMultiplier * maxRetriesMultiplier);
this.queue =
new PriorityBlockingQueue<Path>(
this.conf.getInt("hbase.regionserver.maxlogs", 32),
new LogsComparator());
- // TODO: This connection is replication specific or we should make it particular to
- // replication and make replication specific settings such as compression or codec to use
- // passing Cells.
- this.conn = HConnectionManager.getConnection(this.conf);
long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
this.replicationQueues = replicationQueues;
@@ -177,7 +173,7 @@ public class ReplicationSource extends Thread
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.fs = fs;
- this.metrics = new MetricsSource(peerClusterZnode);
+ this.metrics = metrics;
this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
this.clusterId = clusterId;
@@ -185,8 +181,10 @@ public class ReplicationSource extends Thread
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
- this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, this.conf);
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
+ this.replicationEndpoint = replicationEndpoint;
+
+ this.replicateContext = new ReplicationEndpoint.ReplicateContext();
}
private void decorateConf() {
@@ -209,30 +207,48 @@ public class ReplicationSource extends Thread
}
private void uninitialize() {
- if (this.conn != null) {
- try {
- this.conn.close();
- } catch (IOException e) {
- LOG.debug("Attempt to close connection failed", e);
- }
- }
LOG.debug("Source exiting " + this.peerId);
metrics.clear();
+ if (replicationEndpoint.state() == Service.State.STARTING
+ || replicationEndpoint.state() == Service.State.RUNNING) {
+ replicationEndpoint.stopAndWait();
+ }
}
@Override
public void run() {
- connectToPeers();
// We were stopped while looping to connect to sinks, just abort
if (!this.isActive()) {
uninitialize();
return;
}
+ try {
+ // start the endpoint, connect to the cluster
+ Service.State state = replicationEndpoint.start().get();
+ if (state != Service.State.RUNNING) {
+ LOG.warn("ReplicationEndpoint was not started. Exiting");
+ uninitialize();
+ return;
+ }
+ } catch (Exception ex) {
+ LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
+ throw new RuntimeException(ex);
+ }
+
+ // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
+ ArrayList<WALEntryFilter> filters = Lists.newArrayList(
+ (WALEntryFilter)new SystemTableWALEntryFilter());
+ WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
+ if (filterFromEndpoint != null) {
+ filters.add(filterFromEndpoint);
+ }
+ this.walEntryFilter = new ChainWALEntryFilter(filters);
+
int sleepMultiplier = 1;
// delay this until we are in an asynchronous thread
while (this.isActive() && this.peerClusterId == null) {
- this.peerClusterId = replicationPeers.getPeerUUID(this.peerId);
+ this.peerClusterId = replicationEndpoint.getPeerUUID();
if (this.isActive() && this.peerClusterId == null) {
if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
sleepMultiplier++;
@@ -250,9 +266,10 @@ public class ReplicationSource extends Thread
// In rare case, zookeeper setting may be messed up. That leads to the incorrect
// peerClusterId value, which is the same as the source clusterId
- if (clusterId.equals(peerClusterId)) {
+ if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
- + peerClusterId);
+ + peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ + replicationEndpoint.getClass().getName(), null, false);
}
LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
@@ -397,8 +414,8 @@ public class ReplicationSource extends Thread
* entries
* @throws IOException
*/
- protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries)
- throws IOException{
+ protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
+ List<HLog.Entry> entries) throws IOException{
long seenEntries = 0;
if (LOG.isTraceEnabled()) {
LOG.trace("Seeking in " + this.currentPath + " at position "
@@ -409,18 +426,22 @@ public class ReplicationSource extends Thread
HLog.Entry entry =
this.repLogReader.readNextAndSetPosition();
while (entry != null) {
- WALEdit edit = entry.getEdit();
this.metrics.incrLogEditsRead();
seenEntries++;
- // Remove all KVs that should not be replicated
- HLogKey logKey = entry.getKey();
+
// don't replicate if the log entries have already been consumed by the cluster
- if (!logKey.getClusterIds().contains(peerClusterId)) {
- removeNonReplicableEdits(entry);
- // Don't replicate catalog entries, if the WALEdit wasn't
- // containing anything to replicate and if we're currently not set to replicate
- if (!logKey.getTablename().equals(TableName.META_TABLE_NAME) &&
- edit.size() != 0) {
+ if (replicationEndpoint.canReplicateToSameCluster()
+ || !entry.getKey().getClusterIds().contains(peerClusterId)) {
+ // Remove all KVs that should not be replicated
+ entry = walEntryFilter.filter(entry);
+ WALEdit edit = null;
+ HLogKey logKey = null;
+ if (entry != null) {
+ edit = entry.getEdit();
+ logKey = entry.getKey();
+ }
+
+ if (edit != null && edit.size() != 0) {
//Mark that the current cluster has the change
logKey.addClusterId(clusterId);
currentNbOperations += countDistinctRowKeys(edit);
@@ -451,20 +472,6 @@ public class ReplicationSource extends Thread
return seenEntries == 0 && processEndOfFile();
}
- private void connectToPeers() {
- int sleepMultiplier = 1;
-
- // Connect to peer cluster first, unless we have to stop
- while (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
- replicationSinkMgr.chooseSinks();
- if (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
- if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
- sleepMultiplier++;
- }
- }
- }
- }
-
/**
* Poll for the next path
* @return true if a path was obtained, false if not
@@ -594,8 +601,8 @@ public class ReplicationSource extends Thread
/*
* Checks whether the current log file is empty, and it is not a recovered queue. This is to
* handle scenario when in an idle cluster, there is no entry in the current log and we keep on
- * trying to read the log file and get EOFEception. In case of a recovered queue the last log file
- * may be empty, and we don't want to retry that.
+ * trying to read the log file and get EOFException. In case of a recovered queue the last log
+ * file may be empty, and we don't want to retry that.
*/
private boolean isCurrentLogEmpty() {
return (this.repLogReader.getPosition() == 0 &&
@@ -622,47 +629,6 @@ public class ReplicationSource extends Thread
}
/**
- * We only want KVs that are scoped other than local
- * @param entry The entry to check for replication
- */
- protected void removeNonReplicableEdits(HLog.Entry entry) {
- String tabName = entry.getKey().getTablename().getNameAsString();
- ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
- Map<String, List<String>> tableCFs = null;
- try {
- tableCFs = this.replicationPeers.getTableCFs(peerId);
- } catch (IllegalArgumentException e) {
- LOG.error("should not happen: can't get tableCFs for peer " + peerId +
- ", degenerate as if it's not configured by keeping tableCFs==null");
- }
- int size = kvs.size();
-
- // clear kvs(prevent replicating) if logKey's table isn't in this peer's
- // replicable table list (empty tableCFs means all table are replicable)
- if (tableCFs != null && !tableCFs.containsKey(tabName)) {
- kvs.clear();
- } else {
- NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
- List<String> cfs = (tableCFs == null) ? null : tableCFs.get(tabName);
- for (int i = size - 1; i >= 0; i--) {
- KeyValue kv = kvs.get(i);
- // The scope will be null or empty if
- // there's nothing to replicate in that WALEdit
- // ignore(remove) kv if its cf isn't in the replicable cf list
- // (empty cfs means all cfs of this table are replicable)
- if (scopes == null || !scopes.containsKey(kv.getFamily()) ||
- (cfs != null && !cfs.contains(Bytes.toString(kv.getFamily())))) {
- kvs.remove(i);
- }
- }
- }
-
- if (kvs.size() < size/2) {
- kvs.trimToSize();
- }
- }
-
- /**
* Count the number of different row keys in the given edit because of
* mini-batching. We assume that there's at least one KV in the WALEdit.
* @param edit edit to count row keys from
@@ -692,13 +658,6 @@ public class ReplicationSource extends Thread
return;
}
while (this.isActive()) {
- if (!isPeerEnabled()) {
- if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
- sleepMultiplier++;
- }
- continue;
- }
- SinkPeer sinkPeer = null;
try {
if (this.throttler.isEnabled()) {
long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
@@ -719,14 +678,15 @@ public class ReplicationSource extends Thread
this.throttler.resetStartTick();
}
}
- sinkPeer = replicationSinkMgr.getReplicationSink();
- BlockingInterface rrs = sinkPeer.getRegionServer();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Replicating " + entries.size() +
- " entries of total size " + currentSize);
+ replicateContext.setEntries(entries).setSize(currentSize);
+
+ // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
+ boolean replicated = replicationEndpoint.replicate(replicateContext);
+
+ if (!replicated) {
+ continue;
}
- ReplicationProtbufUtil.replicateWALEntry(rrs,
- entries.toArray(new HLog.Entry[entries.size()]));
+
if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
this.peerClusterZnode, this.repLogReader.getPosition(),
@@ -745,50 +705,9 @@ public class ReplicationSource extends Thread
+ this.totalReplicatedOperations + " operations");
}
break;
-
- } catch (IOException ioe) {
- // Didn't ship anything, but must still age the last time we did
- this.metrics.refreshAgeOfLastShippedOp();
- if (ioe instanceof RemoteException) {
- ioe = ((RemoteException) ioe).unwrapRemoteException();
- LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
- if (ioe instanceof TableNotFoundException) {
- if (sleepForRetries("A table is missing in the peer cluster. "
- + "Replication cannot proceed without losing data.", sleepMultiplier)) {
- sleepMultiplier++;
- }
- // current thread might be interrupted to terminate
- // directly go back to while() for confirm this
- if (isInterrupted()) {
- continue;
- }
- }
- } else {
- if (ioe instanceof SocketTimeoutException) {
- // This exception means we waited for more than 60s and nothing
- // happened, the cluster is alive and calling it right away
- // even for a test just makes things worse.
- sleepForRetries("Encountered a SocketTimeoutException. Since the " +
- "call to the remote cluster timed out, which is usually " +
- "caused by a machine failure or a massive slowdown",
- this.socketTimeoutMultiplier);
- // current thread might be interrupted to terminate
- // directly go back to while() for confirm this
- if (isInterrupted()) {
- continue;
- }
- } else if (ioe instanceof ConnectException) {
- LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
- replicationSinkMgr.chooseSinks();
- } else {
- LOG.warn("Can't replicate because of a local or network error: ", ioe);
- }
- }
-
- if (sinkPeer != null) {
- replicationSinkMgr.reportBadSink(sinkPeer);
- }
- if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
+ } catch (Exception ex) {
+ LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" + ex);
+ if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
sleepMultiplier++;
}
}
@@ -801,7 +720,7 @@ public class ReplicationSource extends Thread
* @return true if the peer is enabled, otherwise false
*/
protected boolean isPeerEnabled() {
- return this.replicationPeers.getStatusOfConnectedPeer(this.peerId);
+ return this.replicationPeers.getStatusOfPeer(this.peerId);
}
/**
@@ -835,10 +754,12 @@ public class ReplicationSource extends Thread
return false;
}
+ @Override
public void startup() {
String n = Thread.currentThread().getName();
Thread.UncaughtExceptionHandler handler =
new Thread.UncaughtExceptionHandler() {
+ @Override
public void uncaughtException(final Thread t, final Throwable e) {
LOG.error("Unexpected exception in ReplicationSource," +
" currentPath=" + currentPath, e);
@@ -849,11 +770,17 @@ public class ReplicationSource extends Thread
this.peerClusterZnode, handler);
}
+ @Override
public void terminate(String reason) {
terminate(reason, null);
}
+ @Override
public void terminate(String reason, Exception cause) {
+ terminate(reason, cause, true);
+ }
+
+ public void terminate(String reason, Exception cause, boolean join) {
if (cause == null) {
LOG.info("Closing source "
+ this.peerClusterZnode + " because: " + reason);
@@ -864,17 +791,33 @@ public class ReplicationSource extends Thread
}
this.running = false;
this.interrupt();
- Threads.shutdown(this, this.sleepForRetries * this.maxRetriesMultiplier);
+ ListenableFuture<Service.State> future = null;
+ if (this.replicationEndpoint != null) {
+ future = this.replicationEndpoint.stop();
+ }
+ if (join) {
+ Threads.shutdown(this, this.sleepForRetries);
+ if (future != null) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ LOG.warn("Got exception:" + e);
+ }
+ }
+ }
}
+ @Override
public String getPeerClusterZnode() {
return this.peerClusterZnode;
}
+ @Override
public String getPeerClusterId() {
return this.peerId;
}
+ @Override
public Path getCurrentPath() {
return this.currentPath;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index df599f0..6388d9b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
@@ -50,7 +51,8 @@ public interface ReplicationSourceInterface {
public void init(final Configuration conf, final FileSystem fs,
final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final Stoppable stopper,
- final String peerClusterZnode, final UUID clusterId) throws IOException;
+ final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
+ final MetricsSource metrics) throws IOException;
/**
* Add a log to the list of logs to replicate
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 7b4cd83..db9c505 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -43,9 +43,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
@@ -115,7 +119,7 @@ public class ReplicationSourceManager implements ReplicationListener {
final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir,
final Path oldLogDir, final UUID clusterId) {
//CopyOnWriteArrayList is thread-safe.
- //Generally, reading is more than modifying.
+ //Generally, reading is more than modifying.
this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
this.replicationQueues = replicationQueues;
this.replicationPeers = replicationPeers;
@@ -194,7 +198,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* old region server hlog queues
*/
protected void init() throws IOException, ReplicationException {
- for (String id : this.replicationPeers.getConnectedPeers()) {
+ for (String id : this.replicationPeers.getPeerIds()) {
addSource(id);
}
List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
@@ -221,9 +225,12 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
protected ReplicationSourceInterface addSource(String id) throws IOException,
ReplicationException {
+ ReplicationPeerConfig peerConfig
+ = replicationPeers.getReplicationPeerConfig(id);
+ ReplicationPeer peer = replicationPeers.getPeer(id);
ReplicationSourceInterface src =
getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
- this.replicationPeers, stopper, id, this.clusterId);
+ this.replicationPeers, stopper, id, this.clusterId, peerConfig, peer);
synchronized (this.hlogsById) {
this.sources.add(src);
this.hlogsById.put(id, new TreeSet<String>());
@@ -254,7 +261,7 @@ public class ReplicationSourceManager implements ReplicationListener {
public void deleteSource(String peerId, boolean closeConnection) {
this.replicationQueues.removeQueue(peerId);
if (closeConnection) {
- this.replicationPeers.disconnectFromPeer(peerId);
+ this.replicationPeers.peerRemoved(peerId);
}
}
@@ -340,7 +347,9 @@ public class ReplicationSourceManager implements ReplicationListener {
protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
final FileSystem fs, final ReplicationSourceManager manager,
final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
- final Stoppable stopper, final String peerId, final UUID clusterId) throws IOException {
+ final Stoppable stopper, final String peerId, final UUID clusterId,
+ final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
+ throws IOException {
ReplicationSourceInterface src;
try {
@SuppressWarnings("rawtypes")
@@ -351,9 +360,32 @@ public class ReplicationSourceManager implements ReplicationListener {
LOG.warn("Passed replication source implementation throws errors, " +
"defaulting to ReplicationSource", e);
src = new ReplicationSource();
+ }
+ ReplicationEndpoint replicationEndpoint = null;
+ try {
+ String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
+ if (replicationEndpointImpl == null) {
+ // Default to HBase inter-cluster replication endpoint
+ replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
+ }
+ @SuppressWarnings("rawtypes")
+ Class c = Class.forName(replicationEndpointImpl);
+ replicationEndpoint = (ReplicationEndpoint) c.newInstance();
+ } catch (Exception e) {
+ LOG.warn("Passed replication endpoint implementation throws errors", e);
+ throw new IOException(e);
}
- src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, clusterId);
+
+ MetricsSource metrics = new MetricsSource(peerId);
+ // init replication source
+ src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId,
+ clusterId, replicationEndpoint, metrics);
+
+ // init replication endpoint
+ replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
+ fs, peerConfig, peerId, clusterId, replicationPeer, metrics));
+
return src;
}
@@ -441,7 +473,7 @@ public class ReplicationSourceManager implements ReplicationListener {
public void peerListChanged(List<String> peerIds) {
for (String id : peerIds) {
try {
- boolean added = this.replicationPeers.connectToPeer(id);
+ boolean added = this.replicationPeers.peerAdded(id);
if (added) {
addSource(id);
}
@@ -507,10 +539,26 @@ public class ReplicationSourceManager implements ReplicationListener {
for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
String peerId = entry.getKey();
try {
+ // there is not an actual peer defined corresponding to peerId for the failover.
+ ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+ String actualPeerId = replicationQueueInfo.getPeerId();
+ ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
+ ReplicationPeerConfig peerConfig = null;
+ try {
+ peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
+ } catch (ReplicationException ex) {
+ LOG.warn("Received exception while getting replication peer config, skipping replay"
+ + ex);
+ }
+ if (peer == null || peerConfig == null) {
+ LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
+ continue;
+ }
+
ReplicationSourceInterface src =
getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
- stopper, peerId, this.clusterId);
- if (!this.rp.getConnectedPeers().contains((src.getPeerClusterId()))) {
+ stopper, peerId, this.clusterId, peerConfig, peer);
+ if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
src.terminate("Recovered queue doesn't belong to any current peer");
break;
}
@@ -561,7 +609,7 @@ public class ReplicationSourceManager implements ReplicationListener {
stats.append(source.getStats() + "\n");
}
for (ReplicationSourceInterface oldSource : oldsources) {
- stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId() + ": ");
+ stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": ");
stats.append(oldSource.getStats()+ "\n");
}
return stats.toString();
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index fa744ff..77bc64e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase.client.replication;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -25,6 +29,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import com.google.common.collect.Lists;
+
import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -117,5 +123,36 @@ public class TestReplicationAdmin {
admin.removePeer(ID_ONE);
}
+ @Test
+ public void testGetTableCfsStr() {
+ // opposite of TestPerTableCFReplication#testParseTableCFsFromConfig()
+
+ Map<TableName, List<String>> tabCFsMap = null;
+
+ // 1. null or empty string, result should be null
+ assertEquals(null, ReplicationAdmin.getTableCfsStr(tabCFsMap));
+
+
+ // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
+ tabCFsMap = new TreeMap<TableName, List<String>>();
+ tabCFsMap.put(TableName.valueOf("tab1"), null); // its table name is "tab1"
+ assertEquals("tab1", ReplicationAdmin.getTableCfsStr(tabCFsMap));
+
+ tabCFsMap = new TreeMap<TableName, List<String>>();
+ tabCFsMap.put(TableName.valueOf("tab1"), Lists.newArrayList("cf1"));
+ assertEquals("tab1:cf1", ReplicationAdmin.getTableCfsStr(tabCFsMap));
+
+ tabCFsMap = new TreeMap<TableName, List<String>>();
+ tabCFsMap.put(TableName.valueOf("tab1"), Lists.newArrayList("cf1", "cf3"));
+ assertEquals("tab1:cf1,cf3", ReplicationAdmin.getTableCfsStr(tabCFsMap));
+
+ // 3. multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
+ tabCFsMap = new TreeMap<TableName, List<String>>();
+ tabCFsMap.put(TableName.valueOf("tab1"), null);
+ tabCFsMap.put(TableName.valueOf("tab2"), Lists.newArrayList("cf1"));
+ tabCFsMap.put(TableName.valueOf("tab3"), Lists.newArrayList("cf1", "cf3"));
+ assertEquals("tab1;tab2:cf1;tab3:cf1,cf3", ReplicationAdmin.getTableCfsStr(tabCFsMap));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index fa3dda6..13a18ce 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
@@ -40,7 +41,8 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
- UUID clusterId) throws IOException {
+ UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics)
+ throws IOException {
this.manager = manager;
this.peerClusterId = peerClusterId;
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
index ee102fc..ff77a94 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
@@ -175,30 +175,30 @@ public class TestPerTableCFReplication {
Map<String, List<String>> tabCFsMap = null;
// 1. null or empty string, result should be null
- tabCFsMap = ReplicationPeer.parseTableCFsFromConfig(null);
+ tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(null);
assertEquals(null, tabCFsMap);
- tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("");
+ tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("");
assertEquals(null, tabCFsMap);
- tabCFsMap = ReplicationPeer.parseTableCFsFromConfig(" ");
+ tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(" ");
assertEquals(null, tabCFsMap);
// 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
- tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1");
+ tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1");
assertEquals(1, tabCFsMap.size()); // only one table
assertTrue(tabCFsMap.containsKey("tab1")); // its table name is "tab1"
assertFalse(tabCFsMap.containsKey("tab2")); // not other table
assertEquals(null, tabCFsMap.get("tab1")); // null cf-list,
- tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab2:cf1");
+ tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab2:cf1");
assertEquals(1, tabCFsMap.size()); // only one table
assertTrue(tabCFsMap.containsKey("tab2")); // its table name is "tab2"
assertFalse(tabCFsMap.containsKey("tab1")); // not other table
assertEquals(1, tabCFsMap.get("tab2").size()); // cf-list contains only 1 cf
assertEquals("cf1", tabCFsMap.get("tab2").get(0));// the only cf is "cf1"
- tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab3 : cf1 , cf3");
+ tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab3 : cf1 , cf3");
assertEquals(1, tabCFsMap.size()); // only one table
assertTrue(tabCFsMap.containsKey("tab3")); // its table name is "tab2"
assertFalse(tabCFsMap.containsKey("tab1")); // not other table
@@ -207,7 +207,7 @@ public class TestPerTableCFReplication {
assertTrue(tabCFsMap.get("tab3").contains("cf3"));// contains "cf3"
// 3. multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
- tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
+ tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
// 3.1 contains 3 tables : "tab1", "tab2" and "tab3"
assertEquals(3, tabCFsMap.size());
assertTrue(tabCFsMap.containsKey("tab1"));
@@ -225,7 +225,8 @@ public class TestPerTableCFReplication {
// 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated
// still use the example of multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
- tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;");
+ tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(
+ "tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;");
// 4.1 contains 3 tables : "tab1", "tab2" and "tab3"
assertEquals(3, tabCFsMap.size());
assertTrue(tabCFsMap.containsKey("tab1"));
@@ -243,7 +244,8 @@ public class TestPerTableCFReplication {
// 5. invalid format "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3"
// "tab1:tt:cf1" and "tab2::cf1" are invalid and will be ignored totally
- tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3");
+ tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(
+ "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3");
// 5.1 no "tab1" and "tab2", only "tab3"
assertEquals(1, tabCFsMap.size()); // only one table
assertFalse(tabCFsMap.containsKey("tab1"));
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
new file mode 100644
index 0000000..38aaa7a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests ReplicationSource and ReplicationEndpoint interactions
+ */
+@Category(MediumTests.class)
+public class TestReplicationEndpoint extends TestReplicationBase {
+
+ static int numRegionServers;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TestReplicationBase.setUpBeforeClass();
+ utility2.shutdownMiniCluster(); // we don't need the second cluster
+ admin.removePeer("2");
+ numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TestReplicationBase.tearDownAfterClass();
+ // check stop is called
+ Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
+ }
+
+ @Before
+ public void setup() throws FailedLogCloseException, IOException {
+ ReplicationEndpointForTest.contructedCount.set(0);
+ ReplicationEndpointForTest.startedCount.set(0);
+ ReplicationEndpointForTest.replicateCount.set(0);
+ ReplicationEndpointForTest.lastEntries = null;
+ for (RegionServerThread rs : utility1.getMiniHBaseCluster().getRegionServerThreads()) {
+ utility1.getHBaseAdmin().rollHLogWriter(rs.getRegionServer().getServerName().toString());
+ }
+ }
+
+ @Test
+ public void testCustomReplicationEndpoint() throws Exception {
+ // test installing a custom replication endpoint other than the default one.
+ admin.addPeer("testCustomReplicationEndpoint",
+ new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
+ .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
+
+ // check whether the class has been constructed and started
+ Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
+ }
+ });
+
+ Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
+ }
+ });
+
+ Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
+
+ // now replicate some data.
+ doPut(Bytes.toBytes("row42"));
+
+ Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return ReplicationEndpointForTest.replicateCount.get() >= 1;
+ }
+ });
+
+ doAssert(Bytes.toBytes("row42"));
+
+ admin.removePeer("testCustomReplicationEndpoint");
+ }
+
+ @Test
+ public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
+ admin.addPeer("testReplicationEndpointReturnsFalseOnReplicate",
+ new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
+ .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
+ // now replicate some data.
+ doPut(row);
+
+ Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return ReplicationEndpointReturningFalse.replicated.get();
+ }
+ });
+ if (ReplicationEndpointReturningFalse.ex.get() != null) {
+ throw ReplicationEndpointReturningFalse.ex.get();
+ }
+
+ admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
+ }
+
+ @Test
+ public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
+ admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
+ new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
+ .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
+ // now replicate some data.
+ doPut(Bytes.toBytes("row1"));
+ doPut(row);
+ doPut(Bytes.toBytes("row2"));
+
+ Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return ReplicationEndpointForTest.replicateCount.get() >= 1;
+ }
+ });
+
+ Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
+ admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
+ }
+
+
+ private void doPut(byte[] row) throws IOException {
+ Put put = new Put(row);
+ put.add(famName, row, row);
+ htable1 = new HTable(conf1, tableName);
+ htable1.put(put);
+ htable1.close();
+ }
+
+ private static void doAssert(byte[] row) throws Exception {
+ if (ReplicationEndpointForTest.lastEntries == null) {
+ return; // first call
+ }
+ Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
+ List<KeyValue> kvs = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getKeyValues();
+ Assert.assertEquals(1, kvs.size());
+ Assert.assertTrue(Bytes.equals(kvs.get(0).getRowArray(), kvs.get(0).getRowOffset(),
+ kvs.get(0).getRowLength(), row, 0, row.length));
+ }
+
+ public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
+ static UUID uuid = UUID.randomUUID();
+ static AtomicInteger contructedCount = new AtomicInteger();
+ static AtomicInteger startedCount = new AtomicInteger();
+ static AtomicInteger stoppedCount = new AtomicInteger();
+ static AtomicInteger replicateCount = new AtomicInteger();
+ static volatile List<HLog.Entry> lastEntries = null;
+
+ public ReplicationEndpointForTest() {
+ contructedCount.incrementAndGet();
+ }
+
+ @Override
+ public UUID getPeerUUID() {
+ return uuid;
+ }
+
+ @Override
+ public boolean replicate(ReplicateContext replicateContext) {
+ replicateCount.incrementAndGet();
+ lastEntries = replicateContext.entries;
+ return true;
+ }
+
+ @Override
+ protected void doStart() {
+ startedCount.incrementAndGet();
+ notifyStarted();
+ }
+
+ @Override
+ protected void doStop() {
+ stoppedCount.incrementAndGet();
+ notifyStopped();
+ }
+ }
+
+ public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
+ static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
+ static AtomicBoolean replicated = new AtomicBoolean(false);
+ @Override
+ public boolean replicate(ReplicateContext replicateContext) {
+ try {
+ // check row
+ doAssert(row);
+ } catch (Exception e) {
+ ex.set(e);
+ }
+
+ super.replicate(replicateContext);
+
+ replicated.set(replicateCount.get() > 10); // first 10 times, we return false
+ return replicated.get();
+ }
+ }
+
+ // return a WALEntry filter which only accepts "row", but not other rows
+ public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
+ static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
+
+ @Override
+ public boolean replicate(ReplicateContext replicateContext) {
+ try {
+ super.replicate(replicateContext);
+ doAssert(row);
+ } catch (Exception e) {
+ ex.set(e);
+ }
+ return true;
+ }
+
+ @Override
+ public WALEntryFilter getWALEntryfilter() {
+ return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
+ @Override
+ public Entry filter(Entry entry) {
+ ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
+ int size = kvs.size();
+ for (int i = size-1; i >= 0; i--) {
+ KeyValue kv = kvs.get(i);
+ if (!Bytes.equals(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
+ row, 0, row.length)) {
+ kvs.remove(i);
+ }
+ }
+ return entry;
+ }
+ });
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index fd003ad..e560620 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -179,50 +179,48 @@ public abstract class TestReplicationStateBasic {
} catch (IllegalArgumentException e) {
}
try {
- rp.getStatusOfConnectedPeer("bogus");
+ rp.getStatusOfPeer("bogus");
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
} catch (IllegalArgumentException e) {
}
- assertFalse(rp.connectToPeer("bogus"));
- rp.disconnectFromPeer("bogus");
- assertEquals(0, rp.getRegionServersOfConnectedPeer("bogus").size());
- assertNull(rp.getPeerUUID("bogus"));
+ assertFalse(rp.peerAdded("bogus"));
+ rp.peerRemoved("bogus");
+
assertNull(rp.getPeerConf("bogus"));
- assertNumberOfPeers(0, 0);
+ assertNumberOfPeers(0);
// Add some peers
- rp.addPeer(ID_ONE, KEY_ONE);
- assertNumberOfPeers(0, 1);
- rp.addPeer(ID_TWO, KEY_TWO);
- assertNumberOfPeers(0, 2);
+ rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+ assertNumberOfPeers(1);
+ rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null);
+ assertNumberOfPeers(2);
// Test methods with a peer that is added but not connected
try {
- rp.getStatusOfConnectedPeer(ID_ONE);
+ rp.getStatusOfPeer(ID_ONE);
fail("There are no connected peers, should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException e) {
}
- assertNull(rp.getPeerUUID(ID_ONE));
- assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE)));
- rp.disconnectFromPeer(ID_ONE);
- assertEquals(0, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
-
- // Connect to one peer
- rp.connectToPeer(ID_ONE);
- assertNumberOfPeers(1, 2);
- assertTrue(rp.getStatusOfConnectedPeer(ID_ONE));
+ assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
+ rp.removePeer(ID_ONE);
+ rp.peerRemoved(ID_ONE);
+ assertNumberOfPeers(1);
+
+ // Add one peer
+ rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+ rp.peerAdded(ID_ONE);
+ assertNumberOfPeers(2);
+ assertTrue(rp.getStatusOfPeer(ID_ONE));
rp.disablePeer(ID_ONE);
assertConnectedPeerStatus(false, ID_ONE);
rp.enablePeer(ID_ONE);
assertConnectedPeerStatus(true, ID_ONE);
- assertEquals(1, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
- assertNotNull(rp.getPeerUUID(ID_ONE).toString());
// Disconnect peer
- rp.disconnectFromPeer(ID_ONE);
- assertNumberOfPeers(0, 2);
+ rp.peerRemoved(ID_ONE);
+ assertNumberOfPeers(2);
try {
- rp.getStatusOfConnectedPeer(ID_ONE);
+ rp.getStatusOfPeer(ID_ONE);
fail("There are no connected peers, should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException e) {
}
@@ -234,7 +232,7 @@ public abstract class TestReplicationStateBasic {
fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
}
while (true) {
- if (status == rp.getStatusOfConnectedPeer(peerId)) {
+ if (status == rp.getStatusOfPeer(peerId)) {
return;
}
if (zkTimeoutCount < ZK_MAX_COUNT) {
@@ -247,9 +245,9 @@ public abstract class TestReplicationStateBasic {
}
}
- protected void assertNumberOfPeers(int connected, int total) {
- assertEquals(total, rp.getAllPeerClusterKeys().size());
- assertEquals(connected, rp.getConnectedPeers().size());
+ protected void assertNumberOfPeers(int total) {
+ assertEquals(total, rp.getAllPeerConfigs().size());
+ assertEquals(total, rp.getAllPeerIds().size());
assertEquals(total, rp.getAllPeerIds().size());
}
@@ -269,7 +267,7 @@ public abstract class TestReplicationStateBasic {
rq3.addLog("qId" + i, "filename" + j);
}
//Add peers for the corresponding queues so they are not orphans
- rp.addPeer("qId" + i, "bogus" + i);
+ rp.addPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("bogus" + i), null);
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
index 5273fe3..1c3de71 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
@@ -145,7 +145,7 @@ public class TestReplicationTrackerZKImpl {
@Ignore ("Flakey") @Test(timeout = 30000)
public void testPeerRemovedEvent() throws Exception {
- rp.addPeer("5", utility.getClusterKey());
+ rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
rt.registerListener(new DummyReplicationListener());
rp.removePeer("5");
// wait for event
@@ -158,7 +158,7 @@ public class TestReplicationTrackerZKImpl {
@Ignore ("Flakey") @Test(timeout = 30000)
public void testPeerListChangedEvent() throws Exception {
// add a peer
- rp.addPeer("5", utility.getClusterKey());
+ rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true);
rt.registerListener(new DummyReplicationListener());
rp.disablePeer("5");
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
new file mode 100644
index 0000000..d4b412e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+@Category(SmallTests.class)
+public class TestReplicationWALEntryFilters {
+
+ static byte[] a = new byte[] {'a'};
+ static byte[] b = new byte[] {'b'};
+ static byte[] c = new byte[] {'c'};
+ static byte[] d = new byte[] {'d'};
+
+ @Test
+ public void testSystemTableWALEntryFilter() {
+ SystemTableWALEntryFilter filter = new SystemTableWALEntryFilter();
+
+ // meta
+ HLogKey key1 = new HLogKey( HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
+ HTableDescriptor.META_TABLEDESC.getTableName());
+ HLog.Entry metaEntry = new Entry(key1, null);
+
+ assertNull(filter.filter(metaEntry));
+
+ // ns table
+ HLogKey key2 = new HLogKey(new byte[] {}, HTableDescriptor.NAMESPACE_TABLEDESC.getTableName());
+ HLog.Entry nsEntry = new Entry(key2, null);
+ assertNull(filter.filter(nsEntry));
+
+ // user table
+
+ HLogKey key3 = new HLogKey(new byte[] {}, TableName.valueOf("foo"));
+ HLog.Entry userEntry = new Entry(key3, null);
+
+ assertEquals(userEntry, filter.filter(userEntry));
+ }
+
+ @Test
+ public void testScopeWALEntryFilter() {
+ ScopeWALEntryFilter filter = new ScopeWALEntryFilter();
+
+ HLog.Entry userEntry = createEntry(a, b);
+ HLog.Entry userEntryA = createEntry(a);
+ HLog.Entry userEntryB = createEntry(b);
+ HLog.Entry userEntryEmpty = createEntry();
+
+ // no scopes
+ assertEquals(null, filter.filter(userEntry));
+
+ // empty scopes
+ TreeMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ userEntry = createEntry(a, b);
+ userEntry.getKey().setScopes(scopes);
+ assertEquals(null, filter.filter(userEntry));
+
+ // different scope
+ scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ scopes.put(c, HConstants.REPLICATION_SCOPE_GLOBAL);
+ userEntry = createEntry(a, b);
+ userEntry.getKey().setScopes(scopes);
+ // all kvs should be filtered
+ assertEquals(userEntryEmpty, filter.filter(userEntry));
+
+ // local scope
+ scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
+ userEntry = createEntry(a, b);
+ userEntry.getKey().setScopes(scopes);
+ assertEquals(userEntryEmpty, filter.filter(userEntry));
+ scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
+ assertEquals(userEntryEmpty, filter.filter(userEntry));
+
+ // only scope a
+ scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ scopes.put(a, HConstants.REPLICATION_SCOPE_GLOBAL);
+ userEntry = createEntry(a, b);
+ userEntry.getKey().setScopes(scopes);
+ assertEquals(userEntryA, filter.filter(userEntry));
+ scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
+ assertEquals(userEntryA, filter.filter(userEntry));
+
+ // only scope b
+ scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
+ userEntry = createEntry(a, b);
+ userEntry.getKey().setScopes(scopes);
+ assertEquals(userEntryB, filter.filter(userEntry));
+ scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
+ assertEquals(userEntryB, filter.filter(userEntry));
+
+ // scope a and b
+ scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
+ userEntry = createEntry(a, b);
+ userEntry.getKey().setScopes(scopes);
+ assertEquals(userEntryB, filter.filter(userEntry));
+ scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
+ assertEquals(userEntryB, filter.filter(userEntry));
+ }
+
+ WALEntryFilter nullFilter = new WALEntryFilter() {
+ @Override
+ public Entry filter(Entry entry) {
+ return null;
+ }
+ };
+
+ WALEntryFilter passFilter = new WALEntryFilter() {
+ @Override
+ public Entry filter(Entry entry) {
+ return entry;
+ }
+ };
+
+ @Test
+ public void testChainWALEntryFilter() {
+ HLog.Entry userEntry = createEntry(a, b, c);
+
+ ChainWALEntryFilter filter = new ChainWALEntryFilter(passFilter);
+ assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+
+ filter = new ChainWALEntryFilter(passFilter, passFilter);
+ assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+
+ filter = new ChainWALEntryFilter(passFilter, passFilter, passFilter);
+ assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+
+ filter = new ChainWALEntryFilter(nullFilter);
+ assertEquals(null, filter.filter(userEntry));
+
+ filter = new ChainWALEntryFilter(nullFilter, passFilter);
+ assertEquals(null, filter.filter(userEntry));
+
+ filter = new ChainWALEntryFilter(passFilter, nullFilter);
+ assertEquals(null, filter.filter(userEntry));
+
+ filter = new ChainWALEntryFilter(nullFilter, passFilter, nullFilter);
+ assertEquals(null, filter.filter(userEntry));
+
+ filter = new ChainWALEntryFilter(nullFilter, nullFilter);
+ assertEquals(null, filter.filter(userEntry));
+
+ // flatten
+ filter =
+ new ChainWALEntryFilter(
+ new ChainWALEntryFilter(passFilter,
+ new ChainWALEntryFilter(passFilter, passFilter),
+ new ChainWALEntryFilter(passFilter),
+ new ChainWALEntryFilter(passFilter)),
+ new ChainWALEntryFilter(passFilter));
+ assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+
+
+ filter =
+ new ChainWALEntryFilter(
+ new ChainWALEntryFilter(passFilter,
+ new ChainWALEntryFilter(passFilter,
+ new ChainWALEntryFilter(nullFilter))),
+ new ChainWALEntryFilter(passFilter));
+ assertEquals(null, filter.filter(userEntry));
+ }
+
+ @Test
+ public void testTableCfWALEntryFilter() {
+ ReplicationPeer peer = mock(ReplicationPeer.class);
+
+ when(peer.getTableCFs()).thenReturn(null);
+ HLog.Entry userEntry = createEntry(a, b, c);
+ TableCfWALEntryFilter filter = new TableCfWALEntryFilter(peer);
+ assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+
+ // empty map
+ userEntry = createEntry(a, b, c);
+ Map<String, List<String>> tableCfs = new HashMap<String, List<String>>();
+ when(peer.getTableCFs()).thenReturn(tableCfs);
+ filter = new TableCfWALEntryFilter(peer);
+ assertEquals(null, filter.filter(userEntry));
+
+ // table bar
+ userEntry = createEntry(a, b, c);
+ tableCfs = new HashMap<String, List<String>>();
+ tableCfs.put("bar", null);
+ when(peer.getTableCFs()).thenReturn(tableCfs);
+ filter = new TableCfWALEntryFilter(peer);
+ assertEquals(null, filter.filter(userEntry));
+
+ // table foo:a
+ userEntry = createEntry(a, b, c);
+ tableCfs = new HashMap<String, List<String>>();
+ tableCfs.put("foo", Lists.newArrayList("a"));
+ when(peer.getTableCFs()).thenReturn(tableCfs);
+ filter = new TableCfWALEntryFilter(peer);
+ assertEquals(createEntry(a), filter.filter(userEntry));
+
+ // table foo:a,c
+ userEntry = createEntry(a, b, c, d);
+ tableCfs = new HashMap<String, List<String>>();
+ tableCfs.put("foo", Lists.newArrayList("a", "c"));
+ when(peer.getTableCFs()).thenReturn(tableCfs);
+ filter = new TableCfWALEntryFilter(peer);
+ assertEquals(createEntry(a,c), filter.filter(userEntry));
+ }
+
+ private HLog.Entry createEntry(byte[]... kvs) {
+ HLogKey key1 = new HLogKey(new byte[] {}, TableName.valueOf("foo"));
+ WALEdit edit1 = new WALEdit();
+
+ for (byte[] kv : kvs) {
+ edit1.add(new KeyValue(kv, kv, kv));
+ }
+ return new HLog.Entry(key1, edit1);
+ }
+
+
+ private void assertEquals(HLog.Entry e1, HLog.Entry e2) {
+ Assert.assertEquals(e1 == null, e2 == null);
+ if (e1 == null) {
+ return;
+ }
+
+ // do not compare HLogKeys
+
+ // compare kvs
+ Assert.assertEquals(e1.getEdit() == null, e2.getEdit() == null);
+ if (e1.getEdit() == null) {
+ return;
+ }
+ List<KeyValue> kvs1 = e1.getEdit().getKeyValues();
+ List<KeyValue> kvs2 = e2.getEdit().getKeyValues();
+ Assert.assertEquals(kvs1.size(), kvs2.size());
+ for (int i = 0; i < kvs1.size(); i++) {
+ KeyValue.COMPARATOR.compare(kvs1.get(i), kvs2.get(i));
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
index 296f953..9175192 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.junit.Before;
@@ -42,13 +43,15 @@ public class TestReplicationSinkManager {
private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
private ReplicationPeers replicationPeers;
+ private HBaseReplicationEndpoint replicationEndpoint;
private ReplicationSinkManager sinkManager;
@Before
public void setUp() {
replicationPeers = mock(ReplicationPeers.class);
+ replicationEndpoint = mock(HBaseReplicationEndpoint.class);
sinkManager = new ReplicationSinkManager(mock(HConnection.class),
- PEER_CLUSTER_ID, replicationPeers, new Configuration());
+ PEER_CLUSTER_ID, replicationEndpoint, new Configuration());
}
@Test
@@ -58,7 +61,7 @@ public class TestReplicationSinkManager {
serverNames.add(mock(ServerName.class));
}
- when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+ when(replicationEndpoint.getRegionServers())
.thenReturn(serverNames);
sinkManager.chooseSinks();
@@ -72,7 +75,7 @@ public class TestReplicationSinkManager {
List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class),
mock(ServerName.class));
- when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+ when(replicationEndpoint.getRegionServers())
.thenReturn(serverNames);
sinkManager.chooseSinks();
@@ -84,8 +87,8 @@ public class TestReplicationSinkManager {
public void testReportBadSink() {
ServerName serverNameA = mock(ServerName.class);
ServerName serverNameB = mock(ServerName.class);
- when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)).thenReturn(
- Lists.newArrayList(serverNameA, serverNameB));
+ when(replicationEndpoint.getRegionServers())
+ .thenReturn(Lists.newArrayList(serverNameA, serverNameB));
sinkManager.chooseSinks();
// Sanity check
@@ -110,7 +113,7 @@ public class TestReplicationSinkManager {
for (int i = 0; i < 20; i++) {
serverNames.add(mock(ServerName.class));
}
- when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+ when(replicationEndpoint.getRegionServers())
.thenReturn(serverNames);
@@ -137,7 +140,7 @@ public class TestReplicationSinkManager {
for (int i = 0; i < 20; i++) {
serverNames.add(mock(ServerName.class));
}
- when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+ when(replicationEndpoint.getRegionServers())
.thenReturn(serverNames);
[2/3] HBASE-11367 Pluggable replication endpoint
Posted by en...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
index 9d037f5..a1caf87 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
@@ -4828,6 +4828,71 @@ public final class ZooKeeperProtos {
*/
com.google.protobuf.ByteString
getClusterkeyBytes();
+
+ // optional string replicationEndpointImpl = 2;
+ /**
+ * <code>optional string replicationEndpointImpl = 2;</code>
+ */
+ boolean hasReplicationEndpointImpl();
+ /**
+ * <code>optional string replicationEndpointImpl = 2;</code>
+ */
+ java.lang.String getReplicationEndpointImpl();
+ /**
+ * <code>optional string replicationEndpointImpl = 2;</code>
+ */
+ com.google.protobuf.ByteString
+ getReplicationEndpointImplBytes();
+
+ // repeated .BytesBytesPair data = 3;
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair>
+ getDataList();
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair getData(int index);
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ int getDataCount();
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder>
+ getDataOrBuilderList();
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder getDataOrBuilder(
+ int index);
+
+ // repeated .NameStringPair configuration = 4;
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair>
+ getConfigurationList();
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair getConfiguration(int index);
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ int getConfigurationCount();
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder>
+ getConfigurationOrBuilderList();
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder getConfigurationOrBuilder(
+ int index);
}
/**
* Protobuf type {@code ReplicationPeer}
@@ -4890,6 +4955,27 @@ public final class ZooKeeperProtos {
clusterkey_ = input.readBytes();
break;
}
+ case 18: {
+ bitField0_ |= 0x00000002;
+ replicationEndpointImpl_ = input.readBytes();
+ break;
+ }
+ case 26: {
+ if (!((mutable_bitField0_ & 0x00000004) == 0x00000004)) {
+ data_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair>();
+ mutable_bitField0_ |= 0x00000004;
+ }
+ data_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.PARSER, extensionRegistry));
+ break;
+ }
+ case 34: {
+ if (!((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
+ configuration_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair>();
+ mutable_bitField0_ |= 0x00000008;
+ }
+ configuration_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.PARSER, extensionRegistry));
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4898,6 +4984,12 @@ public final class ZooKeeperProtos {
throw new com.google.protobuf.InvalidProtocolBufferException(
e.getMessage()).setUnfinishedMessage(this);
} finally {
+ if (((mutable_bitField0_ & 0x00000004) == 0x00000004)) {
+ data_ = java.util.Collections.unmodifiableList(data_);
+ }
+ if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
+ configuration_ = java.util.Collections.unmodifiableList(configuration_);
+ }
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
@@ -4988,8 +5080,126 @@ public final class ZooKeeperProtos {
}
}
+ // optional string replicationEndpointImpl = 2;
+ public static final int REPLICATIONENDPOINTIMPL_FIELD_NUMBER = 2;
+ private java.lang.Object replicationEndpointImpl_;
+ /**
+ * <code>optional string replicationEndpointImpl = 2;</code>
+ */
+ public boolean hasReplicationEndpointImpl() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * <code>optional string replicationEndpointImpl = 2;</code>
+ */
+ public java.lang.String getReplicationEndpointImpl() {
+ java.lang.Object ref = replicationEndpointImpl_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ replicationEndpointImpl_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * <code>optional string replicationEndpointImpl = 2;</code>
+ */
+ public com.google.protobuf.ByteString
+ getReplicationEndpointImplBytes() {
+ java.lang.Object ref = replicationEndpointImpl_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ replicationEndpointImpl_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ // repeated .BytesBytesPair data = 3;
+ public static final int DATA_FIELD_NUMBER = 3;
+ private java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair> data_;
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair> getDataList() {
+ return data_;
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder>
+ getDataOrBuilderList() {
+ return data_;
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public int getDataCount() {
+ return data_.size();
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair getData(int index) {
+ return data_.get(index);
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder getDataOrBuilder(
+ int index) {
+ return data_.get(index);
+ }
+
+ // repeated .NameStringPair configuration = 4;
+ public static final int CONFIGURATION_FIELD_NUMBER = 4;
+ private java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair> configuration_;
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair> getConfigurationList() {
+ return configuration_;
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder>
+ getConfigurationOrBuilderList() {
+ return configuration_;
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public int getConfigurationCount() {
+ return configuration_.size();
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair getConfiguration(int index) {
+ return configuration_.get(index);
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder getConfigurationOrBuilder(
+ int index) {
+ return configuration_.get(index);
+ }
+
private void initFields() {
clusterkey_ = "";
+ replicationEndpointImpl_ = "";
+ data_ = java.util.Collections.emptyList();
+ configuration_ = java.util.Collections.emptyList();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -5000,6 +5210,18 @@ public final class ZooKeeperProtos {
memoizedIsInitialized = 0;
return false;
}
+ for (int i = 0; i < getDataCount(); i++) {
+ if (!getData(i).isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ }
+ for (int i = 0; i < getConfigurationCount(); i++) {
+ if (!getConfiguration(i).isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ }
memoizedIsInitialized = 1;
return true;
}
@@ -5010,6 +5232,15 @@ public final class ZooKeeperProtos {
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeBytes(1, getClusterkeyBytes());
}
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeBytes(2, getReplicationEndpointImplBytes());
+ }
+ for (int i = 0; i < data_.size(); i++) {
+ output.writeMessage(3, data_.get(i));
+ }
+ for (int i = 0; i < configuration_.size(); i++) {
+ output.writeMessage(4, configuration_.get(i));
+ }
getUnknownFields().writeTo(output);
}
@@ -5023,6 +5254,18 @@ public final class ZooKeeperProtos {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(1, getClusterkeyBytes());
}
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(2, getReplicationEndpointImplBytes());
+ }
+ for (int i = 0; i < data_.size(); i++) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeMessageSize(3, data_.get(i));
+ }
+ for (int i = 0; i < configuration_.size(); i++) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeMessageSize(4, configuration_.get(i));
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -5051,6 +5294,15 @@ public final class ZooKeeperProtos {
result = result && getClusterkey()
.equals(other.getClusterkey());
}
+ result = result && (hasReplicationEndpointImpl() == other.hasReplicationEndpointImpl());
+ if (hasReplicationEndpointImpl()) {
+ result = result && getReplicationEndpointImpl()
+ .equals(other.getReplicationEndpointImpl());
+ }
+ result = result && getDataList()
+ .equals(other.getDataList());
+ result = result && getConfigurationList()
+ .equals(other.getConfigurationList());
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -5068,6 +5320,18 @@ public final class ZooKeeperProtos {
hash = (37 * hash) + CLUSTERKEY_FIELD_NUMBER;
hash = (53 * hash) + getClusterkey().hashCode();
}
+ if (hasReplicationEndpointImpl()) {
+ hash = (37 * hash) + REPLICATIONENDPOINTIMPL_FIELD_NUMBER;
+ hash = (53 * hash) + getReplicationEndpointImpl().hashCode();
+ }
+ if (getDataCount() > 0) {
+ hash = (37 * hash) + DATA_FIELD_NUMBER;
+ hash = (53 * hash) + getDataList().hashCode();
+ }
+ if (getConfigurationCount() > 0) {
+ hash = (37 * hash) + CONFIGURATION_FIELD_NUMBER;
+ hash = (53 * hash) + getConfigurationList().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -5174,6 +5438,8 @@ public final class ZooKeeperProtos {
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ getDataFieldBuilder();
+ getConfigurationFieldBuilder();
}
}
private static Builder create() {
@@ -5184,6 +5450,20 @@ public final class ZooKeeperProtos {
super.clear();
clusterkey_ = "";
bitField0_ = (bitField0_ & ~0x00000001);
+ replicationEndpointImpl_ = "";
+ bitField0_ = (bitField0_ & ~0x00000002);
+ if (dataBuilder_ == null) {
+ data_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000004);
+ } else {
+ dataBuilder_.clear();
+ }
+ if (configurationBuilder_ == null) {
+ configuration_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000008);
+ } else {
+ configurationBuilder_.clear();
+ }
return this;
}
@@ -5216,6 +5496,28 @@ public final class ZooKeeperProtos {
to_bitField0_ |= 0x00000001;
}
result.clusterkey_ = clusterkey_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.replicationEndpointImpl_ = replicationEndpointImpl_;
+ if (dataBuilder_ == null) {
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ data_ = java.util.Collections.unmodifiableList(data_);
+ bitField0_ = (bitField0_ & ~0x00000004);
+ }
+ result.data_ = data_;
+ } else {
+ result.data_ = dataBuilder_.build();
+ }
+ if (configurationBuilder_ == null) {
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ configuration_ = java.util.Collections.unmodifiableList(configuration_);
+ bitField0_ = (bitField0_ & ~0x00000008);
+ }
+ result.configuration_ = configuration_;
+ } else {
+ result.configuration_ = configurationBuilder_.build();
+ }
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -5237,6 +5539,63 @@ public final class ZooKeeperProtos {
clusterkey_ = other.clusterkey_;
onChanged();
}
+ if (other.hasReplicationEndpointImpl()) {
+ bitField0_ |= 0x00000002;
+ replicationEndpointImpl_ = other.replicationEndpointImpl_;
+ onChanged();
+ }
+ if (dataBuilder_ == null) {
+ if (!other.data_.isEmpty()) {
+ if (data_.isEmpty()) {
+ data_ = other.data_;
+ bitField0_ = (bitField0_ & ~0x00000004);
+ } else {
+ ensureDataIsMutable();
+ data_.addAll(other.data_);
+ }
+ onChanged();
+ }
+ } else {
+ if (!other.data_.isEmpty()) {
+ if (dataBuilder_.isEmpty()) {
+ dataBuilder_.dispose();
+ dataBuilder_ = null;
+ data_ = other.data_;
+ bitField0_ = (bitField0_ & ~0x00000004);
+ dataBuilder_ =
+ com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ?
+ getDataFieldBuilder() : null;
+ } else {
+ dataBuilder_.addAllMessages(other.data_);
+ }
+ }
+ }
+ if (configurationBuilder_ == null) {
+ if (!other.configuration_.isEmpty()) {
+ if (configuration_.isEmpty()) {
+ configuration_ = other.configuration_;
+ bitField0_ = (bitField0_ & ~0x00000008);
+ } else {
+ ensureConfigurationIsMutable();
+ configuration_.addAll(other.configuration_);
+ }
+ onChanged();
+ }
+ } else {
+ if (!other.configuration_.isEmpty()) {
+ if (configurationBuilder_.isEmpty()) {
+ configurationBuilder_.dispose();
+ configurationBuilder_ = null;
+ configuration_ = other.configuration_;
+ bitField0_ = (bitField0_ & ~0x00000008);
+ configurationBuilder_ =
+ com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ?
+ getConfigurationFieldBuilder() : null;
+ } else {
+ configurationBuilder_.addAllMessages(other.configuration_);
+ }
+ }
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -5246,6 +5605,18 @@ public final class ZooKeeperProtos {
return false;
}
+ for (int i = 0; i < getDataCount(); i++) {
+ if (!getData(i).isInitialized()) {
+
+ return false;
+ }
+ }
+ for (int i = 0; i < getConfigurationCount(); i++) {
+ if (!getConfiguration(i).isInitialized()) {
+
+ return false;
+ }
+ }
return true;
}
@@ -5372,6 +5743,560 @@ public final class ZooKeeperProtos {
return this;
}
+ // optional string replicationEndpointImpl = 2;
+ private java.lang.Object replicationEndpointImpl_ = "";
+ /**
+ * <code>optional string replicationEndpointImpl = 2;</code>
+ */
+ public boolean hasReplicationEndpointImpl() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * <code>optional string replicationEndpointImpl = 2;</code>
+ */
+ public java.lang.String getReplicationEndpointImpl() {
+ java.lang.Object ref = replicationEndpointImpl_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ replicationEndpointImpl_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * <code>optional string replicationEndpointImpl = 2;</code>
+ */
+ public com.google.protobuf.ByteString
+ getReplicationEndpointImplBytes() {
+ java.lang.Object ref = replicationEndpointImpl_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ replicationEndpointImpl_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * <code>optional string replicationEndpointImpl = 2;</code>
+ */
+ public Builder setReplicationEndpointImpl(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ replicationEndpointImpl_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string replicationEndpointImpl = 2;</code>
+ */
+ public Builder clearReplicationEndpointImpl() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ replicationEndpointImpl_ = getDefaultInstance().getReplicationEndpointImpl();
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string replicationEndpointImpl = 2;</code>
+ */
+ public Builder setReplicationEndpointImplBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ replicationEndpointImpl_ = value;
+ onChanged();
+ return this;
+ }
+
+ // repeated .BytesBytesPair data = 3;
+ private java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair> data_ =
+ java.util.Collections.emptyList();
+ private void ensureDataIsMutable() {
+ if (!((bitField0_ & 0x00000004) == 0x00000004)) {
+ data_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair>(data_);
+ bitField0_ |= 0x00000004;
+ }
+ }
+
+ private com.google.protobuf.RepeatedFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder> dataBuilder_;
+
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair> getDataList() {
+ if (dataBuilder_ == null) {
+ return java.util.Collections.unmodifiableList(data_);
+ } else {
+ return dataBuilder_.getMessageList();
+ }
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public int getDataCount() {
+ if (dataBuilder_ == null) {
+ return data_.size();
+ } else {
+ return dataBuilder_.getCount();
+ }
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair getData(int index) {
+ if (dataBuilder_ == null) {
+ return data_.get(index);
+ } else {
+ return dataBuilder_.getMessage(index);
+ }
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public Builder setData(
+ int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair value) {
+ if (dataBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureDataIsMutable();
+ data_.set(index, value);
+ onChanged();
+ } else {
+ dataBuilder_.setMessage(index, value);
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public Builder setData(
+ int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder builderForValue) {
+ if (dataBuilder_ == null) {
+ ensureDataIsMutable();
+ data_.set(index, builderForValue.build());
+ onChanged();
+ } else {
+ dataBuilder_.setMessage(index, builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public Builder addData(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair value) {
+ if (dataBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureDataIsMutable();
+ data_.add(value);
+ onChanged();
+ } else {
+ dataBuilder_.addMessage(value);
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public Builder addData(
+ int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair value) {
+ if (dataBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureDataIsMutable();
+ data_.add(index, value);
+ onChanged();
+ } else {
+ dataBuilder_.addMessage(index, value);
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public Builder addData(
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder builderForValue) {
+ if (dataBuilder_ == null) {
+ ensureDataIsMutable();
+ data_.add(builderForValue.build());
+ onChanged();
+ } else {
+ dataBuilder_.addMessage(builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public Builder addData(
+ int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder builderForValue) {
+ if (dataBuilder_ == null) {
+ ensureDataIsMutable();
+ data_.add(index, builderForValue.build());
+ onChanged();
+ } else {
+ dataBuilder_.addMessage(index, builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public Builder addAllData(
+ java.lang.Iterable<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair> values) {
+ if (dataBuilder_ == null) {
+ ensureDataIsMutable();
+ super.addAll(values, data_);
+ onChanged();
+ } else {
+ dataBuilder_.addAllMessages(values);
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public Builder clearData() {
+ if (dataBuilder_ == null) {
+ data_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000004);
+ onChanged();
+ } else {
+ dataBuilder_.clear();
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public Builder removeData(int index) {
+ if (dataBuilder_ == null) {
+ ensureDataIsMutable();
+ data_.remove(index);
+ onChanged();
+ } else {
+ dataBuilder_.remove(index);
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder getDataBuilder(
+ int index) {
+ return getDataFieldBuilder().getBuilder(index);
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder getDataOrBuilder(
+ int index) {
+ if (dataBuilder_ == null) {
+ return data_.get(index); } else {
+ return dataBuilder_.getMessageOrBuilder(index);
+ }
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder>
+ getDataOrBuilderList() {
+ if (dataBuilder_ != null) {
+ return dataBuilder_.getMessageOrBuilderList();
+ } else {
+ return java.util.Collections.unmodifiableList(data_);
+ }
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder addDataBuilder() {
+ return getDataFieldBuilder().addBuilder(
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.getDefaultInstance());
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder addDataBuilder(
+ int index) {
+ return getDataFieldBuilder().addBuilder(
+ index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.getDefaultInstance());
+ }
+ /**
+ * <code>repeated .BytesBytesPair data = 3;</code>
+ */
+ public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder>
+ getDataBuilderList() {
+ return getDataFieldBuilder().getBuilderList();
+ }
+ private com.google.protobuf.RepeatedFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder>
+ getDataFieldBuilder() {
+ if (dataBuilder_ == null) {
+ dataBuilder_ = new com.google.protobuf.RepeatedFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder>(
+ data_,
+ ((bitField0_ & 0x00000004) == 0x00000004),
+ getParentForChildren(),
+ isClean());
+ data_ = null;
+ }
+ return dataBuilder_;
+ }
+
+ // repeated .NameStringPair configuration = 4;
+ private java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair> configuration_ =
+ java.util.Collections.emptyList();
+ private void ensureConfigurationIsMutable() {
+ if (!((bitField0_ & 0x00000008) == 0x00000008)) {
+ configuration_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair>(configuration_);
+ bitField0_ |= 0x00000008;
+ }
+ }
+
+ private com.google.protobuf.RepeatedFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder> configurationBuilder_;
+
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair> getConfigurationList() {
+ if (configurationBuilder_ == null) {
+ return java.util.Collections.unmodifiableList(configuration_);
+ } else {
+ return configurationBuilder_.getMessageList();
+ }
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public int getConfigurationCount() {
+ if (configurationBuilder_ == null) {
+ return configuration_.size();
+ } else {
+ return configurationBuilder_.getCount();
+ }
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair getConfiguration(int index) {
+ if (configurationBuilder_ == null) {
+ return configuration_.get(index);
+ } else {
+ return configurationBuilder_.getMessage(index);
+ }
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public Builder setConfiguration(
+ int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair value) {
+ if (configurationBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureConfigurationIsMutable();
+ configuration_.set(index, value);
+ onChanged();
+ } else {
+ configurationBuilder_.setMessage(index, value);
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public Builder setConfiguration(
+ int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder builderForValue) {
+ if (configurationBuilder_ == null) {
+ ensureConfigurationIsMutable();
+ configuration_.set(index, builderForValue.build());
+ onChanged();
+ } else {
+ configurationBuilder_.setMessage(index, builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public Builder addConfiguration(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair value) {
+ if (configurationBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureConfigurationIsMutable();
+ configuration_.add(value);
+ onChanged();
+ } else {
+ configurationBuilder_.addMessage(value);
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public Builder addConfiguration(
+ int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair value) {
+ if (configurationBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureConfigurationIsMutable();
+ configuration_.add(index, value);
+ onChanged();
+ } else {
+ configurationBuilder_.addMessage(index, value);
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public Builder addConfiguration(
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder builderForValue) {
+ if (configurationBuilder_ == null) {
+ ensureConfigurationIsMutable();
+ configuration_.add(builderForValue.build());
+ onChanged();
+ } else {
+ configurationBuilder_.addMessage(builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public Builder addConfiguration(
+ int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder builderForValue) {
+ if (configurationBuilder_ == null) {
+ ensureConfigurationIsMutable();
+ configuration_.add(index, builderForValue.build());
+ onChanged();
+ } else {
+ configurationBuilder_.addMessage(index, builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public Builder addAllConfiguration(
+ java.lang.Iterable<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair> values) {
+ if (configurationBuilder_ == null) {
+ ensureConfigurationIsMutable();
+ super.addAll(values, configuration_);
+ onChanged();
+ } else {
+ configurationBuilder_.addAllMessages(values);
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public Builder clearConfiguration() {
+ if (configurationBuilder_ == null) {
+ configuration_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000008);
+ onChanged();
+ } else {
+ configurationBuilder_.clear();
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public Builder removeConfiguration(int index) {
+ if (configurationBuilder_ == null) {
+ ensureConfigurationIsMutable();
+ configuration_.remove(index);
+ onChanged();
+ } else {
+ configurationBuilder_.remove(index);
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder getConfigurationBuilder(
+ int index) {
+ return getConfigurationFieldBuilder().getBuilder(index);
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder getConfigurationOrBuilder(
+ int index) {
+ if (configurationBuilder_ == null) {
+ return configuration_.get(index); } else {
+ return configurationBuilder_.getMessageOrBuilder(index);
+ }
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder>
+ getConfigurationOrBuilderList() {
+ if (configurationBuilder_ != null) {
+ return configurationBuilder_.getMessageOrBuilderList();
+ } else {
+ return java.util.Collections.unmodifiableList(configuration_);
+ }
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder addConfigurationBuilder() {
+ return getConfigurationFieldBuilder().addBuilder(
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.getDefaultInstance());
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder addConfigurationBuilder(
+ int index) {
+ return getConfigurationFieldBuilder().addBuilder(
+ index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.getDefaultInstance());
+ }
+ /**
+ * <code>repeated .NameStringPair configuration = 4;</code>
+ */
+ public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder>
+ getConfigurationBuilderList() {
+ return getConfigurationFieldBuilder().getBuilderList();
+ }
+ private com.google.protobuf.RepeatedFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder>
+ getConfigurationFieldBuilder() {
+ if (configurationBuilder_ == null) {
+ configurationBuilder_ = new com.google.protobuf.RepeatedFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder>(
+ configuration_,
+ ((bitField0_ & 0x00000008) == 0x00000008),
+ getParentForChildren(),
+ isClean());
+ configuration_ = null;
+ }
+ return configurationBuilder_;
+ }
+
// @@protoc_insertion_point(builder_scope:ReplicationPeer)
}
@@ -9598,23 +10523,25 @@ public final class ZooKeeperProtos {
"NKNOWN\020\000\022\021\n\rLOG_SPLITTING\020\001\022\016\n\nLOG_REPLA" +
"Y\020\002\"n\n\005Table\022$\n\005state\030\001 \002(\0162\014.Table.Stat" +
"e:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
- "BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"%\n\017R" +
- "eplicationPeer\022\022\n\nclusterkey\030\001 \002(\t\"^\n\020Re" +
- "plicationState\022&\n\005state\030\001 \002(\0162\027.Replicat",
- "ionState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010" +
- "DISABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n" +
- "\010position\030\001 \002(\003\"%\n\017ReplicationLock\022\022\n\nlo" +
- "ck_owner\030\001 \002(\t\"\230\001\n\tTableLock\022\036\n\ntable_na" +
- "me\030\001 \001(\0132\n.TableName\022\037\n\nlock_owner\030\002 \001(\013" +
- "2\013.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_s" +
- "hared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_t" +
- "ime\030\006 \001(\003\";\n\017StoreSequenceId\022\023\n\013family_n" +
- "ame\030\001 \002(\014\022\023\n\013sequence_id\030\002 \002(\004\"g\n\026Region" +
- "StoreSequenceIds\022 \n\030last_flushed_sequenc",
- "e_id\030\001 \002(\004\022+\n\021store_sequence_id\030\002 \003(\0132\020." +
- "StoreSequenceIdBE\n*org.apache.hadoop.hba" +
- "se.protobuf.generatedB\017ZooKeeperProtosH\001" +
- "\210\001\001\240\001\001"
+ "BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"\215\001\n\017" +
+ "ReplicationPeer\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027r" +
+ "eplicationEndpointImpl\030\002 \001(\t\022\035\n\004data\030\003 \003",
+ "(\0132\017.BytesBytesPair\022&\n\rconfiguration\030\004 \003" +
+ "(\0132\017.NameStringPair\"^\n\020ReplicationState\022" +
+ "&\n\005state\030\001 \002(\0162\027.ReplicationState.State\"" +
+ "\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027R" +
+ "eplicationHLogPosition\022\020\n\010position\030\001 \002(\003" +
+ "\"%\n\017ReplicationLock\022\022\n\nlock_owner\030\001 \002(\t\"" +
+ "\230\001\n\tTableLock\022\036\n\ntable_name\030\001 \001(\0132\n.Tabl" +
+ "eName\022\037\n\nlock_owner\030\002 \001(\0132\013.ServerName\022\021" +
+ "\n\tthread_id\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007" +
+ "purpose\030\005 \001(\t\022\023\n\013create_time\030\006 \001(\003\";\n\017St",
+ "oreSequenceId\022\023\n\013family_name\030\001 \002(\014\022\023\n\013se" +
+ "quence_id\030\002 \002(\004\"g\n\026RegionStoreSequenceId" +
+ "s\022 \n\030last_flushed_sequence_id\030\001 \002(\004\022+\n\021s" +
+ "tore_sequence_id\030\002 \003(\0132\020.StoreSequenceId" +
+ "BE\n*org.apache.hadoop.hbase.protobuf.gen" +
+ "eratedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -9662,7 +10589,7 @@ public final class ZooKeeperProtos {
internal_static_ReplicationPeer_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ReplicationPeer_descriptor,
- new java.lang.String[] { "Clusterkey", });
+ new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", });
internal_static_ReplicationState_descriptor =
getDescriptor().getMessageTypes().get(7);
internal_static_ReplicationState_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-protocol/src/main/protobuf/ZooKeeper.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/ZooKeeper.proto b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
index 37816da..598385c 100644
--- a/hbase-protocol/src/main/protobuf/ZooKeeper.proto
+++ b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
@@ -119,6 +119,9 @@ message ReplicationPeer {
// clusterkey is the concatenation of the slave cluster's
// hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
required string clusterkey = 1;
+ optional string replicationEndpointImpl = 2;
+ repeated BytesBytesPair data = 3;
+ repeated NameStringPair configuration = 4;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 87fa157..d4ac8f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -39,9 +39,11 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.mapreduce.Job;
@@ -131,6 +133,7 @@ public class VerifyReplication extends Configured implements Tool {
}
}
+ @Override
protected void cleanup(Context context) {
if (replicatedScanner != null) {
replicatedScanner.close();
@@ -141,7 +144,7 @@ public class VerifyReplication extends Configured implements Tool {
private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
ZooKeeperWatcher localZKW = null;
- ReplicationPeer peer = null;
+ ReplicationPeerZKImpl peer = null;
try {
localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
new Abortable() {
@@ -152,11 +155,11 @@ public class VerifyReplication extends Configured implements Tool {
ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
rp.init();
- Configuration peerConf = rp.getPeerConf(peerId);
- if (peerConf == null) {
+ Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
+ if (pair == null) {
throw new IOException("Couldn't get peer conf!");
}
-
+ Configuration peerConf = rp.getPeerConf(peerId).getSecond();
return ZKUtil.getZooKeeperClusterKey(peerConf);
} catch (ReplicationException e) {
throw new IOException(
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 99a9a5c..8c72a17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
@@ -133,6 +133,7 @@ public interface HLog {
*/
// TODO: Remove this Writable.
// TODO: Why is this in here? Implementation detail?
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
class Entry implements Writable {
private WALEdit edit;
private HLogKey key;
@@ -224,7 +225,7 @@ public interface HLog {
* @return the number of HLog files
*/
int getNumLogFiles();
-
+
/**
* @return the size of HLog files
*/
@@ -292,9 +293,10 @@ public interface HLog {
* @param sequenceId
* @throws IOException
* @deprecated For tests only and even then, should use
- * {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean,
+ * {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean,
* List)} and {@link #sync()} instead.
*/
+ @Deprecated
@VisibleForTesting
public void append(HRegionInfo info, TableName tableName, WALEdit edits,
final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException;
@@ -343,6 +345,7 @@ public interface HLog {
* instead because you can get back the region edit/sequenceid; it is set into the passed in
* <code>key</code>.
*/
+ @Deprecated
long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
List<UUID> clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId,
boolean isInMemstore, long nonceGroup, long nonce) throws IOException;
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index 770a588..cc10cb2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
@@ -67,7 +68,7 @@ import com.google.protobuf.ByteString;
*/
// TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
// purposes. They need to be merged into HLogEntry.
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
public static final Log LOG = LogFactory.getLog(HLogKey.class);
@@ -190,7 +191,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
*/
public HLogKey(final byte [] encodedRegionName, final TableName tablename,
final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, clusterIds,
+ init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, clusterIds,
nonceGroup, nonce);
}
@@ -208,7 +209,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
*/
public HLogKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
long nonceGroup, long nonce) {
- init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTimeMillis(),
+ init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTimeMillis(),
EMPTY_UUIDS, nonceGroup, nonce);
}
@@ -254,7 +255,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
this.logSeqNum = sequence;
this.seqNumAssignedLatch.countDown();
}
-
+
/**
* Used to set original seq Id for HLogKey during wal replay
* @param seqId
@@ -276,6 +277,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
* @return long the new assigned sequence number
* @throws InterruptedException
*/
+ @Override
public long getSequenceNumber() throws IOException {
try {
this.seqNumAssignedLatch.await();
@@ -396,6 +398,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
return result;
}
+ @Override
public int compareTo(HLogKey o) {
int result = Bytes.compareTo(this.encodedRegionName, o.encodedRegionName);
if (result == 0) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index 85e2d7c..24d9d6d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.HeapSize;
@@ -74,7 +75,7 @@ import org.apache.hadoop.io.Writable;
* is an old style KeyValue or the new style WALEdit.
*
*/
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public class WALEdit implements Writable, HeapSize {
public static final Log LOG = LogFactory.getLog(WALEdit.class);
@@ -288,4 +289,4 @@ public class WALEdit implements Writable, HeapSize {
}
return null;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
new file mode 100644
index 0000000..1c9d0f6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractService;
+
+/**
+ * A Base implementation for {@link ReplicationEndpoint}s. Users should consider extending this
+ * class rather than implementing {@link ReplicationEndpoint} directly for better backwards
+ * compatibility.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public abstract class BaseReplicationEndpoint extends AbstractService
+ implements ReplicationEndpoint {
+
+ protected Context ctx;
+
+ @Override
+ public void init(Context context) throws IOException {
+ this.ctx = context;
+ }
+
+ /** Returns a default set of filters */
+ @Override
+ public WALEntryFilter getWALEntryfilter() {
+ ArrayList<WALEntryFilter> filters = Lists.newArrayList();
+ WALEntryFilter scopeFilter = getScopeWALEntryFilter();
+ if (scopeFilter != null) {
+ filters.add(scopeFilter);
+ }
+ WALEntryFilter tableCfFilter = getTableCfWALEntryFilter();
+ if (tableCfFilter != null) {
+ filters.add(tableCfFilter);
+ }
+ return filters.isEmpty() ? null : new ChainWALEntryFilter(filters);
+ }
+
+ /** Returns a WALEntryFilter for checking the scope. Subclasses can
+ * return null if they don't want this filter */
+ protected WALEntryFilter getScopeWALEntryFilter() {
+ return new ScopeWALEntryFilter();
+ }
+
+ /** Returns a WALEntryFilter for checking replication per table and CF. Subclasses can
+ * return null if they don't want this filter */
+ protected WALEntryFilter getTableCfWALEntryFilter() {
+ return new TableCfWALEntryFilter(ctx.getReplicationPeer());
+ }
+
+ @Override
+ public boolean canReplicateToSameCluster() {
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
new file mode 100644
index 0000000..f701e94
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+
+/**
+ * A {@link WALEntryFilter} which contains multiple filters and applies them
+ * in chain order
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public class ChainWALEntryFilter implements WALEntryFilter {
+
+ private final WALEntryFilter[] filters;
+
+ public ChainWALEntryFilter(WALEntryFilter...filters) {
+ this.filters = filters;
+ }
+
+ public ChainWALEntryFilter(List<WALEntryFilter> filters) {
+ ArrayList<WALEntryFilter> rawFilters = new ArrayList<WALEntryFilter>(filters.size());
+ // flatten the chains
+ for (WALEntryFilter filter : filters) {
+ if (filter instanceof ChainWALEntryFilter) {
+ for (WALEntryFilter f : ((ChainWALEntryFilter) filter).filters) {
+ rawFilters.add(f);
+ }
+ } else {
+ rawFilters.add(filter);
+ }
+ }
+
+ this.filters = rawFilters.toArray(new WALEntryFilter[rawFilters.size()]);
+ }
+
+ @Override
+ public Entry filter(Entry entry) {
+ for (WALEntryFilter filter : filters) {
+ if (entry == null) {
+ return null;
+ }
+ entry = filter.filter(entry);
+ }
+ return entry;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
new file mode 100644
index 0000000..4b9a28f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.AuthFailedException;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+
+/**
+ * A {@link BaseReplicationEndpoint} for replication endpoints whose
+ * target cluster is an HBase cluster.
+ */
+@InterfaceAudience.Private
+public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
+ implements Abortable {
+
+ private static final Log LOG = LogFactory.getLog(HBaseReplicationEndpoint.class);
+
+ private ZooKeeperWatcher zkw = null;
+
+ private List<ServerName> regionServers = new ArrayList<ServerName>(0);
+ private volatile long lastRegionServerUpdate;
+
+ protected void disconnect() {
+ if (zkw != null){
+ zkw.close();
+ }
+ }
+
+ /**
+ * A private method used to re-establish a zookeeper session with a peer cluster.
+ * @param ke
+ */
+ protected void reconnect(KeeperException ke) {
+ if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
+ || ke instanceof AuthFailedException) {
+ String clusterKey = ctx.getPeerConfig().getClusterKey();
+ LOG.warn("Lost the ZooKeeper connection for peer " + clusterKey, ke);
+ try {
+ reloadZkWatcher();
+ } catch (IOException io) {
+ LOG.warn("Creation of ZookeeperWatcher failed for peer " + clusterKey, io);
+ }
+ }
+ }
+
+ @Override
+ protected void doStart() {
+ try {
+ reloadZkWatcher();
+ notifyStarted();
+ } catch (IOException e) {
+ notifyFailed(e);
+ }
+ }
+
+ @Override
+ protected void doStop() {
+ disconnect();
+ notifyStopped();
+ }
+
+ @Override
+ public UUID getPeerUUID() {
+ UUID peerUUID = null;
+ try {
+ peerUUID = ZKClusterId.getUUIDForCluster(zkw);
+ } catch (KeeperException ke) {
+ reconnect(ke);
+ }
+ return peerUUID;
+ }
+
+ /**
+ * Get the ZK connection to this peer
+ * @return zk connection
+ */
+ protected ZooKeeperWatcher getZkw() {
+ return zkw;
+ }
+
+ /**
+ * Closes the current ZKW (if not null) and creates a new one
+ * @throws IOException If anything goes wrong connecting
+ */
+ void reloadZkWatcher() throws IOException {
+ if (zkw != null) zkw.close();
+ zkw = new ZooKeeperWatcher(ctx.getConfiguration(),
+ "connection to cluster: " + ctx.getPeerId(), this);
+ getZkw().registerListener(new PeerRegionServerListener(this));
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+ LOG.fatal("The HBaseReplicationEndpoint corresponding to peer " + ctx.getPeerId()
+ + " was aborted for the following reason(s):" + why, e);
+ }
+
+ @Override
+ public boolean isAborted() {
+ // Currently this is never "Aborted", we just log when the abort method is called.
+ return false;
+ }
+
+ /**
+ * Get the list of all the region servers from the specified peer
+ * @param zkw zk connection to use
+ * @return list of region server addresses or an empty list if the slave is unavailable
+ */
+ protected static List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
+ throws KeeperException {
+ List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.rsZNode);
+ if (children == null) {
+ return Collections.emptyList();
+ }
+ List<ServerName> addresses = new ArrayList<ServerName>(children.size());
+ for (String child : children) {
+ addresses.add(ServerName.parseServerName(child));
+ }
+ return addresses;
+ }
+
+ /**
+ * Get a list of all the addresses of all the region servers
+ * for this peer cluster
+ * @return list of addresses
+ * @throws KeeperException
+ */
+ public List<ServerName> getRegionServers() {
+ try {
+ setRegionServers(fetchSlavesAddresses(this.getZkw()));
+ } catch (KeeperException ke) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Fetch salves addresses failed.", ke);
+ }
+ reconnect(ke);
+ }
+ return regionServers;
+ }
+
+ /**
+ * Set the list of region servers for that peer
+ * @param regionServers list of addresses for the region servers
+ */
+ public void setRegionServers(List<ServerName> regionServers) {
+ this.regionServers = regionServers;
+ lastRegionServerUpdate = System.currentTimeMillis();
+ }
+
+ /**
+ * Get the timestamp at which the last change occurred to the list of region servers to replicate
+ * to.
+ * @return The System.currentTimeMillis at the last time the list of peer region servers changed.
+ */
+ public long getLastRegionServerUpdate() {
+ return lastRegionServerUpdate;
+ }
+
+ /**
+ * Tracks changes to the list of region servers in a peer's cluster.
+ */
+ public static class PeerRegionServerListener extends ZooKeeperListener {
+
+ private final HBaseReplicationEndpoint replicationEndpoint;
+ private final String regionServerListNode;
+
+ public PeerRegionServerListener(HBaseReplicationEndpoint replicationPeer) {
+ super(replicationPeer.getZkw());
+ this.replicationEndpoint = replicationPeer;
+ this.regionServerListNode = replicationEndpoint.getZkw().rsZNode;
+ }
+
+ @Override
+ public synchronized void nodeChildrenChanged(String path) {
+ if (path.equals(regionServerListNode)) {
+ try {
+ LOG.info("Detected change to peer region servers, fetching updated list");
+ replicationEndpoint.setRegionServers(fetchSlavesAddresses(replicationEndpoint.getZkw()));
+ } catch (KeeperException e) {
+ LOG.fatal("Error reading slave addresses", e);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
new file mode 100644
index 0000000..90e3460
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+
+import com.google.common.util.concurrent.Service;
+
+/**
+ * ReplicationEndpoint is a plugin which implements replication
+ * to other HBase clusters, or other systems. ReplicationEndpoint implementation
+ * can be specified at the peer creation time by specifying it
+ * in the {@link ReplicationPeerConfig}. A ReplicationEndpoint is run in a thread
+ * in each region server in the same process.
+ * <p>
+ * ReplicationEndpoint is closely tied to ReplicationSource in a producer-consumer
+ * relation. ReplicationSource is an HBase-private class which tails the logs and manages
+ * the queue of logs plus management and persistence of all the state for replication.
+ * ReplicationEndpoint on the other hand is responsible for doing the actual shipping
+ * and persisting of the WAL entries in the other cluster.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface ReplicationEndpoint extends Service {
+
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+ class Context {
+ private final Configuration conf;
+ private final FileSystem fs;
+ private final ReplicationPeerConfig peerConfig;
+ private final ReplicationPeer replicationPeer;
+ private final String peerId;
+ private final UUID clusterId;
+ private final MetricsSource metrics;
+
+ @InterfaceAudience.Private
+ public Context(
+ final Configuration conf,
+ final FileSystem fs,
+ final ReplicationPeerConfig peerConfig,
+ final String peerId,
+ final UUID clusterId,
+ final ReplicationPeer replicationPeer,
+ final MetricsSource metrics) {
+ this.peerConfig = peerConfig;
+ this.conf = conf;
+ this.fs = fs;
+ this.clusterId = clusterId;
+ this.peerId = peerId;
+ this.replicationPeer = replicationPeer;
+ this.metrics = metrics;
+ }
+ public Configuration getConfiguration() {
+ return conf;
+ }
+ public FileSystem getFilesystem() {
+ return fs;
+ }
+ public UUID getClusterId() {
+ return clusterId;
+ }
+ public String getPeerId() {
+ return peerId;
+ }
+ public ReplicationPeerConfig getPeerConfig() {
+ return peerConfig;
+ }
+ public ReplicationPeer getReplicationPeer() {
+ return replicationPeer;
+ }
+ public MetricsSource getMetrics() {
+ return metrics;
+ }
+ }
+
+ /**
+ * Initialize the replication endpoint with the given context.
+ * @param context replication context
+ * @throws IOException
+ */
+ void init(Context context) throws IOException;
+
+ /** Whether or not, the replication endpoint can replicate to it's source cluster with the same
+ * UUID */
+ boolean canReplicateToSameCluster();
+
+ /**
+ * Returns a UUID of the provided peer id. Every HBase cluster instance has a persisted
+ * associated UUID. If the replication is not performed to an actual HBase cluster (but
+ * some other system), the UUID returned has to uniquely identify the connected target system.
+ * @return a UUID or null if the peer cluster does not exist or is not connected.
+ */
+ UUID getPeerUUID();
+
+ /**
+ * Returns a WALEntryFilter to use for filtering out WALEntries from the log. Replication
+ * infrastructure will call this filter before sending the edits to shipEdits().
+ * @return a {@link WALEntryFilter} or null.
+ */
+ WALEntryFilter getWALEntryfilter();
+
+ /**
+ * A context for {@link ReplicationEndpoint#replicate(ReplicateContext)} method.
+ */
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+ class ReplicateContext {
+ List<HLog.Entry> entries;
+ int size;
+ @InterfaceAudience.Private
+ public ReplicateContext() {
+ }
+
+ public ReplicateContext setEntries(List<HLog.Entry> entries) {
+ this.entries = entries;
+ return this;
+ }
+ public ReplicateContext setSize(int size) {
+ this.size = size;
+ return this;
+ }
+ public List<HLog.Entry> getEntries() {
+ return entries;
+ }
+ public int getSize() {
+ return size;
+ }
+ }
+
+ /**
+ * Replicate the given set of entries (in the context) to the other cluster.
+ * Can block until all the given entries are replicated. Upon this method is returned,
+ * all entries that were passed in the context are assumed to be persisted in the
+ * target cluster.
+ * @param replicateContext a context where WAL entries and other
+ * parameters can be obtained.
+ */
+ boolean replicate(ReplicateContext replicateContext);
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
new file mode 100644
index 0000000..82d976d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.ArrayList;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+
+/**
+ * Keeps KVs that are scoped other than local
+ */
+@InterfaceAudience.Private
+public class ScopeWALEntryFilter implements WALEntryFilter {
+
+ @Override
+ public Entry filter(Entry entry) {
+ NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
+ if (scopes == null || scopes.isEmpty()) {
+ return null;
+ }
+ ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
+ int size = kvs.size();
+ for (int i = size - 1; i >= 0; i--) {
+ KeyValue kv = kvs.get(i);
+ // The scope will be null or empty if
+ // there's nothing to replicate in that WALEdit
+ if (!scopes.containsKey(kv.getFamily())
+ || scopes.get(kv.getFamily()) == HConstants.REPLICATION_SCOPE_LOCAL) {
+ kvs.remove(i);
+ }
+ }
+ if (kvs.size() < size / 2) {
+ kvs.trimToSize();
+ }
+ return entry;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
new file mode 100644
index 0000000..e84ac18
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+
+/**
+ * Skips WAL edits for all System tables including META
+ */
+@InterfaceAudience.Private
+public class SystemTableWALEntryFilter implements WALEntryFilter {
+ @Override
+ public Entry filter(Entry entry) {
+ if (entry.getKey().getTablename().isSystemTable()) {
+ return null;
+ }
+ return entry;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
new file mode 100644
index 0000000..b7dfc4e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class TableCfWALEntryFilter implements WALEntryFilter {
+
+ private static final Log LOG = LogFactory.getLog(TableCfWALEntryFilter.class);
+ private final ReplicationPeer peer;
+
+ public TableCfWALEntryFilter(ReplicationPeer peer) {
+ this.peer = peer;
+ }
+
+ @Override
+ public Entry filter(Entry entry) {
+ String tabName = entry.getKey().getTablename().getNameAsString();
+ ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
+ Map<String, List<String>> tableCFs = null;
+
+ try {
+ tableCFs = this.peer.getTableCFs();
+ } catch (IllegalArgumentException e) {
+ LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
+ ", degenerate as if it's not configured by keeping tableCFs==null");
+ }
+ int size = kvs.size();
+
+ // return null(prevent replicating) if logKey's table isn't in this peer's
+ // replicable table list (empty tableCFs means all table are replicable)
+ if (tableCFs != null && !tableCFs.containsKey(tabName)) {
+ return null;
+ } else {
+ List<String> cfs = (tableCFs == null) ? null : tableCFs.get(tabName);
+ for (int i = size - 1; i >= 0; i--) {
+ KeyValue kv = kvs.get(i);
+ // ignore(remove) kv if its cf isn't in the replicable cf list
+ // (empty cfs means all cfs of this table are replicable)
+ if ((cfs != null && !cfs.contains(Bytes.toString(kv.getFamily())))) {
+ kvs.remove(i);
+ }
+ }
+ }
+ if (kvs.size() < size/2) {
+ kvs.trimToSize();
+ }
+ return entry;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
new file mode 100644
index 0000000..c054978
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+
+/**
+ * A Filter for WAL entries before being sent over to replication. Multiple
+ * filters might be chained together using {@link ChainWALEntryFilter}.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface WALEntryFilter {
+
+ /**
+ * Applies the filter, possibly returning a different HLog.Entry instance.
+ * If null is returned, the entry will be skipped.
+ * @param entry WAL Entry to filter
+ * @return a (possibly modified) HLog.Entry to use. Returning null or an entry with
+ * no cells will cause the entry to be skipped for replication.
+ */
+ public HLog.Entry filter(HLog.Entry entry);
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index cea5750..be69d98 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -25,14 +25,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-
import java.io.IOException;
import java.util.List;
import java.util.Set;
@@ -57,12 +54,12 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
@Override
public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
- // all members of this class are null if replication is disabled,
+ // all members of this class are null if replication is disabled,
// so we cannot filter the files
if (this.getConf() == null) {
return files;
}
-
+
final Set<String> hlogs = loadHLogsFromQueues();
return Iterables.filter(files, new Predicate<FileStatus>() {
@Override
@@ -137,8 +134,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
LOG.info("Stopping " + this.zkw);
this.zkw.close();
}
- // Not sure why we're deleting a connection that we never acquired or used
- HConnectionManager.deleteConnection(this.getConf());
}
@Override
[3/3] git commit: HBASE-11367 Pluggable replication endpoint
Posted by en...@apache.org.
HBASE-11367 Pluggable replication endpoint
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/463d52d8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/463d52d8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/463d52d8
Branch: refs/heads/master
Commit: 463d52d8cf2a87e1f11eb6fabcd0164584e29fbb
Parents: 4824b0d
Author: Enis Soztutar <en...@apache.org>
Authored: Mon Jul 14 16:21:55 2014 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Mon Jul 14 16:22:26 2014 -0700
----------------------------------------------------------------------
.../client/replication/ReplicationAdmin.java | 109 ++-
.../hbase/replication/ReplicationPeer.java | 353 +------
.../replication/ReplicationPeerConfig.java | 87 ++
.../replication/ReplicationPeerZKImpl.java | 320 ++++++
.../hbase/replication/ReplicationPeers.java | 89 +-
.../replication/ReplicationPeersZKImpl.java | 348 +++----
.../hadoop/hbase/zookeeper/ZKClusterId.java | 3 +-
.../hadoop/hbase/HBaseInterfaceAudience.java | 1 +
.../protobuf/generated/ZooKeeperProtos.java | 963 ++++++++++++++++++-
.../src/main/protobuf/ZooKeeper.proto | 3 +
.../replication/VerifyReplication.java | 13 +-
.../hadoop/hbase/regionserver/wal/HLog.java | 9 +-
.../hadoop/hbase/regionserver/wal/HLogKey.java | 11 +-
.../hadoop/hbase/regionserver/wal/WALEdit.java | 5 +-
.../replication/BaseReplicationEndpoint.java | 77 ++
.../hbase/replication/ChainWALEntryFilter.java | 68 ++
.../replication/HBaseReplicationEndpoint.java | 217 +++++
.../hbase/replication/ReplicationEndpoint.java | 163 ++++
.../hbase/replication/ScopeWALEntryFilter.java | 58 ++
.../replication/SystemTableWALEntryFilter.java | 36 +
.../replication/TableCfWALEntryFilter.java | 74 ++
.../hbase/replication/WALEntryFilter.java | 41 +
.../master/ReplicationLogCleaner.java | 9 +-
.../HBaseInterClusterReplicationEndpoint.java | 225 +++++
.../replication/regionserver/MetricsSource.java | 5 +-
.../regionserver/ReplicationSinkManager.java | 17 +-
.../regionserver/ReplicationSource.java | 279 +++---
.../ReplicationSourceInterface.java | 4 +-
.../regionserver/ReplicationSourceManager.java | 68 +-
.../replication/TestReplicationAdmin.java | 37 +
.../replication/ReplicationSourceDummy.java | 4 +-
.../replication/TestPerTableCFReplication.java | 20 +-
.../replication/TestReplicationEndpoint.java | 272 ++++++
.../replication/TestReplicationStateBasic.java | 58 +-
.../TestReplicationTrackerZKImpl.java | 4 +-
.../TestReplicationWALEntryFilters.java | 277 ++++++
.../TestReplicationSinkManager.java | 17 +-
37 files changed, 3464 insertions(+), 880 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 72e3fbb..4028d87 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -21,10 +21,12 @@ package org.apache.hadoop.hbase.client.replication;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
+import java.util.Map.Entry;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -34,14 +36,18 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* <p>
* This class provides the administrative interface to HBase cluster
@@ -80,6 +86,8 @@ public class ReplicationAdmin implements Closeable {
.toString(HConstants.REPLICATION_SCOPE_GLOBAL);
private final HConnection connection;
+ // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
+ // be moved to hbase-server. Resolve it in HBASE-11392.
private final ReplicationQueuesClient replicationQueuesClient;
private final ReplicationPeers replicationPeers;
@@ -126,27 +134,65 @@ public class ReplicationAdmin implements Closeable {
});
}
-
/**
* Add a new peer cluster to replicate to.
- * @param id a short that identifies the cluster
+ * @param id a short name that identifies the cluster
* @param clusterKey the concatenation of the slave cluster's
* <code>hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent</code>
* @throws IllegalStateException if there's already one slave since
* multi-slave isn't supported yet.
+ * @deprecated Use addPeer(String, ReplicationPeerConfig, Map) instead.
*/
+ @Deprecated
public void addPeer(String id, String clusterKey) throws ReplicationException {
- this.replicationPeers.addPeer(id, clusterKey);
+ this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), null);
}
+ @Deprecated
public void addPeer(String id, String clusterKey, String tableCFs)
throws ReplicationException {
- this.replicationPeers.addPeer(id, clusterKey, tableCFs);
+ this.replicationPeers.addPeer(id,
+ new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
+ }
+
+ /**
+ * Add a new remote slave cluster for replication.
+ * @param id a short name that identifies the cluster
+ * @param peerConfig configuration for the replication slave cluster
+ * @param tableCfs the table and column-family list which will be replicated for this peer.
+ * A map from tableName to column family names. An empty collection can be passed
+ * to indicate replicating all column families. Pass null for replicating all table and column
+ * families
+ */
+ public void addPeer(String id, ReplicationPeerConfig peerConfig,
+ Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
+ this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
+ }
+
+ @VisibleForTesting
+ static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
+ String tableCfsStr = null;
+ if (tableCfs != null) {
+ // Format: table1:cf1,cf2;table2:cfA,cfB;table3
+ StringBuilder builder = new StringBuilder();
+ for (Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
+ if (builder.length() > 0) {
+ builder.append(";");
+ }
+ builder.append(entry.getKey());
+ if (entry.getValue() != null && !entry.getValue().isEmpty()) {
+ builder.append(":");
+ builder.append(StringUtils.join(entry.getValue(), ","));
+ }
+ }
+ tableCfsStr = builder.toString();
+ }
+ return tableCfsStr;
}
/**
* Removes a peer cluster and stops the replication to it.
- * @param id a short that identifies the cluster
+ * @param id a short name that identifies the cluster
*/
public void removePeer(String id) throws ReplicationException {
this.replicationPeers.removePeer(id);
@@ -154,7 +200,7 @@ public class ReplicationAdmin implements Closeable {
/**
* Restart the replication stream to the specified peer.
- * @param id a short that identifies the cluster
+ * @param id a short name that identifies the cluster
*/
public void enablePeer(String id) throws ReplicationException {
this.replicationPeers.enablePeer(id);
@@ -162,7 +208,7 @@ public class ReplicationAdmin implements Closeable {
/**
* Stop the replication stream to the specified peer.
- * @param id a short that identifies the cluster
+ * @param id a short name that identifies the cluster
*/
public void disablePeer(String id) throws ReplicationException {
this.replicationPeers.disablePeer(id);
@@ -179,14 +225,30 @@ public class ReplicationAdmin implements Closeable {
/**
* Map of this cluster's peers for display.
* @return A map of peer ids to peer cluster keys
+ * @deprecated use {@link #listPeerConfigs()}
*/
+ @Deprecated
public Map<String, String> listPeers() {
- return this.replicationPeers.getAllPeerClusterKeys();
+ Map<String, ReplicationPeerConfig> peers = this.listPeerConfigs();
+ Map<String, String> ret = new HashMap<String, String>(peers.size());
+
+ for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
+ ret.put(entry.getKey(), entry.getValue().getClusterKey());
+ }
+ return ret;
+ }
+
+ public Map<String, ReplicationPeerConfig> listPeerConfigs() {
+ return this.replicationPeers.getAllPeerConfigs();
+ }
+
+ public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
+ return this.replicationPeers.getReplicationPeerConfig(id);
}
/**
* Get the replicable table-cf config of the specified peer.
- * @param id a short that identifies the cluster
+ * @param id a short name that identifies the cluster
*/
public String getPeerTableCFs(String id) throws ReplicationException {
return this.replicationPeers.getPeerTableCFsConfig(id);
@@ -194,16 +256,31 @@ public class ReplicationAdmin implements Closeable {
/**
* Set the replicable table-cf config of the specified peer
- * @param id a short that identifies the cluster
+ * @param id a short name that identifies the cluster
+ * @deprecated use {@link #setPeerTableCFs(String, Map)}
*/
+ @Deprecated
public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
}
/**
+ * Set the replicable table-cf config of the specified peer
+ * @param id a short name that identifies the cluster
+ * @param tableCfs the table and column-family list which will be replicated for this peer.
+ * A map from tableName to column family names. An empty collection can be passed
+ * to indicate replicating all column families. Pass null for replicating all table and column
+ * families
+ */
+ public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
+ throws ReplicationException {
+ this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
+ }
+
+ /**
* Get the state of the specified peer cluster
- * @param id String format of the Short that identifies the peer, an IllegalArgumentException
- * is thrown if it doesn't exist
+ * @param id String format of the Short name that identifies the peer,
+ * an IllegalArgumentException is thrown if it doesn't exist
* @return true if replication is enabled to that peer, false if it isn't
*/
public boolean getPeerState(String id) throws ReplicationException {
@@ -217,7 +294,7 @@ public class ReplicationAdmin implements Closeable {
}
}
-
+
/**
* Find all column families that are replicated from this cluster
* @return the full list of the replicated column families of this cluster as:
@@ -227,7 +304,7 @@ public class ReplicationAdmin implements Closeable {
* types may be extended here. For example
* 1) the replication may only apply to selected peers instead of all peers
* 2) the replicationType may indicate the host Cluster servers as Slave
- * for the table:columnFam.
+ * for the table:columnFam.
*/
public List<HashMap<String, String>> listReplicated() throws IOException {
List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
@@ -249,5 +326,5 @@ public class ReplicationAdmin implements Closeable {
}
return replicationColFams;
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
index 1b14dab..c116674 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,362 +17,56 @@
*/
package org.apache.hadoop.hbase.replication;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-
-import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
/**
- * This class acts as a wrapper for all the objects used to identify and
- * communicate with remote peers and is responsible for answering to expired
- * sessions and re-establishing the ZK connections.
+ * ReplicationPeer manages enabled / disabled state for the peer.
*/
-@InterfaceAudience.Private
-public class ReplicationPeer implements Abortable, Closeable {
- private static final Log LOG = LogFactory.getLog(ReplicationPeer.class);
-
- private final String clusterKey;
- private final String id;
- private List<ServerName> regionServers = new ArrayList<ServerName>(0);
- private final AtomicBoolean peerEnabled = new AtomicBoolean();
- private volatile Map<String, List<String>> tableCFs = new HashMap<String, List<String>>();
- // Cannot be final since a new object needs to be recreated when session fails
- private ZooKeeperWatcher zkw;
- private final Configuration conf;
- private long lastRegionserverUpdate;
-
- private PeerStateTracker peerStateTracker;
- private TableCFsTracker tableCFsTracker;
-
- /**
- * Constructor that takes all the objects required to communicate with the
- * specified peer, except for the region server addresses.
- * @param conf configuration object to this peer
- * @param key cluster key used to locate the peer
- * @param id string representation of this peer's identifier
- */
- public ReplicationPeer(Configuration conf, String key, String id) throws ReplicationException {
- this.conf = conf;
- this.clusterKey = key;
- this.id = id;
- try {
- this.reloadZkWatcher();
- } catch (IOException e) {
- throw new ReplicationException("Error connecting to peer cluster with peerId=" + id, e);
- }
- }
-
- /**
- * start a state tracker to check whether this peer is enabled or not
- *
- * @param zookeeper zk watcher for the local cluster
- * @param peerStateNode path to zk node which stores peer state
- * @throws KeeperException
- */
- public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
- throws KeeperException {
- ensurePeerEnabled(zookeeper, peerStateNode);
- this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
- this.peerStateTracker.start();
- try {
- this.readPeerStateZnode();
- } catch (DeserializationException e) {
- throw ZKUtil.convert(e);
- }
- }
-
- private void readPeerStateZnode() throws DeserializationException {
- this.peerEnabled.set(isStateEnabled(this.peerStateTracker.getData(false)));
- }
-
- /**
- * start a table-cfs tracker to listen the (table, cf-list) map change
- *
- * @param zookeeper zk watcher for the local cluster
- * @param tableCFsNode path to zk node which stores table-cfs
- * @throws KeeperException
- */
- public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode)
- throws KeeperException {
- this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper,
- this);
- this.tableCFsTracker.start();
- this.readTableCFsZnode();
- }
-
- static Map<String, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
- if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
- return null;
- }
-
- Map<String, List<String>> tableCFsMap = null;
-
- // parse out (table, cf-list) pairs from tableCFsConfig
- // format: "table1:cf1,cf2;table2:cfA,cfB"
- String[] tables = tableCFsConfig.split(";");
- for (String tab : tables) {
- // 1 ignore empty table config
- tab = tab.trim();
- if (tab.length() == 0) {
- continue;
- }
- // 2 split to "table" and "cf1,cf2"
- // for each table: "table:cf1,cf2" or "table"
- String[] pair = tab.split(":");
- String tabName = pair[0].trim();
- if (pair.length > 2 || tabName.length() == 0) {
- LOG.error("ignore invalid tableCFs setting: " + tab);
- continue;
- }
-
- // 3 parse "cf1,cf2" part to List<cf>
- List<String> cfs = null;
- if (pair.length == 2) {
- String[] cfsList = pair[1].split(",");
- for (String cf : cfsList) {
- String cfName = cf.trim();
- if (cfName.length() > 0) {
- if (cfs == null) {
- cfs = new ArrayList<String>();
- }
- cfs.add(cfName);
- }
- }
- }
-
- // 4 put <table, List<cf>> to map
- if (tableCFsMap == null) {
- tableCFsMap = new HashMap<String, List<String>>();
- }
- tableCFsMap.put(tabName, cfs);
- }
-
- return tableCFsMap;
- }
-
- private void readTableCFsZnode() {
- String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false));
- this.tableCFs = parseTableCFsFromConfig(currentTableCFs);
- }
-
- /**
- * Get the cluster key of that peer
- * @return string consisting of zk ensemble addresses, client port
- * and root znode
- */
- public String getClusterKey() {
- return clusterKey;
- }
-
- /**
- * Get the state of this peer
- * @return atomic boolean that holds the status
- */
- public AtomicBoolean getPeerEnabled() {
- return peerEnabled;
- }
-
- /**
- * Get replicable (table, cf-list) map of this peer
- * @return the replicable (table, cf-list) map
- */
- public Map<String, List<String>> getTableCFs() {
- return this.tableCFs;
- }
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface ReplicationPeer {
/**
- * Get a list of all the addresses of all the region servers
- * for this peer cluster
- * @return list of addresses
+ * State of the peer, whether it is enabled or not
*/
- public List<ServerName> getRegionServers() {
- return regionServers;
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+ enum PeerState {
+ ENABLED,
+ DISABLED
}
/**
- * Set the list of region servers for that peer
- * @param regionServers list of addresses for the region servers
+ * Get the identifier of this peer
+ * @return string representation of the id
*/
- public void setRegionServers(List<ServerName> regionServers) {
- this.regionServers = regionServers;
- lastRegionserverUpdate = System.currentTimeMillis();
- }
+ String getId();
/**
- * Get the ZK connection to this peer
- * @return zk connection
+ * Get the peer config object
+ * @return the ReplicationPeerConfig for this peer
*/
- public ZooKeeperWatcher getZkw() {
- return zkw;
- }
+ public ReplicationPeerConfig getPeerConfig();
/**
- * Get the timestamp at which the last change occurred to the list of region servers to replicate
- * to.
- * @return The System.currentTimeMillis at the last time the list of peer region servers changed.
+ * Returns the state of the peer
+ * @return the enabled state
*/
- public long getLastRegionserverUpdate() {
- return lastRegionserverUpdate;
- }
-
- /**
- * Get the identifier of this peer
- * @return string representation of the id (short)
- */
- public String getId() {
- return id;
- }
+ PeerState getPeerState();
/**
* Get the configuration object required to communicate with this peer
* @return configuration object
*/
- public Configuration getConfiguration() {
- return conf;
- }
-
- @Override
- public void abort(String why, Throwable e) {
- LOG.fatal("The ReplicationPeer coresponding to peer " + clusterKey
- + " was aborted for the following reason(s):" + why, e);
- }
-
- /**
- * Closes the current ZKW (if not null) and creates a new one
- * @throws IOException If anything goes wrong connecting
- */
- public void reloadZkWatcher() throws IOException {
- if (zkw != null) zkw.close();
- zkw = new ZooKeeperWatcher(conf,
- "connection to cluster: " + id, this);
- }
-
- @Override
- public boolean isAborted() {
- // Currently the replication peer is never "Aborted", we just log when the
- // abort method is called.
- return false;
- }
-
- @Override
- public void close() throws IOException {
- if (zkw != null){
- zkw.close();
- }
- }
-
- /**
- * Parse the raw data from ZK to get a peer's state
- * @param bytes raw ZK data
- * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
- * @throws DeserializationException
- */
- public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
- ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
- return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
- }
-
- /**
- * @param bytes Content of a state znode.
- * @return State parsed from the passed bytes.
- * @throws DeserializationException
- */
- private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
- throws DeserializationException {
- ProtobufUtil.expectPBMagicPrefix(bytes);
- int pblen = ProtobufUtil.lengthOfPBMagic();
- ZooKeeperProtos.ReplicationState.Builder builder =
- ZooKeeperProtos.ReplicationState.newBuilder();
- ZooKeeperProtos.ReplicationState state;
- try {
- state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
- return state.getState();
- } catch (InvalidProtocolBufferException e) {
- throw new DeserializationException(e);
- }
- }
-
- /**
- * Utility method to ensure an ENABLED znode is in place; if not present, we create it.
- * @param zookeeper
- * @param path Path to znode to check
- * @return True if we created the znode.
- * @throws NodeExistsException
- * @throws KeeperException
- */
- private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
- throws NodeExistsException, KeeperException {
- if (ZKUtil.checkExists(zookeeper, path) == -1) {
- // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
- // peer-state znode. This happens while adding a peer.
- // The peer state data is set as "ENABLED" by default.
- ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
- ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
- return true;
- }
- return false;
- }
+ public Configuration getConfiguration();
/**
- * Tracker for state of this peer
- */
- public class PeerStateTracker extends ZooKeeperNodeTracker {
-
- public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
- Abortable abortable) {
- super(watcher, peerStateZNode, abortable);
- }
-
- @Override
- public synchronized void nodeDataChanged(String path) {
- if (path.equals(node)) {
- super.nodeDataChanged(path);
- try {
- readPeerStateZnode();
- } catch (DeserializationException e) {
- LOG.warn("Failed deserializing the content of " + path, e);
- }
- }
- }
- }
-
- /**
- * Tracker for (table, cf-list) map of this peer
+ * Get replicable (table, cf-list) map of this peer
+ * @return the replicable (table, cf-list) map
*/
- public class TableCFsTracker extends ZooKeeperNodeTracker {
+ public Map<String, List<String>> getTableCFs();
- public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher,
- Abortable abortable) {
- super(watcher, tableCFsZNode, abortable);
- }
-
- @Override
- public synchronized void nodeDataChanged(String path) {
- if (path.equals(node)) {
- super.nodeDataChanged(path);
- readTableCFsZnode();
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
new file mode 100644
index 0000000..8b8bab7
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A configuration for the replication peer cluster.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ReplicationPeerConfig {
+
+ private String clusterKey;
+ private String replicationEndpointImpl;
+ private final Map<byte[], byte[]> peerData;
+ private final Map<String, String> configuration;
+
+
+ public ReplicationPeerConfig() {
+ this.peerData = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
+ this.configuration = new HashMap<String, String>(0);
+ }
+
+ /**
+ * Set the clusterKey which is the concatenation of the slave cluster's:
+ * hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
+ */
+ public ReplicationPeerConfig setClusterKey(String clusterKey) {
+ this.clusterKey = clusterKey;
+ return this;
+ }
+
+ /**
+ * Sets the ReplicationEndpoint plugin class for this peer.
+ * @param replicationEndpointImpl a class implementing ReplicationEndpoint
+ */
+ public ReplicationPeerConfig setReplicationEndpointImpl(String replicationEndpointImpl) {
+ this.replicationEndpointImpl = replicationEndpointImpl;
+ return this;
+ }
+
+ public String getClusterKey() {
+ return clusterKey;
+ }
+
+ public String getReplicationEndpointImpl() {
+ return replicationEndpointImpl;
+ }
+
+ public Map<byte[], byte[]> getPeerData() {
+ return peerData;
+ }
+
+ public Map<String, String> getConfiguration() {
+ return configuration;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
+ builder.append("replicationEndpointImpl=").append(replicationEndpointImpl);
+ return builder.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
new file mode 100644
index 0000000..a39392c
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
@@ -0,0 +1,320 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+@InterfaceAudience.Private
+public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closeable {
+ private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class);
+
+ private final ReplicationPeerConfig peerConfig;
+ private final String id;
+ private volatile PeerState peerState;
+ private volatile Map<String, List<String>> tableCFs = new HashMap<String, List<String>>();
+ private final Configuration conf;
+
+ private PeerStateTracker peerStateTracker;
+ private TableCFsTracker tableCFsTracker;
+
+ /**
+ * Constructor that takes all the objects required to communicate with the
+ * specified peer, except for the region server addresses.
+ * @param conf configuration object to this peer
+ * @param id string representation of this peer's identifier
+ * @param peerConfig configuration for the replication peer
+ */
+ public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig)
+ throws ReplicationException {
+ this.conf = conf;
+ this.peerConfig = peerConfig;
+ this.id = id;
+ }
+
+ /**
+ * start a state tracker to check whether this peer is enabled or not
+ *
+ * @param zookeeper zk watcher for the local cluster
+ * @param peerStateNode path to zk node which stores peer state
+ * @throws KeeperException
+ */
+ public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
+ throws KeeperException {
+ ensurePeerEnabled(zookeeper, peerStateNode);
+ this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
+ this.peerStateTracker.start();
+ try {
+ this.readPeerStateZnode();
+ } catch (DeserializationException e) {
+ throw ZKUtil.convert(e);
+ }
+ }
+
+ private void readPeerStateZnode() throws DeserializationException {
+ this.peerState =
+ isStateEnabled(this.peerStateTracker.getData(false))
+ ? PeerState.ENABLED
+ : PeerState.DISABLED;
+ }
+
+ /**
+ * start a table-cfs tracker to listen the (table, cf-list) map change
+ *
+ * @param zookeeper zk watcher for the local cluster
+ * @param tableCFsNode path to zk node which stores table-cfs
+ * @throws KeeperException
+ */
+ public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode)
+ throws KeeperException {
+ this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper,
+ this);
+ this.tableCFsTracker.start();
+ this.readTableCFsZnode();
+ }
+
+ static Map<String, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
+ if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
+ return null;
+ }
+
+ Map<String, List<String>> tableCFsMap = null;
+ // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
+ // parse out (table, cf-list) pairs from tableCFsConfig
+ // format: "table1:cf1,cf2;table2:cfA,cfB"
+ String[] tables = tableCFsConfig.split(";");
+ for (String tab : tables) {
+ // 1 ignore empty table config
+ tab = tab.trim();
+ if (tab.length() == 0) {
+ continue;
+ }
+ // 2 split to "table" and "cf1,cf2"
+ // for each table: "table:cf1,cf2" or "table"
+ String[] pair = tab.split(":");
+ String tabName = pair[0].trim();
+ if (pair.length > 2 || tabName.length() == 0) {
+ LOG.error("ignore invalid tableCFs setting: " + tab);
+ continue;
+ }
+
+ // 3 parse "cf1,cf2" part to List<cf>
+ List<String> cfs = null;
+ if (pair.length == 2) {
+ String[] cfsList = pair[1].split(",");
+ for (String cf : cfsList) {
+ String cfName = cf.trim();
+ if (cfName.length() > 0) {
+ if (cfs == null) {
+ cfs = new ArrayList<String>();
+ }
+ cfs.add(cfName);
+ }
+ }
+ }
+
+ // 4 put <table, List<cf>> to map
+ if (tableCFsMap == null) {
+ tableCFsMap = new HashMap<String, List<String>>();
+ }
+ tableCFsMap.put(tabName, cfs);
+ }
+
+ return tableCFsMap;
+ }
+
+ private void readTableCFsZnode() {
+ String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false));
+ this.tableCFs = parseTableCFsFromConfig(currentTableCFs);
+ }
+
+ @Override
+ public PeerState getPeerState() {
+ return peerState;
+ }
+
+ /**
+ * Get the identifier of this peer
+ * @return string representation of the id (short)
+ */
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * Get the peer config object
+ * @return the ReplicationPeerConfig for this peer
+ */
+ @Override
+ public ReplicationPeerConfig getPeerConfig() {
+ return peerConfig;
+ }
+
+ /**
+ * Get the configuration object required to communicate with this peer
+ * @return configuration object
+ */
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ /**
+ * Get replicable (table, cf-list) map of this peer
+ * @return the replicable (table, cf-list) map
+ */
+ @Override
+ public Map<String, List<String>> getTableCFs() {
+ return this.tableCFs;
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+ LOG.fatal("The ReplicationPeer coresponding to peer " + peerConfig
+ + " was aborted for the following reason(s):" + why, e);
+ }
+
+ @Override
+ public boolean isAborted() {
+ // Currently the replication peer is never "Aborted", we just log when the
+ // abort method is called.
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // TODO: stop zkw?
+ }
+
+ /**
+ * Parse the raw data from ZK to get a peer's state
+ * @param bytes raw ZK data
+ * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
+ * @throws DeserializationException
+ */
+ public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
+ ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
+ return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
+ }
+
+ /**
+ * @param bytes Content of a state znode.
+ * @return State parsed from the passed bytes.
+ * @throws DeserializationException
+ */
+ private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
+ throws DeserializationException {
+ ProtobufUtil.expectPBMagicPrefix(bytes);
+ int pblen = ProtobufUtil.lengthOfPBMagic();
+ ZooKeeperProtos.ReplicationState.Builder builder =
+ ZooKeeperProtos.ReplicationState.newBuilder();
+ ZooKeeperProtos.ReplicationState state;
+ try {
+ state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+ return state.getState();
+ } catch (InvalidProtocolBufferException e) {
+ throw new DeserializationException(e);
+ }
+ }
+
+ /**
+ * Utility method to ensure an ENABLED znode is in place; if not present, we create it.
+ * @param zookeeper
+ * @param path Path to znode to check
+ * @return True if we created the znode.
+ * @throws NodeExistsException
+ * @throws KeeperException
+ */
+ private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
+ throws NodeExistsException, KeeperException {
+ if (ZKUtil.checkExists(zookeeper, path) == -1) {
+ // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
+ // peer-state znode. This happens while adding a peer.
+ // The peer state data is set as "ENABLED" by default.
+ ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
+ ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Tracker for state of this peer
+ */
+ public class PeerStateTracker extends ZooKeeperNodeTracker {
+
+ public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
+ Abortable abortable) {
+ super(watcher, peerStateZNode, abortable);
+ }
+
+ @Override
+ public synchronized void nodeDataChanged(String path) {
+ if (path.equals(node)) {
+ super.nodeDataChanged(path);
+ try {
+ readPeerStateZnode();
+ } catch (DeserializationException e) {
+ LOG.warn("Failed deserializing the content of " + path, e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Tracker for (table, cf-list) map of this peer
+ */
+ public class TableCFsTracker extends ZooKeeperNodeTracker {
+
+ public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher,
+ Abortable abortable) {
+ super(watcher, tableCFsZNode, abortable);
+ }
+
+ @Override
+ public synchronized void nodeDataChanged(String path) {
+ if (path.equals(node)) {
+ super.nodeDataChanged(path);
+ readTableCFsZnode();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 4922f70..b1c3b49 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -21,11 +21,10 @@ package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.Pair;
/**
* This provides an interface for maintaining a set of peer clusters. These peers are remote slave
@@ -44,22 +43,16 @@ public interface ReplicationPeers {
* Initialize the ReplicationPeers interface.
*/
void init() throws ReplicationException;
- /**
- * Add a new remote slave cluster for replication.
- * @param peerId a short that identifies the cluster
- * @param clusterKey the concatenation of the slave cluster's:
- * hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
- */
- void addPeer(String peerId, String clusterKey) throws ReplicationException;
/**
* Add a new remote slave cluster for replication.
* @param peerId a short that identifies the cluster
- * @param clusterKey the concatenation of the slave cluster's:
- * hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
- * @param tableCFs the table and column-family list which will be replicated for this peer
+ * @param peerConfig configuration for the replication slave cluster
+ * @param tableCFs the table and column-family list which will be replicated for this peer or null
+ * for all table and column families
*/
- void addPeer(String peerId, String clusterKey, String tableCFs) throws ReplicationException;
+ void addPeer(String peerId, ReplicationPeerConfig peerConfig, String tableCFs)
+ throws ReplicationException;
/**
* Removes a remote slave cluster and stops the replication to it.
@@ -67,6 +60,10 @@ public interface ReplicationPeers {
*/
void removePeer(String peerId) throws ReplicationException;
+ boolean peerAdded(String peerId) throws ReplicationException;
+
+ void peerRemoved(String peerId);
+
/**
* Restart the replication to the specified remote slave cluster.
* @param peerId a short that identifies the cluster
@@ -100,6 +97,19 @@ public interface ReplicationPeers {
public Map<String, List<String>> getTableCFs(String peerId);
/**
+ * Returns the ReplicationPeer
+ * @param peerId id for the peer
+ * @return ReplicationPeer object
+ */
+ ReplicationPeer getPeer(String peerId);
+
+ /**
+ * Returns the set of peerIds defined
+ * @return a Set of Strings for peerIds
+ */
+ public Set<String> getPeerIds();
+
+ /**
* Get the replication status for the specified connected remote slave cluster.
* The value might be read from cache, so it is recommended to
* use {@link #getStatusOfPeerFromBackingStore(String)}
@@ -107,7 +117,7 @@ public interface ReplicationPeers {
* @param peerId a short that identifies the cluster
* @return true if replication is enabled, false otherwise.
*/
- boolean getStatusOfConnectedPeer(String peerId);
+ boolean getStatusOfPeer(String peerId);
/**
* Get the replication status for the specified remote slave cluster, which doesn't
@@ -119,17 +129,11 @@ public interface ReplicationPeers {
boolean getStatusOfPeerFromBackingStore(String peerId) throws ReplicationException;
/**
- * Get a set of all connected remote slave clusters.
- * @return set of peer ids
- */
- Set<String> getConnectedPeers();
-
- /**
- * List the cluster keys of all remote slave clusters (whether they are enabled/disabled or
- * connected/disconnected).
+ * List the cluster replication configs of all remote slave clusters (whether they are
+ * enabled/disabled or connected/disconnected).
* @return A map of peer ids to peer cluster keys
*/
- Map<String, String> getAllPeerClusterKeys();
+ Map<String, ReplicationPeerConfig> getAllPeerConfigs();
/**
* List the peer ids of all remote slave clusters (whether they are enabled/disabled or
@@ -139,45 +143,16 @@ public interface ReplicationPeers {
List<String> getAllPeerIds();
/**
- * Attempt to connect to a new remote slave cluster.
- * @param peerId a short that identifies the cluster
- * @return true if a new connection was made, false if no new connection was made.
- */
- boolean connectToPeer(String peerId) throws ReplicationException;
-
- /**
- * Disconnect from a remote slave cluster.
- * @param peerId a short that identifies the cluster
- */
- void disconnectFromPeer(String peerId);
-
- /**
- * Returns all region servers from given connected remote slave cluster.
- * @param peerId a short that identifies the cluster
- * @return addresses of all region servers in the peer cluster. Returns an empty list if the peer
- * cluster is unavailable or there are no region servers in the cluster.
- */
- List<ServerName> getRegionServersOfConnectedPeer(String peerId);
-
- /**
- * Get the timestamp of the last change in composition of a given peer cluster.
- * @param peerId identifier of the peer cluster for which the timestamp is requested
- * @return the timestamp (in milliseconds) of the last change to the composition of
- * the peer cluster
- */
- long getTimestampOfLastChangeToPeer(String peerId);
-
- /**
- * Returns the UUID of the provided peer id.
- * @param peerId the peer's ID that will be converted into a UUID
- * @return a UUID or null if the peer cluster does not exist or is not connected.
+ * Returns the configured ReplicationPeerConfig for this peerId
+ * @param peerId a short name that identifies the cluster
+ * @return ReplicationPeerConfig for the peer
*/
- UUID getPeerUUID(String peerId);
+ ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException;
/**
* Returns the configuration needed to talk to the remote slave cluster.
* @param peerId a short that identifies the cluster
* @return the configuration for the peer cluster, null if it was unable to get the configuration
*/
- Configuration getPeerConf(String peerId) throws ReplicationException;
+ Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) throws ReplicationException;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index fb09102..488d37a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -19,33 +19,29 @@
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.AuthFailedException;
-import org.apache.zookeeper.KeeperException.ConnectionLossException;
-import org.apache.zookeeper.KeeperException.SessionExpiredException;
-
+import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
/**
@@ -77,7 +73,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
// Map of peer clusters keyed by their id
- private Map<String, ReplicationPeer> peerClusters;
+ private Map<String, ReplicationPeerZKImpl> peerClusters;
private final String tableCFsNodeName;
private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
@@ -86,7 +82,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
Abortable abortable) {
super(zk, conf, abortable);
this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
- this.peerClusters = new HashMap<String, ReplicationPeer>();
+ this.peerClusters = new HashMap<String, ReplicationPeerZKImpl>();
}
@Override
@@ -98,16 +94,12 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
} catch (KeeperException e) {
throw new ReplicationException("Could not initialize replication peers", e);
}
- connectExistingPeers();
- }
-
- @Override
- public void addPeer(String id, String clusterKey) throws ReplicationException {
- addPeer(id, clusterKey, null);
+ addExistingPeers();
}
@Override
- public void addPeer(String id, String clusterKey, String tableCFs) throws ReplicationException {
+ public void addPeer(String id, ReplicationPeerConfig peerConfig, String tableCFs)
+ throws ReplicationException {
try {
if (peerExists(id)) {
throw new IllegalArgumentException("Cannot add a peer with id=" + id
@@ -115,7 +107,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
- toByteArray(clusterKey));
+ toByteArray(peerConfig));
// There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
// peer-state znode. This happens while adding a peer.
// The peer state data is set as "ENABLED" by default.
@@ -128,7 +120,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
Bytes.toBytes(tableCFsStr));
} catch (KeeperException e) {
throw new ReplicationException("Could not add peer with id=" + id
- + ", clusterKey=" + clusterKey, e);
+ + ", peerConfif=>" + peerConfig, e);
}
}
@@ -202,11 +194,11 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
@Override
- public boolean getStatusOfConnectedPeer(String id) {
+ public boolean getStatusOfPeer(String id) {
if (!this.peerClusters.containsKey(id)) {
throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
}
- return this.peerClusters.get(id).getPeerEnabled().get();
+ return this.peerClusters.get(id).getPeerState() == PeerState.ENABLED;
}
@Override
@@ -217,7 +209,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
String peerStateZNode = getPeerStateNode(id);
try {
- return ReplicationPeer.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
+ return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
} catch (KeeperException e) {
throw new ReplicationException(e);
} catch (DeserializationException e) {
@@ -232,140 +224,98 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
@Override
- public boolean connectToPeer(String peerId) throws ReplicationException {
- if (peerClusters == null) {
- return false;
- }
- if (this.peerClusters.containsKey(peerId)) {
- return false;
- }
- ReplicationPeer peer = null;
- try {
- peer = getPeer(peerId);
- } catch (Exception e) {
- throw new ReplicationException("Error connecting to peer with id=" + peerId, e);
- }
- if (peer == null) {
- return false;
- }
- this.peerClusters.put(peerId, peer);
- LOG.info("Added new peer cluster " + peer.getClusterKey());
- return true;
- }
-
- @Override
- public void disconnectFromPeer(String peerId) {
- ReplicationPeer rp = this.peerClusters.get(peerId);
- if (rp != null) {
- rp.getZkw().close();
- this.peerClusters.remove(peerId);
- }
- }
-
- @Override
- public Map<String, String> getAllPeerClusterKeys() {
- Map<String, String> peers = new TreeMap<String, String>();
+ public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
+ Map<String, ReplicationPeerConfig> peers = new TreeMap<String, ReplicationPeerConfig>();
List<String> ids = null;
try {
ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
for (String id : ids) {
- byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
- String clusterKey = null;
- try {
- clusterKey = parsePeerFrom(bytes);
- } catch (DeserializationException de) {
- LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.");
+ ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
+ if (peerConfig == null) {
+ LOG.warn("Failed to get replication peer configuration of clusterid=" + id
+ + " znode content, continuing.");
continue;
}
- peers.put(id, clusterKey);
+ peers.put(id, peerConfig);
}
} catch (KeeperException e) {
this.abortable.abort("Cannot get the list of peers ", e);
- } catch (InterruptedException e) {
+ } catch (ReplicationException e) {
this.abortable.abort("Cannot get the list of peers ", e);
}
return peers;
}
@Override
- public List<ServerName> getRegionServersOfConnectedPeer(String peerId) {
- if (this.peerClusters.size() == 0) {
- return Collections.emptyList();
- }
- ReplicationPeer peer = this.peerClusters.get(peerId);
- if (peer == null) {
- return Collections.emptyList();
- }
- List<ServerName> addresses;
- try {
- addresses = fetchSlavesAddresses(peer.getZkw());
- } catch (KeeperException ke) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Fetch salves addresses failed.", ke);
- }
- reconnectPeer(ke, peer);
- addresses = Collections.emptyList();
- }
- peer.setRegionServers(addresses);
- return peer.getRegionServers();
+ public ReplicationPeer getPeer(String peerId) {
+ return peerClusters.get(peerId);
}
@Override
- public UUID getPeerUUID(String peerId) {
- ReplicationPeer peer = this.peerClusters.get(peerId);
- if (peer == null) {
- return null;
- }
- UUID peerUUID = null;
- try {
- peerUUID = ZKClusterId.getUUIDForCluster(peer.getZkw());
- } catch (KeeperException ke) {
- reconnectPeer(ke, peer);
- }
- return peerUUID;
- }
-
- @Override
- public Set<String> getConnectedPeers() {
- return this.peerClusters.keySet();
+ public Set<String> getPeerIds() {
+ return peerClusters.keySet(); // this is not thread-safe
}
+ /**
+ * Returns a ReplicationPeerConfig from the znode or null for the given peerId.
+ */
@Override
- public Configuration getPeerConf(String peerId) throws ReplicationException {
+ public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
+ throws ReplicationException {
String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
byte[] data = null;
try {
data = ZKUtil.getData(this.zookeeper, znode);
- } catch (KeeperException e) {
- throw new ReplicationException("Error getting configuration for peer with id="
- + peerId, e);
} catch (InterruptedException e) {
LOG.warn("Could not get configuration for peer because the thread " +
"was interrupted. peerId=" + peerId);
Thread.currentThread().interrupt();
return null;
+ } catch (KeeperException e) {
+ throw new ReplicationException("Error getting configuration for peer with id="
+ + peerId, e);
}
if (data == null) {
LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
return null;
}
- String otherClusterKey = "";
+
try {
- otherClusterKey = parsePeerFrom(data);
+ return parsePeerFrom(data);
} catch (DeserializationException e) {
LOG.warn("Failed to parse cluster key from peerId=" + peerId
+ ", specifically the content from the following znode: " + znode);
return null;
}
+ }
+
+ @Override
+ public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId)
+ throws ReplicationException {
+ ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);
+
+ if (peerConfig == null) {
+ return null;
+ }
Configuration otherConf = new Configuration(this.conf);
try {
- ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
+ if (peerConfig.getClusterKey() != null && !peerConfig.getClusterKey().isEmpty()) {
+ ZKUtil.applyClusterKeyToConf(otherConf, peerConfig.getClusterKey());
+ }
} catch (IOException e) {
LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
return null;
}
- return otherConf;
+
+ if (!peerConfig.getConfiguration().isEmpty()) {
+ CompoundConfiguration compound = new CompoundConfiguration();
+ compound.add(otherConf);
+ compound.addStringMap(peerConfig.getConfiguration());
+ return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, compound);
+ }
+
+ return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, otherConf);
}
/**
@@ -382,19 +332,11 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
return ids;
}
- @Override
- public long getTimestampOfLastChangeToPeer(String peerId) {
- if (!peerClusters.containsKey(peerId)) {
- throw new IllegalArgumentException("Unknown peer id: " + peerId);
- }
- return peerClusters.get(peerId).getLastRegionserverUpdate();
- }
-
/**
- * A private method used during initialization. This method attempts to connect to all registered
+ * A private method used during initialization. This method attempts to add all registered
* peer clusters. This method does not set a watch on the peer cluster znodes.
*/
- private void connectExistingPeers() throws ReplicationException {
+ private void addExistingPeers() throws ReplicationException {
List<String> znodes = null;
try {
znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
@@ -403,45 +345,49 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
if (znodes != null) {
for (String z : znodes) {
- connectToPeer(z);
+ createAndAddPeer(z);
}
}
}
- /**
- * A private method used to re-establish a zookeeper session with a peer cluster.
- * @param ke
- * @param peer
- */
- private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
- if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
- || ke instanceof AuthFailedException) {
- LOG.warn("Lost the ZooKeeper connection for peer " + peer.getClusterKey(), ke);
- try {
- peer.reloadZkWatcher();
- peer.getZkw().registerListener(new PeerRegionServerListener(peer));
- } catch (IOException io) {
- LOG.warn("Creation of ZookeeperWatcher failed for peer " + peer.getClusterKey(), io);
- }
+ @Override
+ public boolean peerAdded(String peerId) throws ReplicationException {
+ return createAndAddPeer(peerId);
+ }
+
+ @Override
+ public void peerRemoved(String peerId) {
+ ReplicationPeer rp = this.peerClusters.get(peerId);
+ if (rp != null) {
+ this.peerClusters.remove(peerId);
}
}
/**
- * Get the list of all the region servers from the specified peer
- * @param zkw zk connection to use
- * @return list of region server addresses or an empty list if the slave is unavailable
+ * Attempt to connect to a new remote slave cluster.
+ * @param peerId a short that identifies the cluster
+ * @return true if a new connection was made, false if no new connection was made.
*/
- private static List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
- throws KeeperException {
- List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.rsZNode);
- if (children == null) {
- return Collections.emptyList();
+ public boolean createAndAddPeer(String peerId) throws ReplicationException {
+ if (peerClusters == null) {
+ return false;
}
- List<ServerName> addresses = new ArrayList<ServerName>(children.size());
- for (String child : children) {
- addresses.add(ServerName.parseServerName(child));
+ if (this.peerClusters.containsKey(peerId)) {
+ return false;
+ }
+
+ ReplicationPeerZKImpl peer = null;
+ try {
+ peer = createPeer(peerId);
+ } catch (Exception e) {
+ throw new ReplicationException("Error adding peer with id=" + peerId, e);
+ }
+ if (peer == null) {
+ return false;
}
- return addresses;
+ this.peerClusters.put(peerId, peer);
+ LOG.info("Added new peer cluster " + peer.getPeerConfig().getClusterKey());
+ return true;
}
private String getTableCFsNode(String id) {
@@ -485,18 +431,14 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
* @return object representing the peer
* @throws ReplicationException
*/
- private ReplicationPeer getPeer(String peerId) throws ReplicationException {
- Configuration peerConf = getPeerConf(peerId);
- if (peerConf == null) {
- return null;
- }
- if (this.ourClusterKey.equals(ZKUtil.getZooKeeperClusterKey(peerConf))) {
- LOG.debug("Not connecting to " + peerId + " because it's us");
+ private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException {
+ Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
+ if (pair == null) {
return null;
}
+ Configuration peerConf = pair.getSecond();
- ReplicationPeer peer =
- new ReplicationPeer(peerConf, peerId, ZKUtil.getZooKeeperClusterKey(peerConf));
+ ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
try {
peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
} catch (KeeperException e) {
@@ -511,7 +453,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
peerId, e);
}
- peer.getZkw().registerListener(new PeerRegionServerListener(peer));
return peer;
}
@@ -520,7 +461,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
* @return ClusterKey parsed from the passed bytes.
* @throws DeserializationException
*/
- private static String parsePeerFrom(final byte[] bytes) throws DeserializationException {
+ private static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
+ throws DeserializationException {
if (ProtobufUtil.isPBMagicPrefix(bytes)) {
int pblen = ProtobufUtil.lengthOfPBMagic();
ZooKeeperProtos.ReplicationPeer.Builder builder =
@@ -531,58 +473,70 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
} catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e);
}
- return peer.getClusterkey();
+ return convert(peer);
} else {
if (bytes.length > 0) {
- return Bytes.toString(bytes);
+ return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes));
}
- return "";
+ return new ReplicationPeerConfig().setClusterKey("");
}
}
- /**
- * @param clusterKey
- * @return Serialized protobuf of <code>clusterKey</code> with pb magic prefix prepended suitable
- * for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
- * /hbase/replication/peers/PEER_ID
- */
- private static byte[] toByteArray(final String clusterKey) {
- byte[] bytes =
- ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build()
- .toByteArray();
- return ProtobufUtil.prependPBMagic(bytes);
- }
+ private static ReplicationPeerConfig convert(ZooKeeperProtos.ReplicationPeer peer) {
+ ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
+ if (peer.hasClusterkey()) {
+ peerConfig.setClusterKey(peer.getClusterkey());
+ }
+ if (peer.hasReplicationEndpointImpl()) {
+ peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
+ }
- /**
- * Tracks changes to the list of region servers in a peer's cluster.
- */
- public static class PeerRegionServerListener extends ZooKeeperListener {
+ for (BytesBytesPair pair : peer.getDataList()) {
+ peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
+ }
- private ReplicationPeer peer;
- private String regionServerListNode;
+ for (NameStringPair pair : peer.getConfigurationList()) {
+ peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
+ }
+ return peerConfig;
+ }
- public PeerRegionServerListener(ReplicationPeer replicationPeer) {
- super(replicationPeer.getZkw());
- this.peer = replicationPeer;
- this.regionServerListNode = peer.getZkw().rsZNode;
+ private static ZooKeeperProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
+ ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder();
+ if (peerConfig.getClusterKey() != null) {
+ builder.setClusterkey(peerConfig.getClusterKey());
+ }
+ if (peerConfig.getReplicationEndpointImpl() != null) {
+ builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
}
- public PeerRegionServerListener(String regionServerListNode, ZooKeeperWatcher zkw) {
- super(zkw);
- this.regionServerListNode = regionServerListNode;
+ for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
+ builder.addData(BytesBytesPair.newBuilder()
+ .setFirst(ByteString.copyFrom(entry.getKey()))
+ .setSecond(ByteString.copyFrom(entry.getValue()))
+ .build());
}
- @Override
- public synchronized void nodeChildrenChanged(String path) {
- if (path.equals(regionServerListNode)) {
- try {
- LOG.info("Detected change to peer regionservers, fetching updated list");
- peer.setRegionServers(fetchSlavesAddresses(peer.getZkw()));
- } catch (KeeperException e) {
- LOG.fatal("Error reading slave addresses", e);
- }
- }
+ for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
+ builder.addConfiguration(NameStringPair.newBuilder()
+ .setName(entry.getKey())
+ .setValue(entry.getValue())
+ .build());
}
+ return builder.build();
}
+
+ /**
+ * @param peerConfig
+ * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable
+ * for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
+ * /hbase/replication/peers/PEER_ID
+ */
+ private static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
+ byte[] bytes = convert(peerConfig).toByteArray();
+ return ProtobufUtil.prependPBMagic(bytes);
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
index e0fb7cd..118d2bf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
@@ -93,6 +93,7 @@ public class ZKClusterId {
* @throws KeeperException
*/
public static UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException {
- return UUID.fromString(readClusterIdZNode(zkw));
+ String uuid = readClusterIdZNode(zkw);
+ return uuid == null ? null : UUID.fromString(uuid);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java
index e7ce8d5..4010dc0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java
@@ -27,5 +27,6 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Evolving
public class HBaseInterfaceAudience {
public static final String COPROC = "Coprocesssor";
+ public static final String REPLICATION = "Replication";
public static final String PHOENIX = "Phoenix";
}