Posted to commits@hbase.apache.org by la...@apache.org on 2013/08/29 00:58:55 UTC
svn commit: r1518410 - in /hbase/branches/0.94/src:
main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/regionserver/wal/
main/java/org/apache/hadoop/hbase/replication/regionserver...
Author: larsh
Date: Wed Aug 28 22:58:55 2013
New Revision: 1518410
URL: http://svn.apache.org/r1518410
Log:
HBASE-7709 Infinite loop possible in Master/Master replication (Vasu Mariyala)
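In short: before this change, a replicated entry carried only its single originating cluster id, so in a cyclic topology (master/master, or a longer ring) an edit could keep circulating once that one id no longer matched the peer it was shipped to. The patch tracks the full list of cluster ids that have already consumed a change, threading it through the Mutation attributes, the WALEdit scopes map (overloaded for this 0.94 backport), and the replication source and sink, so each source skips any peer that has already seen the edit.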
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/Mutation.java?rev=1518410&r1=1518409&r2=1518410&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/Mutation.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/Mutation.java Wed Aug 28 22:58:55 2013
@@ -33,12 +33,20 @@ import org.apache.hadoop.hbase.HConstant
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+
public abstract class Mutation extends OperationWithAttributes implements Row {
private static final Log LOG = LogFactory.getLog(Mutation.class);
// Attribute used in Mutations to indicate the originating cluster.
private static final String CLUSTER_ID_ATTR = "_c.id_";
private static final String DURABILITY_ID_ATTR = "_dur_";
+ /**
+ * The attribute for storing the list of clusters that have consumed the change.
+ */
+ private static final String CONSUMED_CLUSTER_IDS = "_cs.id";
protected byte [] row = null;
protected long ts = HConstants.LATEST_TIMESTAMP;
protected long lockId = -1L;
@@ -243,6 +251,36 @@ public abstract class Mutation extends O
}
/**
+ * Marks that the clusters with the given clusterIds have consumed the mutation
+ * @param clusterIds the ids of the clusters that have consumed the mutation
+ */
+ public void setClusterIds(List<UUID> clusterIds) {
+ ByteArrayDataOutput out = ByteStreams.newDataOutput();
+ out.writeInt(clusterIds.size());
+ for (UUID clusterId : clusterIds) {
+ out.writeLong(clusterId.getMostSignificantBits());
+ out.writeLong(clusterId.getLeastSignificantBits());
+ }
+ setAttribute(CONSUMED_CLUSTER_IDS, out.toByteArray());
+ }
+
+ /**
+ * @return the list of cluster ids that have consumed the mutation
+ */
+ public List<UUID> getClusterIds() {
+ List<UUID> clusterIds = new ArrayList<UUID>();
+ byte[] bytes = getAttribute(CONSUMED_CLUSTER_IDS);
+ if(bytes != null) {
+ ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
+ int numClusters = in.readInt();
+ for(int i=0; i<numClusters; i++){
+ clusterIds.add(new UUID(in.readLong(), in.readLong()));
+ }
+ }
+ return clusterIds;
+ }
+
+ /**
* @return the total number of KeyValues
*/
public int size() {
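For reference, the attribute added above is nothing more than a length-prefixed sequence of UUIDs, each written as its most/least significant bits. A minimal standalone sketch of the same round-trip, runnable with only Guava on the classpath (the class and method names here are illustrative, not part of the patch):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;

public class ClusterIdCodecDemo {

  // Encode: an int count followed by two longs (msb/lsb) per UUID,
  // the same layout setClusterIds() writes into the attribute.
  static byte[] encode(List<UUID> clusterIds) {
    ByteArrayDataOutput out = ByteStreams.newDataOutput();
    out.writeInt(clusterIds.size());
    for (UUID clusterId : clusterIds) {
      out.writeLong(clusterId.getMostSignificantBits());
      out.writeLong(clusterId.getLeastSignificantBits());
    }
    return out.toByteArray();
  }

  // Decode: symmetric with encode(); a missing attribute (null) simply
  // means no cluster has consumed the mutation yet.
  static List<UUID> decode(byte[] bytes) {
    List<UUID> clusterIds = new ArrayList<UUID>();
    if (bytes != null) {
      ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
      int numClusters = in.readInt();
      for (int i = 0; i < numClusters; i++) {
        clusterIds.add(new UUID(in.readLong(), in.readLong()));
      }
    }
    return clusterIds;
  }

  public static void main(String[] args) {
    List<UUID> ids = Arrays.asList(UUID.randomUUID(), UUID.randomUUID());
    System.out.println(decode(encode(ids)).equals(ids)); // prints true
  }
}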
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1518410&r1=1518409&r2=1518410&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Aug 28 22:58:55 2013
@@ -1995,6 +1995,7 @@ public class HRegion implements HeapSize
// bunch up all edits across all column families into a
// single WALEdit.
addFamilyMapToWALEdit(familyMap, walEdit);
+ walEdit.addClusterIds(delete.getClusterIds());
this.log.append(regionInfo, this.htableDescriptor.getName(),
walEdit, clusterId, now, this.htableDescriptor);
}
@@ -2448,6 +2449,7 @@ public class HRegion implements HeapSize
// STEP 5. Append the edit to WAL. Do not sync wal.
// -------------------------
Mutation first = batchOp.operations[firstIndex].getFirst();
+ walEdit.addClusterIds(first.getClusterIds());
txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
walEdit, first.getClusterId(), now, this.htableDescriptor);
@@ -2904,6 +2906,7 @@ public class HRegion implements HeapSize
// will contain uncommitted transactions.
if (writeToWAL) {
addFamilyMapToWALEdit(familyMap, walEdit);
+ walEdit.addClusterIds(put.getClusterIds());
this.log.append(regionInfo, this.htableDescriptor.getName(),
walEdit, clusterId, now, this.htableDescriptor);
} else {
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java?rev=1518410&r1=1518409&r2=1518410&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java Wed Aug 28 22:58:55 2013
@@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
+import java.util.UUID;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.Decoder;
@@ -73,10 +74,29 @@ import org.apache.hadoop.io.Writable;
*/
public class WALEdit implements Writable, HeapSize {
+ /*
+ * The id of a cluster that has consumed the change represented by this class is prefixed with
+ * the value of this variable before being stored in the scopes map. This ensures that the
+ * cluster ids don't interfere with the column family replication settings stored in the same
+ * map. The value is chosen to start with a period because column family names can't start with
+ * one.
+ */
+ private static final String PREFIX_CLUSTER_KEY = ".";
private final int VERSION_2 = -1;
private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
- private NavigableMap<byte[], Integer> scopes;
+
+ /**
+ * This map holds both the column family replication settings and the clusters that have
+ * already consumed the change represented by this object. This overloading of scopes with the
+ * consumed cluster ids was introduced while porting the fix for HBASE-7709 back to the 0.94
+ * release; the overloading has been removed in newer releases (0.95.2+). To check/change the
+ * column family settings, use the getFromScope and putIntoScope methods; to mark/check whether
+ * a cluster has consumed the change, use the addClusterId, addClusterIds and getClusterIds
+ * methods.
+ */
+ private final NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
// default to decoding uncompressed data - needed for replication, which enforces that
// uncompressed edits are sent across the wire. In the regular case (reading/writing WAL), the
@@ -116,22 +136,58 @@ public class WALEdit implements Writable
return kvs;
}
- public NavigableMap<byte[], Integer> getScopes() {
- return scopes;
+ public Integer getFromScope(byte[] key) {
+ return scopes.get(key);
}
+ public void putIntoScope(byte[] key, Integer value) {
+ scopes.put(key, value);
+ }
- public void setScopes (NavigableMap<byte[], Integer> scopes) {
- // We currently process the map outside of WALEdit,
- // TODO revisit when replication is part of core
- this.scopes = scopes;
+ public boolean hasKeyInScope(byte[] key) {
+ return scopes.containsKey(key);
+ }
+
+ /**
+ * @return true if the cluster with the given clusterId has consumed the change.
+ */
+ public boolean hasClusterId(UUID clusterId) {
+ return hasKeyInScope(Bytes.toBytes(PREFIX_CLUSTER_KEY + clusterId.toString()));
+ }
+
+ /**
+ * Marks that the cluster with the given clusterId has consumed the change.
+ */
+ public void addClusterId(UUID clusterId) {
+ scopes.put(Bytes.toBytes(PREFIX_CLUSTER_KEY + clusterId.toString()), 1);
+ }
+
+ /**
+ * Marks that the clusters with the given clusterIds have consumed the change.
+ */
+ public void addClusterIds(List<UUID> clusterIds) {
+ for (UUID clusterId : clusterIds) {
+ addClusterId(clusterId);
+ }
+ }
+
+ /**
+ * @return the list of cluster ids that have consumed the change.
+ */
+ public List<UUID> getClusterIds() {
+ List<UUID> clusterIds = new ArrayList<UUID>();
+ for (byte[] keyBytes : scopes.keySet()) {
+ String key = Bytes.toString(keyBytes);
+ if (key.startsWith(PREFIX_CLUSTER_KEY)) {
+ clusterIds.add(UUID.fromString(key.substring(PREFIX_CLUSTER_KEY.length())));
+ }
+ }
+ return clusterIds;
}
public void readFields(DataInput in) throws IOException {
kvs.clear();
- if (scopes != null) {
- scopes.clear();
- }
+ scopes.clear();
Decoder decoder = this.codec.getDecoder((DataInputStream) in);
int versionOrLength = in.readInt();
int length = versionOrLength;
@@ -148,15 +204,12 @@ public class WALEdit implements Writable
//its a new style WAL, so we need replication scopes too
if (versionOrLength == VERSION_2) {
- int numFamilies = in.readInt();
- if (numFamilies > 0) {
- if (scopes == null) {
- scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
- }
- for (int i = 0; i < numFamilies; i++) {
- byte[] fam = Bytes.readByteArray(in);
+ int numEntries = in.readInt();
+ if (numEntries > 0) {
+ for (int i = 0; i < numEntries; i++) {
+ byte[] key = Bytes.readByteArray(in);
int scope = in.readInt();
- scopes.put(fam, scope);
+ scopes.put(key, scope);
}
}
}
@@ -173,14 +226,10 @@ public class WALEdit implements Writable
}
kvEncoder.flush();
- if (scopes == null) {
- out.writeInt(0);
- } else {
- out.writeInt(scopes.size());
- for (byte[] key : scopes.keySet()) {
- Bytes.writeByteArray(out, key);
- out.writeInt(scopes.get(key));
- }
+ out.writeInt(scopes.size());
+ for (byte[] key : scopes.keySet()) {
+ Bytes.writeByteArray(out, key);
+ out.writeInt(scopes.get(key));
}
}
@@ -189,11 +238,9 @@ public class WALEdit implements Writable
for (KeyValue kv : kvs) {
ret += kv.heapSize();
}
- if (scopes != null) {
- ret += ClassSize.TREEMAP;
- ret += ClassSize.align(scopes.size() * ClassSize.MAP_ENTRY);
- // TODO this isn't quite right, need help here
- }
+ ret += ClassSize.TREEMAP;
+ ret += ClassSize.align(scopes.size() * ClassSize.MAP_ENTRY);
+ // TODO this isn't quite right, need help here
return ret;
}
@@ -205,9 +252,7 @@ public class WALEdit implements Writable
sb.append(kv.toString());
sb.append("; ");
}
- if (scopes != null) {
- sb.append(" scopes: " + scopes.toString());
- }
+ sb.append(" scopes: " + scopes.toString());
sb.append(">]");
return sb.toString();
}
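The trick in the WALEdit change above is the '.'-prefixed key space: column family scopes and consumed cluster ids share one sorted map, and since a column family name cannot begin with a period the two kinds of keys can never collide. A self-contained sketch of just that keying scheme, mirroring the committed methods (the class name and putFamilyScope are illustrative):

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;

import org.apache.hadoop.hbase.util.Bytes;

public class ScopesOverloadDemo {

  // Cluster-id keys start with '.', which no column family name may use,
  // so the two kinds of entries can never collide in the shared map.
  private static final String PREFIX_CLUSTER_KEY = ".";

  private final NavigableMap<byte[], Integer> scopes =
      new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);

  // A genuine column family replication scope, keyed by the raw family name.
  void putFamilyScope(byte[] family, int scope) {
    scopes.put(family, scope);
  }

  // A consumed cluster id, keyed by "." + uuid; the value (1) is a dummy.
  void addClusterId(UUID clusterId) {
    scopes.put(Bytes.toBytes(PREFIX_CLUSTER_KEY + clusterId.toString()), 1);
  }

  // Recover only the cluster ids by filtering on the prefix.
  List<UUID> getClusterIds() {
    List<UUID> clusterIds = new ArrayList<UUID>();
    for (byte[] keyBytes : scopes.keySet()) {
      String key = Bytes.toString(keyBytes);
      if (key.startsWith(PREFIX_CLUSTER_KEY)) {
        clusterIds.add(UUID.fromString(key.substring(PREFIX_CLUSTER_KEY.length())));
      }
    }
    return clusterIds;
  }
}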
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1518410&r1=1518409&r2=1518410&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Wed Aug 28 22:58:55 2013
@@ -172,20 +172,15 @@ public class Replication implements WALA
@Override
public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
WALEdit logEdit) {
- NavigableMap<byte[], Integer> scopes =
- new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
byte[] family;
for (KeyValue kv : logEdit.getKeyValues()) {
family = kv.getFamily();
int scope = htd.getFamily(family).getScope();
if (scope != REPLICATION_SCOPE_LOCAL &&
- !scopes.containsKey(family)) {
- scopes.put(family, scope);
+ !logEdit.hasKeyInScope(family)) {
+ logEdit.putIntoScope(family, scope);
}
}
- if (!scopes.isEmpty()) {
- logEdit.setScopes(scopes);
- }
}
@Override
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1518410&r1=1518409&r2=1518410&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Wed Aug 28 22:58:55 2013
@@ -110,9 +110,11 @@ public class ReplicationSink {
// to the same table.
try {
long totalReplicated = 0;
- // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per
- // invocation of this method per table and cluster id.
- Map<byte[], Map<UUID,List<Row>>> rowMap = new TreeMap<byte[], Map<UUID,List<Row>>>(Bytes.BYTES_COMPARATOR);
+ // Map of table => list of Rows, grouped by the set of clusters that have consumed the
+ // change; we only want to flushCommits once per invocation of this method, per table
+ // and per set of consuming clusters.
+ Map<byte[], Map<List<UUID>, List<Row>>> rowMap =
+ new TreeMap<byte[], Map<List<UUID>, List<Row>>>(Bytes.BYTES_COMPARATOR);
for (HLog.Entry entry : entries) {
WALEdit edit = entry.getEdit();
byte[] table = entry.getKey().getTablename();
@@ -123,14 +125,19 @@ public class ReplicationSink {
for (KeyValue kv : kvs) {
if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
UUID clusterId = entry.getKey().getClusterId();
+ List<UUID> clusterIds = edit.getClusterIds();
if (kv.isDelete()) {
del = new Delete(kv.getRow());
del.setClusterId(clusterId);
- addToHashMultiMap(rowMap, table, clusterId, del);
+ del.setClusterIds(clusterIds);
+ clusterIds.add(clusterId);
+ addToHashMultiMap(rowMap, table, clusterIds, del);
} else {
put = new Put(kv.getRow());
put.setClusterId(clusterId);
- addToHashMultiMap(rowMap, table, clusterId, put);
+ put.setClusterIds(clusterIds);
+ clusterIds.add(clusterId);
+ addToHashMultiMap(rowMap, table, clusterIds, put);
}
}
if (kv.isDelete()) {
@@ -142,7 +149,7 @@ public class ReplicationSink {
}
totalReplicated++;
}
- for(Map.Entry<byte[], Map<UUID, List<Row>>> entry : rowMap.entrySet()) {
+ for(Map.Entry<byte[], Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
batch(entry.getKey(), entry.getValue().values());
}
this.metrics.setAgeOfLastAppliedOp(
@@ -162,7 +169,7 @@ public class ReplicationSink {
* @param key1
* @param key2
* @param value
- * @return
+ * @return the list of values for the combination of key1 and key2
*/
private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2,List<V>>> map, K1 key1, K2 key2, V value) {
Map<K2,List<V>> innerMap = map.get(key1);
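The body of addToHashMultiMap is cut off by the diff context above; a plausible standalone version of the same two-level multimap idea would look like the following (an assumed reconstruction, not the committed body):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MultiMapDemo {

  // Insert value under (key1, key2), creating the inner map and the list
  // on demand, and return the list that now holds the value.
  static <K1, K2, V> List<V> addToHashMultiMap(
      Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2, V value) {
    Map<K2, List<V>> innerMap = map.get(key1);
    if (innerMap == null) {
      innerMap = new HashMap<K2, List<V>>();
      map.put(key1, innerMap);
    }
    List<V> values = innerMap.get(key2);
    if (values == null) {
      values = new ArrayList<V>();
      innerMap.put(key2, values);
    }
    values.add(value);
    return values;
  }
}

Grouping by List<UUID> works because java.util.List defines equals() and hashCode() by contents, so rows consumed by the same set of clusters land in the same batch; the list must not be mutated after it has been used as a key.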
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1518410&r1=1518409&r2=1518410&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Wed Aug 28 22:58:55 2013
@@ -30,7 +30,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
-import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
@@ -59,7 +58,6 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.ClusterId;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
@@ -477,8 +475,14 @@ public class ReplicationSource extends T
seenEntries++;
// Remove all KVs that should not be replicated
HLogKey logKey = entry.getKey();
- // don't replicate if the log entries originated in the peer
- if (!logKey.getClusterId().equals(peerClusterId)) {
+ List<UUID> consumedClusterIds = edit.getClusterIds();
+ // The originating cluster id is added to handle the scenario A -> B -> A, where A runs an
+ // old point release and B runs a newer release containing the fix for HBASE-7709. A change
+ // on cluster A would replicate to cluster B indefinitely if the original cluster id were
+ // not added to the set.
+ consumedClusterIds.add(logKey.getClusterId());
+ // don't replicate the log entries if the peer cluster has already consumed them
+ if (!consumedClusterIds.contains(peerClusterId)) {
removeNonReplicableEdits(edit);
// Don't replicate catalog entries, if the WALEdit wasn't
// containing anything to replicate and if we're currently not set to replicate
@@ -491,6 +495,8 @@ public class ReplicationSource extends T
// This is *only* place where a cluster id other than the default is set.
if (HConstants.DEFAULT_CLUSTER_ID == logKey.getClusterId()) {
logKey.setClusterId(this.clusterId);
+ } else if (logKey.getClusterId() != this.clusterId) {
+ edit.addClusterId(clusterId);
}
currentNbOperations += countDistinctRowKeys(edit);
currentNbEntries++;
@@ -664,13 +670,12 @@ public class ReplicationSource extends T
* @param edit The KV to check for replication
*/
protected void removeNonReplicableEdits(WALEdit edit) {
- NavigableMap<byte[], Integer> scopes = edit.getScopes();
List<KeyValue> kvs = edit.getKeyValues();
for (int i = edit.size()-1; i >= 0; i--) {
KeyValue kv = kvs.get(i);
// The scope will be null or empty if
// there's nothing to replicate in that WALEdit
- if (scopes == null || !scopes.containsKey(kv.getFamily())) {
+ if (!edit.hasKeyInScope(kv.getFamily())) {
kvs.remove(i);
}
}
@@ -928,4 +933,4 @@ public class ReplicationSource extends T
return Long.parseLong(parts[parts.length-1]);
}
}
-}
+}
\ No newline at end of file
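Taken together, the source-side change reduces loop prevention to a set-membership test. A hedged sketch of that decision, stripped of the surrounding WAL-reading machinery (class and method names are illustrative):

import java.util.List;
import java.util.UUID;

public class LoopGuardDemo {

  // Returns true when an edit should be shipped to the given peer.
  // consumedClusterIds come from WALEdit.getClusterIds(); originClusterId
  // comes from the entry's HLogKey.
  static boolean shouldReplicate(List<UUID> consumedClusterIds,
      UUID originClusterId, UUID peerClusterId) {
    // Count the origin as a consumer of its own change, so an A -> B -> A
    // cycle terminates even when one side predates this fix.
    consumedClusterIds.add(originClusterId);
    return !consumedClusterIds.contains(peerClusterId);
  }
}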
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java?rev=1518410&r1=1518409&r2=1518410&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java Wed Aug 28 22:58:55 2013
@@ -23,8 +23,10 @@ import static org.junit.Assert.assertArr
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -55,15 +57,11 @@ public class TestMasterReplication {
private static final Log LOG = LogFactory.getLog(TestReplicationBase.class);
- private Configuration conf1;
- private Configuration conf2;
- private Configuration conf3;
-
- private HBaseTestingUtility utility1;
- private HBaseTestingUtility utility2;
- private HBaseTestingUtility utility3;
-
- private MiniZooKeeperCluster miniZK;
+ private Configuration baseConfiguration;
+
+ private HBaseTestingUtility[] utilities;
+ private Configuration[] configurations;
+ private MiniZooKeeperCluster miniZK;
private static final long SLEEP_TIME = 500;
private static final int NB_RETRIES = 10;
@@ -85,44 +83,21 @@ public class TestMasterReplication {
@Before
public void setUp() throws Exception {
- conf1 = HBaseConfiguration.create();
- conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+ baseConfiguration = HBaseConfiguration.create();
// smaller block size and capacity to trigger more operations
// and test them
- conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
- conf1.setInt("replication.source.size.capacity", 1024);
- conf1.setLong("replication.source.sleepforretries", 100);
- conf1.setInt("hbase.regionserver.maxlogs", 10);
- conf1.setLong("hbase.master.logcleaner.ttl", 10);
- conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
- conf1.setBoolean("dfs.support.append", true);
- conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
- conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+ baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
+ baseConfiguration.setInt("replication.source.size.capacity", 1024);
+ baseConfiguration.setLong("replication.source.sleepforretries", 100);
+ baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
+ baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
+ baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+ baseConfiguration.setBoolean("dfs.support.append", true);
+ baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+ baseConfiguration.setStrings(
+ CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
CoprocessorCounter.class.getName());
- utility1 = new HBaseTestingUtility(conf1);
- utility1.startMiniZKCluster();
- miniZK = utility1.getZkCluster();
- // By setting the mini ZK cluster through this method, even though this is
- // already utility1's mini ZK cluster, we are telling utility1 not to shut
- // the mini ZK cluster when we shut down the HBase cluster.
- utility1.setZkCluster(miniZK);
- new ZooKeeperWatcher(conf1, "cluster1", null, true);
-
- conf2 = new Configuration(conf1);
- conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
-
- utility2 = new HBaseTestingUtility(conf2);
- utility2.setZkCluster(miniZK);
- new ZooKeeperWatcher(conf2, "cluster2", null, true);
-
- conf3 = new Configuration(conf1);
- conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
-
- utility3 = new HBaseTestingUtility(conf3);
- utility3.setZkCluster(miniZK);
- new ZooKeeperWatcher(conf3, "cluster3", null, true);
-
table = new HTableDescriptor(tableName);
HColumnDescriptor fam = new HColumnDescriptor(famName);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
@@ -131,181 +106,300 @@ public class TestMasterReplication {
table.addFamily(fam);
}
- @After
- public void tearDown() throws IOException {
- miniZK.shutdown();
+ /**
+ * Tests the replication scenario 0 -> 1 -> 0 by adding and deleting a row
+ * in a table on each cluster and checking that it is replicated. It also
+ * verifies that the puts and deletes are not replicated back to the
+ * originating cluster.
+ */
+ @Test(timeout = 300000)
+ public void testCyclicReplication1() throws Exception {
+ LOG.info("testSimplePutDelete");
+ int numClusters = 2;
+ HTable[] htables = null;
+ try {
+ startMiniClusters(numClusters);
+ createTableOnClusters(table);
+
+ htables = getHTablesOnClusters(tableName);
+
+ // Test the replication scenarios of 0 -> 1 -> 0
+ addPeer("1", 0, 1);
+ addPeer("1", 1, 0);
+
+ int[] expectedCounts = new int[] { 2, 2 };
+
+ // add rows to both clusters,
+ // make sure they are both replicated
+ putAndWait(row, famName, htables[0], htables[1]);
+ putAndWait(row1, famName, htables[1], htables[0]);
+ validateCounts(htables, put, expectedCounts);
+
+ deleteAndWait(row, htables[0], htables[1]);
+ deleteAndWait(row1, htables[1], htables[0]);
+ validateCounts(htables, delete, expectedCounts);
+ } finally {
+ close(htables);
+ shutDownMiniClusters();
+ }
}
- @Test(timeout=300000)
- public void testCyclicReplication() throws Exception {
- LOG.info("testCyclicReplication");
- utility1.startMiniCluster();
- utility2.startMiniCluster();
- utility3.startMiniCluster();
- ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
- ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
- ReplicationAdmin admin3 = new ReplicationAdmin(conf3);
-
- new HBaseAdmin(conf1).createTable(table);
- new HBaseAdmin(conf2).createTable(table);
- new HBaseAdmin(conf3).createTable(table);
- HTable htable1 = new HTable(conf1, tableName);
- htable1.setWriteBufferSize(1024);
- HTable htable2 = new HTable(conf2, tableName);
- htable2.setWriteBufferSize(1024);
- HTable htable3 = new HTable(conf3, tableName);
- htable3.setWriteBufferSize(1024);
-
- admin1.addPeer("1", utility2.getClusterKey());
- admin2.addPeer("1", utility3.getClusterKey());
- admin3.addPeer("1", utility1.getClusterKey());
-
- // put "row" and wait 'til it got around
- putAndWait(row, famName, htable1, htable3);
- // it should have passed through table2
- check(row,famName,htable2);
-
- putAndWait(row1, famName, htable2, htable1);
- check(row,famName,htable3);
- putAndWait(row2, famName, htable3, htable2);
- check(row,famName,htable1);
-
- deleteAndWait(row,htable1,htable3);
- deleteAndWait(row1,htable2,htable1);
- deleteAndWait(row2,htable3,htable2);
-
- assertEquals("Puts were replicated back ", 3, getCount(htable1, put));
- assertEquals("Puts were replicated back ", 3, getCount(htable2, put));
- assertEquals("Puts were replicated back ", 3, getCount(htable3, put));
- assertEquals("Deletes were replicated back ", 3, getCount(htable1, delete));
- assertEquals("Deletes were replicated back ", 3, getCount(htable2, delete));
- assertEquals("Deletes were replicated back ", 3, getCount(htable3, delete));
-
- // Test HBASE-9158
- admin2.disablePeer("1");
- // we now have an edit that was replicated into cluster originating from cluster 1
- putAndWait(row3, famName, htable1, htable2);
- // now add a local edit to cluster 2
- Put put = new Put(row4);
- put.add(famName, row4, row4);
- htable2.put(put);
- // reenable replication from cluster 2 to cluster 3
- admin2.enablePeer("1");
- // without HBASE-9158 the edit for row4 would have been marked with cluster 1's id
- // and hence not replicated to cluster 1
- wait(row4, htable1);
-
- utility3.shutdownMiniCluster();
- utility2.shutdownMiniCluster();
- utility1.shutdownMiniCluster();
+ /**
+ * Tests the cyclic replication scenario 0 -> 1 -> 2 -> 0 by adding and
+ * deleting rows in a table on each cluster and ensuring that each of
+ * these clusters receives the appropriate mutations. It also tests the
+ * grouping scenario, where a cluster needs to replicate both the edits
+ * originating from itself and the edits it received via replication from
+ * a different cluster. The scenario is explained in HBASE-9158.
+ */
+ @Test(timeout = 300000)
+ public void testCyclicReplication2() throws Exception {
+ LOG.info("testCyclicReplication1");
+ int numClusters = 3;
+ HTable[] htables = null;
+ try {
+ startMiniClusters(numClusters);
+ createTableOnClusters(table);
+
+ // Test the replication scenario of 0 -> 1 -> 2 -> 0
+ addPeer("1", 0, 1);
+ addPeer("1", 1, 2);
+ addPeer("1", 2, 0);
+
+ htables = getHTablesOnClusters(tableName);
+
+ // put "row" and wait 'til it got around
+ putAndWait(row, famName, htables[0], htables[2]);
+ putAndWait(row1, famName, htables[1], htables[0]);
+ putAndWait(row2, famName, htables[2], htables[1]);
+
+ deleteAndWait(row, htables[0], htables[2]);
+ deleteAndWait(row1, htables[1], htables[0]);
+ deleteAndWait(row2, htables[2], htables[1]);
+
+ int[] expectedCounts = new int[] { 3, 3, 3 };
+ validateCounts(htables, put, expectedCounts);
+ validateCounts(htables, delete, expectedCounts);
+
+ // Test HBASE-9158
+ disablePeer("1", 2);
+ // we now have an edit in cluster 1 that was replicated there
+ // from cluster 0
+ putAndWait(row3, famName, htables[0], htables[1]);
+ // now add a local edit to cluster 1
+ htables[1].put(new Put(row4).add(famName, row4, row4));
+ // re-enable replication from cluster 2 to cluster 0
+ enablePeer("1", 2);
+ // without HBASE-9158 the edit for row4 would have been marked with
+ // cluster 0's id and hence would not have been replicated back
+ // to cluster 0
+ wait(row4, htables[0], true);
+ } finally {
+ close(htables);
+ shutDownMiniClusters();
+ }
}
/**
- * Add a row to a table in each cluster, check it's replicated,
- * delete it, check's gone
- * Also check the puts and deletes are not replicated back to
- * the originating cluster.
+ * Tests the cyclic replication scenario 0 -> 1 -> 2 -> 1.
*/
- @Test(timeout=300000)
- public void testSimplePutDelete() throws Exception {
- LOG.info("testSimplePutDelete");
- utility1.startMiniCluster();
- utility2.startMiniCluster();
+ @Test(timeout = 300000)
+ public void testCyclicReplication3() throws Exception {
+ LOG.info("testCyclicReplication2");
+ int numClusters = 3;
+ HTable[] htables = null;
+ try {
+ startMiniClusters(numClusters);
+ createTableOnClusters(table);
+
+ // Test the replication scenario of 0 -> 1 -> 2 -> 1
+ addPeer("1", 0, 1);
+ addPeer("1", 1, 2);
+ addPeer("1", 2, 1);
+
+ htables = getHTablesOnClusters(tableName);
+
+ // put "row" and wait 'til it got around
+ putAndWait(row, famName, htables[0], htables[2]);
+ putAndWait(row1, famName, htables[1], htables[2]);
+ putAndWait(row2, famName, htables[2], htables[1]);
+
+ deleteAndWait(row, htables[0], htables[2]);
+ deleteAndWait(row1, htables[1], htables[2]);
+ deleteAndWait(row2, htables[2], htables[1]);
+
+ int[] expectedCounts = new int[] { 1, 3, 3 };
+ validateCounts(htables, put, expectedCounts);
+ validateCounts(htables, delete, expectedCounts);
+ } finally {
+ close(htables);
+ shutDownMiniClusters();
+ }
+ }
- ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
- ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
+ @After
+ public void tearDown() throws IOException {
+ configurations = null;
+ utilities = null;
+ }
+
+ @SuppressWarnings("resource")
+ private void startMiniClusters(int numClusters) throws Exception {
+ Random random = new Random();
+ utilities = new HBaseTestingUtility[numClusters];
+ configurations = new Configuration[numClusters];
+ for (int i = 0; i < numClusters; i++) {
+ Configuration conf = new Configuration(baseConfiguration);
+ conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + random.nextInt());
+ HBaseTestingUtility utility = new HBaseTestingUtility(conf);
+ if (i == 0) {
+ utility.startMiniZKCluster();
+ miniZK = utility.getZkCluster();
+ } else {
+ utility.setZkCluster(miniZK);
+ }
+ utility.startMiniCluster();
+ utilities[i] = utility;
+ configurations[i] = conf;
+ new ZooKeeperWatcher(conf, "cluster" + i, null, true);
+ }
+ }
- new HBaseAdmin(conf1).createTable(table);
- new HBaseAdmin(conf2).createTable(table);
- HTable htable1 = new HTable(conf1, tableName);
- htable1.setWriteBufferSize(1024);
- HTable htable2 = new HTable(conf2, tableName);
- htable2.setWriteBufferSize(1024);
+ private void shutDownMiniClusters() throws Exception {
+ int numClusters = utilities.length;
+ for (int i = numClusters - 1; i >= 0; i--) {
+ if (utilities[i] != null) {
+ utilities[i].shutdownMiniCluster();
+ }
+ }
+ miniZK.shutdown();
+ }
- // set M-M
- admin1.addPeer("1", utility2.getClusterKey());
- admin2.addPeer("1", utility1.getClusterKey());
+ private void createTableOnClusters(HTableDescriptor table) throws Exception {
+ int numClusters = configurations.length;
+ for (int i = 0; i < numClusters; i++) {
+ HBaseAdmin hbaseAdmin = null;
+ try {
+ hbaseAdmin = new HBaseAdmin(configurations[i]);
+ hbaseAdmin.createTable(table);
+ } finally {
+ close(hbaseAdmin);
+ }
+ }
+ }
- // add rows to both clusters,
- // make sure they are both replication
- putAndWait(row, famName, htable1, htable2);
- putAndWait(row1, famName, htable2, htable1);
+ private void addPeer(String id, int masterClusterNumber,
+ int slaveClusterNumber) throws Exception {
+ ReplicationAdmin replicationAdmin = null;
+ try {
+ replicationAdmin = new ReplicationAdmin(
+ configurations[masterClusterNumber]);
+ replicationAdmin.addPeer(id,
+ utilities[slaveClusterNumber].getClusterKey());
+ } finally {
+ close(replicationAdmin);
+ }
+ }
- // make sure "row" did not get replicated back.
- assertEquals("Puts were replicated back ", 2, getCount(htable1, put));
+ private void disablePeer(String id, int masterClusterNumber) throws Exception {
+ ReplicationAdmin replicationAdmin = null;
+ try {
+ replicationAdmin = new ReplicationAdmin(
+ configurations[masterClusterNumber]);
+ replicationAdmin.disablePeer(id);
+ } finally {
+ close(replicationAdmin);
+ }
+ }
- // delete "row" and wait
- deleteAndWait(row, htable1, htable2);
+ private void enablePeer(String id, int masterClusterNumber) throws Exception {
+ ReplicationAdmin replicationAdmin = null;
+ try {
+ replicationAdmin = new ReplicationAdmin(
+ configurations[masterClusterNumber]);
+ replicationAdmin.enablePeer(id);
+ } finally {
+ close(replicationAdmin);
+ }
+ }
- // make the 2nd cluster replicated back
- assertEquals("Puts were replicated back ", 2, getCount(htable2, put));
+ private void close(Closeable... closeables) {
+ try {
+ if (closeables != null) {
+ for (Closeable closeable : closeables) {
+ closeable.close();
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Exception occured while closing the object:", e);
+ }
+ }
- deleteAndWait(row1, htable2, htable1);
+ @SuppressWarnings("resource")
+ private HTable[] getHTablesOnClusters(byte[] tableName) throws Exception {
+ int numClusters = utilities.length;
+ HTable[] htables = new HTable[numClusters];
+ for (int i = 0; i < numClusters; i++) {
+ HTable htable = new HTable(configurations[i], tableName);
+ htable.setWriteBufferSize(1024);
+ htables[i] = htable;
+ }
+ return htables;
+ }
- assertEquals("Deletes were replicated back ", 2, getCount(htable1, delete));
- utility2.shutdownMiniCluster();
- utility1.shutdownMiniCluster();
+ private void validateCounts(HTable[] htables, byte[] type,
+ int[] expectedCounts) throws IOException {
+ for (int i = 0; i < htables.length; i++) {
+ assertEquals(Bytes.toString(type) + " were replicated back ",
+ expectedCounts[i], getCount(htables[i], type));
+ }
}
- private int getCount(HTable t, byte[] type) throws IOException {
+ private int getCount(HTable t, byte[] type) throws IOException {
Get test = new Get(row);
- test.setAttribute("count", new byte[]{});
+ test.setAttribute("count", new byte[] {});
Result res = t.get(test);
return Bytes.toInt(res.getValue(count, type));
}
private void deleteAndWait(byte[] row, HTable source, HTable target)
- throws Exception {
+ throws Exception {
Delete del = new Delete(row);
source.delete(del);
-
- Get get = new Get(row);
- for (int i = 0; i < NB_RETRIES; i++) {
- if (i==NB_RETRIES-1) {
- fail("Waited too much time for del replication");
- }
- Result res = target.get(get);
- if (res.size() >= 1) {
- LOG.info("Row not deleted");
- Thread.sleep(SLEEP_TIME);
- } else {
- break;
- }
- }
- }
-
- private void check(byte[] row, byte[] fam, HTable t) throws IOException {
- Get get = new Get(row);
- Result res = t.get(get);
- if (res.size() == 0) {
- fail("Row is missing");
- }
+ wait(row, target, true);
}
private void putAndWait(byte[] row, byte[] fam, HTable source, HTable target)
- throws Exception {
+ throws Exception {
Put put = new Put(row);
put.add(fam, row, row);
source.put(put);
-
- wait(row, target);
+ wait(row, target, false);
}
- private void wait(byte[] row, HTable target) throws Exception {
+ private void wait(byte[] row, HTable target, boolean isDeleted)
+ throws Exception {
Get get = new Get(row);
for (int i = 0; i < NB_RETRIES; i++) {
- if (i==NB_RETRIES-1) {
- fail("Waited too much time for put replication");
+ if (i == NB_RETRIES - 1) {
+ fail("Waited too much time for replication. Row:" + Bytes.toString(row)
+ + ". IsDeleteReplication:" + isDeleted);
}
Result res = target.get(get);
- if (res.size() == 0) {
- LOG.info("Row not available");
+ boolean sleep = isDeleted ? res.size() > 0 : res.size() == 0;
+ if (sleep) {
+ LOG.info("Waiting for more time for replication. Row:"
+ + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
Thread.sleep(SLEEP_TIME);
} else {
- assertArrayEquals(res.value(), row);
+ if (!isDeleted) {
+ assertArrayEquals(res.value(), row);
+ }
+ LOG.info("Obtained row:"
+ + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
break;
}
- }
+ }
}
/**