You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2017/11/15 00:33:29 UTC
hbase git commit: HBASE-12091 Optionally ignore edits for dropped
tables for replication.
Repository: hbase
Updated Branches:
refs/heads/branch-1 c0639d271 -> f9833a780
HBASE-12091 Optionally ignore edits for dropped tables for replication.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f9833a78
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f9833a78
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f9833a78
Branch: refs/heads/branch-1
Commit: f9833a7802bd0d4291e8b6ec6baf2f85fe2e401f
Parents: c0639d2
Author: Lars Hofhansl <la...@apache.org>
Authored: Tue Nov 14 16:33:12 2017 -0800
Committer: Lars Hofhansl <la...@apache.org>
Committed: Tue Nov 14 16:33:12 2017 -0800
----------------------------------------------------------------------
.../RetriesExhaustedWithDetailsException.java | 9 +
.../hadoop/hbase/protobuf/ProtobufUtil.java | 18 ++
.../org/apache/hadoop/hbase/HConstants.java | 5 +
.../hbase/protobuf/ReplicationProtbufUtil.java | 2 +-
.../hbase/replication/ReplicationEndpoint.java | 6 +
.../HBaseInterClusterReplicationEndpoint.java | 66 ++++-
.../regionserver/ReplicationSink.java | 8 +
.../regionserver/ReplicationSourceManager.java | 2 +-
.../TestReplicationDroppedTables.java | 292 +++++++++++++++++++
9 files changed, 403 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f9833a78/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
index 21ab156..e78f810 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.util.Bytes;
+import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collection;
@@ -51,6 +52,23 @@ extends RetriesExhaustedException {
List<Row> actions;
List<String> hostnameAndPort;
+ /**
+ * Create an instance that carries only a message. The per-action detail lists
+ * (actions, hostnameAndPort) are not populated by this constructor.
+ * NOTE(review): confirm the accessors (e.g. getCauses()) tolerate the unset lists.
+ */
+ public RetriesExhaustedWithDetailsException(final String msg) {
+ super(msg);
+ }
+
+ /**
+ * Create an instance that carries a message and an underlying cause; like the
+ * message-only constructor, it does not populate the per-action detail lists.
+ */
+ public RetriesExhaustedWithDetailsException(final String msg, final IOException e) {
+ super(msg, e);
+ }
+
public RetriesExhaustedWithDetailsException(List<Throwable> exceptions,
List<Row> actions,
List<String> hostnameAndPort) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/f9833a78/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 5945e5e..84d4a67 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -331,6 +331,24 @@ public final class ProtobufUtil {
}
/**
+ * Return the exception thrown by the remote server, which arrives as the cause of a
+ * ServiceException. A RemoteException cause is returned as-is (it is not unwrapped).
+ *
+ * @param e ServiceException that wraps the IO exception thrown by the server
+ * @return the cause if it is an IOException (interrupts are converted via
+ * ExceptionUtil.asInterrupt); otherwise the cause, or {@code e} itself if it has
+ * no cause, wrapped in an HBaseIOException
+ */
+ public static IOException getServiceException(ServiceException e) {
+ // fall back to e itself so its message is not lost when there is no cause
+ Throwable t = e.getCause() != null ? e.getCause() : e;
+ if (ExceptionUtil.isInterrupt(t)) {
+ return ExceptionUtil.asInterrupt(t);
+ }
+ return t instanceof IOException ? (IOException) t : new HBaseIOException(t);
+ }
+
+ /**
* Like {@link #getRemoteException(ServiceException)} but more generic, able to handle more than
* just {@link ServiceException}. Prefer this method to
* {@link #getRemoteException(ServiceException)} because trying to
http://git-wip-us.apache.org/repos/asf/hbase/blob/f9833a78/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 8df7bd8..c9f9ded 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1247,6 +1247,11 @@ public final class HConstants {
public static final String REPLICATION_SOURCE_MAXTHREADS_KEY =
"hbase.replication.source.maxthreads";
+ /** If true, drop edits for tables deleted on both replication source and peer (default false) */
+ public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
+ "hbase.replication.drop.on.deleted.table";
+
+ /** Maximum number of threads used by the replication source for shipping edits to the sinks */
public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10;
/** Config for pluggable consensus provider */
http://git-wip-us.apache.org/repos/asf/hbase/blob/f9833a78/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index d13a79c..6243511 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -71,7 +71,7 @@ public class ReplicationProtbufUtil {
try {
admin.replicateWALEntry(controller, p.getFirst());
} catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ throw ProtobufUtil.getServiceException(se);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f9833a78/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index 69db31c..6f99377 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -51,6 +51,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
class Context {
+ private final Configuration localConf;
private final Configuration conf;
private final FileSystem fs;
private final TableDescriptors tableDescriptors;
@@ -62,6 +63,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
@InterfaceAudience.Private
public Context(
+ final Configuration localConf,
final Configuration conf,
final FileSystem fs,
final String peerId,
@@ -70,6 +72,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
final MetricsSource metrics,
final TableDescriptors tableDescriptors,
final Abortable abortable) {
+ this.localConf = localConf;
this.conf = conf;
this.fs = fs;
this.clusterId = clusterId;
@@ -82,6 +85,9 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
public Configuration getConfiguration() {
return conf;
}
+ public Configuration getLocalConfiguration() { // local cluster's conf; getConfiguration() is the peer's
+ return localConf;
+ }
public FileSystem getFilesystem() {
return fs;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f9833a78/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index cf85ffd..f1eb16d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -34,6 +34,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -43,10 +45,15 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
@@ -79,6 +86,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
private HConnection conn;
+ private Configuration localConf;
private Configuration conf;
// How long should we sleep for each retry
private long sleepForRetries;
@@ -102,11 +110,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private Path hfileArchiveDir;
private boolean replicationBulkLoadDataEnabled;
private Abortable abortable;
+ private boolean dropOnDeletedTables;
@Override
public void init(Context context) throws IOException {
super.init(context);
this.conf = HBaseConfiguration.create(ctx.getConfiguration());
+ this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
decorateConf();
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
@@ -139,6 +149,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
// conservative for now.
this.replicationRpcLimit = (int)(0.95 * (double)conf.getLong(RpcServer.MAX_REQUEST_SIZE,
RpcServer.DEFAULT_MAX_REQUEST_SIZE));
+ this.dropOnDeletedTables =
+ this.conf.getBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
this.replicationBulkLoadDataEnabled =
conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
@@ -225,6 +237,58 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
return entryLists;
}
+ // Matches the quoted table name in a serialized TableNotFoundException message,
+ // e.g. "... TableNotFoundException: 'ns:table'". A Pattern is immutable and thread-safe,
+ // so it is compiled once rather than on every call.
+ private static final Pattern TNFE_TABLE_PATTERN =
+ Pattern.compile("TableNotFoundException: '([^'\\s]+)'");
+
+ /**
+ * Extract the table name from the message of a remote TableNotFoundException.
+ * @param msg the exception message; may be null
+ * @return the table name, or null if the message is null, carries no quoted table name,
+ * or the name is not a legal fully qualified table name
+ */
+ private TableName parseTable(String msg) {
+ if (msg == null) {
+ return null;
+ }
+ Matcher m = TNFE_TABLE_PATTERN.matcher(msg);
+ if (!m.find()) {
+ return null;
+ }
+ String table = m.group(1);
+ try {
+ // double check that table is a valid table name; an illegal name throws IAE
+ TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(table));
+ return TableName.valueOf(table);
+ } catch (IllegalArgumentException ignore) {
+ // not a legal table name: treat the message as unparseable
+ return null;
+ }
+ }
+
+ /**
+ * Filter a set of batches by table name.
+ * @param oldEntryList the batches to filter; not modified
+ * @param table the table whose entries are removed
+ * @return new batches, one per input batch in the same order, without entries for
+ * {@code table}; a batch may become empty
+ */
+ private List<List<Entry>> filterBatches(final List<List<Entry>> oldEntryList, TableName table) {
+ List<List<Entry>> entryLists = new ArrayList<>(oldEntryList.size());
+ for (List<Entry> entries : oldEntryList) {
+ List<Entry> thisList = new ArrayList<>(entries.size());
+ for (Entry e : entries) {
+ if (!e.getKey().getTablename().equals(table)) {
+ thisList.add(e);
+ }
+ }
+ entryLists.add(thisList);
+ }
+ return entryLists;
+ }
+
private void reconnectToPeerCluster() {
HConnection connection = null;
try {
@@ -325,10 +368,28 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
ioe = ((RemoteException) ioe).unwrapRemoteException();
LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
if (ioe instanceof TableNotFoundException) {
- if (sleepForRetries("A table is missing in the peer cluster. "
- + "Replication cannot proceed without losing data.", sleepMultiplier)) {
- sleepMultiplier++;
+ if (dropOnDeletedTables) {
+ // this is a bit fragile, but cannot change how TNFE is serialized
+ // at least check whether the table name is legal
+ TableName table = parseTable(ioe.getMessage());
+ if (table != null) {
+ // Admin is Closeable as well; close it together with the short-lived connection
+ try (Connection localConn = ConnectionFactory.createConnection(localConf);
+ org.apache.hadoop.hbase.client.Admin localAdmin = localConn.getAdmin()) {
+ if (!localAdmin.tableExists(table)) {
+ // Would potentially be better to retry in one of the outer loops
+ // and add a table filter there; but that would break the encapsulation,
+ // so we're doing the filtering here.
+ LOG.info("Missing table detected at sink, local table also does not exist, filtering edits for '"+table+"'");
+ batches = filterBatches(batches, table);
+ continue;
+ }
+ } catch (IOException iox) {
+ LOG.warn("Exception checking for local table: ", iox);
+ }
+ }
}
+ // fall through and sleep below
} else {
LOG.warn("Peer encountered RemoteException, rechecking all sinks: ", ioe);
replicationSinkMgr.chooseSinks();
http://git-wip-us.apache.org/repos/asf/hbase/blob/f9833a78/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 8f262c5..769e347 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -42,12 +42,14 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
@@ -375,6 +377,15 @@ public class ReplicationSink {
for (List<Row> rows : allRows) {
table.batch(rows);
}
+ } catch (RetriesExhaustedWithDetailsException rewde) {
+ for (Throwable ex : rewde.getCauses()) {
+ if (ex instanceof TableNotFoundException) {
+ // report the missing table in the quoted 'name' form the replication source parses
+ throw new TableNotFoundException("'"+tableName+"'");
+ }
+ }
+ // any other failure must propagate; swallowing it reports success for unapplied edits
+ throw rewde;
} catch (InterruptedException ix) {
throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/f9833a78/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index b50e840..77fd837 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -532,7 +532,7 @@ public class ReplicationSourceManager implements ReplicationListener {
clusterId, replicationEndpoint, metrics);
// init replication endpoint
- replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
+ replicationEndpoint.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(),
fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server));
return src;
http://git-wip-us.apache.org/repos/asf/hbase/blob/f9833a78/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
new file mode 100644
index 0000000..6c00047
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
@@ -0,0 +1,298 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Category(LargeTests.class)
+public class TestReplicationDroppedTables extends TestReplicationBase {
+ private static final Log LOG = LogFactory.getLog(TestReplicationDroppedTables.class);
+
+ /**
+ * Empty the test table on the source cluster and wait until the deletes reach the peer.
+ */
+ @Before
+ public void setUp() throws Exception {
+ // Starting and stopping replication can make us miss new logs,
+ // rolling like this makes sure the most recent one gets added to the queue
+ for ( JVMClusterUtil.RegionServerThread r :
+ utility1.getHBaseCluster().getRegionServerThreads()) {
+ utility1.getHBaseAdmin().rollWALWriter(r.getRegionServer().getServerName());
+ }
+ int rowCount = utility1.countRows(tableName);
+ utility1.deleteTableData(tableName);
+ // truncating the table will send one Delete per row to the slave cluster
+ // in an async fashion, which is why we cannot just call deleteTableData on
+ // utility2 since late writes could make it to the slave in some way.
+ // Instead, we truncate the first table and wait for all the Deletes to
+ // make it to the slave.
+ Scan scan = new Scan();
+ int lastCount = 0;
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ fail("Waited too much time for truncate");
+ }
+ ResultScanner scanner = htable2.getScanner(scan);
+ Result[] res = scanner.next(rowCount);
+ scanner.close();
+ if (res.length != 0) {
+ if (res.length < lastCount) {
+ i--; // Don't increment timeout if we make progress
+ }
+ lastCount = res.length;
+ LOG.info("Still got " + res.length + " rows");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ break;
+ }
+ }
+ }
+
+ @Test(timeout = 600000)
+ public void testEditsStuckBehindDroppedTable() throws Exception {
+ // Sanity check
+ // Make sure by default edits for dropped tables stall the replication queue, even when the
+ // table(s) in question have been deleted on both ends.
+ testEditsBehindDroppedTable(false, "test_dropped");
+ }
+
+ @Test(timeout = 600000)
+ public void testEditsDroppedWithDroppedTable() throws Exception {
+ // Make sure that, with the option enabled, edits for dropped tables are themselves dropped
+ // when the table(s) in question have been deleted on both ends.
+ testEditsBehindDroppedTable(true, "test_dropped");
+ }
+
+ @Test(timeout = 600000)
+ public void testEditsDroppedWithDroppedTableNS() throws Exception {
+ // also try with a namespace
+ try (Connection connection1 = ConnectionFactory.createConnection(conf1);
+ Admin admin1 = connection1.getAdmin()) {
+ admin1.createNamespace(NamespaceDescriptor.create("NS").build());
+ }
+ try (Connection connection2 = ConnectionFactory.createConnection(conf2);
+ Admin admin2 = connection2.getAdmin()) {
+ admin2.createNamespace(NamespaceDescriptor.create("NS").build());
+ }
+ testEditsBehindDroppedTable(true, "NS:test_dropped");
+ }
+
+ private void testEditsBehindDroppedTable(boolean allowProceeding, String tName) throws Exception {
+ conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
+ conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
+ try {
+ // make sure we have a single region server only, so that all
+ // edits for all tables go there
+ utility1.shutdownMiniHBaseCluster();
+ utility1.startMiniHBaseCluster(1, 1);
+
+ TableName tablename = TableName.valueOf(tName);
+ byte[] familyname = Bytes.toBytes("fam");
+ byte[] row = Bytes.toBytes("row");
+
+ HTableDescriptor table = new HTableDescriptor(tablename);
+ HColumnDescriptor fam = new HColumnDescriptor(familyname);
+ fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+ table.addFamily(fam);
+
+ try (Connection connection1 = ConnectionFactory.createConnection(conf1);
+ Connection connection2 = ConnectionFactory.createConnection(conf2)) {
+ try (Admin admin1 = connection1.getAdmin()) {
+ admin1.createTable(table);
+ }
+ try (Admin admin2 = connection2.getAdmin()) {
+ admin2.createTable(table);
+ }
+ utility1.waitUntilAllRegionsAssigned(tablename);
+ utility2.waitUntilAllRegionsAssigned(tablename);
+
+ // now suspend replication
+ admin.disablePeer(PEER_ID);
+
+ // put some data (lead with 0 so the edit gets sorted before the other table's edits
+ // in the replication batch)
+ // write a bunch of edits, making sure we fill a batch
+ byte[] rowkey = Bytes.toBytes(0+" put on table to be dropped");
+ Put put = new Put(rowkey);
+ put.addColumn(familyname, row, row);
+ try (Table lHtable1 = utility1.getConnection().getTable(tablename)) {
+ lHtable1.put(put);
+ }
+
+ rowkey = Bytes.toBytes("normal put");
+ put = new Put(rowkey);
+ put.addColumn(famName, row, row);
+ htable1.put(put);
+
+ try (Admin admin1 = connection1.getAdmin()) {
+ admin1.disableTable(tablename);
+ admin1.deleteTable(tablename);
+ }
+ try (Admin admin2 = connection2.getAdmin()) {
+ admin2.disableTable(tablename);
+ admin2.deleteTable(tablename);
+ }
+
+ admin.enablePeer(PEER_ID);
+ if (allowProceeding) {
+ // in this case we'd expect the key to make it over
+ verifyReplicationProceeded(rowkey);
+ } else {
+ verifyReplicationStuck(rowkey);
+ }
+ }
+ } finally {
+ // reset even if the test fails, so the setting cannot leak into later tests
+ conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
+ }
+ }
+
+ @Test(timeout = 600000)
+ public void testEditsBehindDroppedTableTiming() throws Exception {
+ conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
+ conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
+ try {
+ // make sure we have a single region server only, so that all
+ // edits for all tables go there
+ utility1.shutdownMiniHBaseCluster();
+ utility1.startMiniHBaseCluster(1, 1);
+
+ TableName tablename = TableName.valueOf("testdroppedtimed");
+ byte[] familyname = Bytes.toBytes("fam");
+ byte[] row = Bytes.toBytes("row");
+
+ HTableDescriptor table = new HTableDescriptor(tablename);
+ HColumnDescriptor fam = new HColumnDescriptor(familyname);
+ fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+ table.addFamily(fam);
+
+ try (Connection connection1 = ConnectionFactory.createConnection(conf1);
+ Connection connection2 = ConnectionFactory.createConnection(conf2)) {
+ try (Admin admin1 = connection1.getAdmin()) {
+ admin1.createTable(table);
+ }
+ try (Admin admin2 = connection2.getAdmin()) {
+ admin2.createTable(table);
+ }
+ utility1.waitUntilAllRegionsAssigned(tablename);
+ utility2.waitUntilAllRegionsAssigned(tablename);
+
+ // now suspend replication
+ admin.disablePeer(PEER_ID);
+
+ // put some data (lead with 0 so the edit gets sorted before the other table's edits
+ // in the replication batch)
+ // write a bunch of edits, making sure we fill a batch
+ byte[] rowkey = Bytes.toBytes(0+" put on table to be dropped");
+ Put put = new Put(rowkey);
+ put.addColumn(familyname, row, row);
+ try (Table lHtable1 = utility1.getConnection().getTable(tablename)) {
+ lHtable1.put(put);
+ }
+
+ rowkey = Bytes.toBytes("normal put");
+ put = new Put(rowkey);
+ put.addColumn(famName, row, row);
+ htable1.put(put);
+
+ try (Admin admin2 = connection2.getAdmin()) {
+ admin2.disableTable(tablename);
+ admin2.deleteTable(tablename);
+ }
+
+ admin.enablePeer(PEER_ID);
+ // edit should still be stuck
+
+ try (Admin admin1 = connection1.getAdmin()) {
+ // the source table still exists, replication should be stalled
+ verifyReplicationStuck(rowkey);
+
+ admin1.disableTable(tablename);
+ // still stuck, source table still exists
+ verifyReplicationStuck(rowkey);
+
+ admin1.deleteTable(tablename);
+ // now the source table is gone, replication should proceed, the
+ // offending edits be dropped
+ verifyReplicationProceeded(rowkey);
+ }
+ }
+ } finally {
+ // reset even if the test fails, so the setting cannot leak into later tests
+ conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
+ }
+ }
+
+ private void verifyReplicationProceeded(byte[] rowkey) throws Exception {
+ Get get = new Get(rowkey);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ fail("Waited too much time for put replication");
+ }
+ Result res = htable2.get(get);
+ if (res.size() == 0) {
+ LOG.info("Row not available");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ assertArrayEquals(res.getRow(), rowkey);
+ break;
+ }
+ }
+ }
+
+ private void verifyReplicationStuck(byte[] rowkey) throws Exception {
+ Get get = new Get(rowkey);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ Result res = htable2.get(get);
+ if (res.size() >= 1) {
+ fail("Edit should have been stuck behind dropped tables");
+ } else {
+ LOG.info("Row not replicated, let's wait a bit more...");
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+ }
+}