You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2018/07/23 21:49:14 UTC

[2/4] hbase git commit: HBASE-20908 Infinite loop on regionserver if region replicas are reduced

HBASE-20908 Infinite loop on regionserver if region replicas are reduced

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3d378b2a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3d378b2a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3d378b2a

Branch: refs/heads/branch-1.4
Commit: 3d378b2ac0659e2f64469a6b96d2aace681b0ca0
Parents: f7e8dcd
Author: Ankit Singhal <an...@gmail.com>
Authored: Fri Jul 20 14:13:18 2018 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon Jul 23 13:11:48 2018 -0700

----------------------------------------------------------------------
 .../RegionReplicaReplicationEndpoint.java       | 28 +++++++--
 .../TestRegionReplicaReplicationEndpoint.java   | 63 +++++++++++++-------
 2 files changed, 65 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3d378b2a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index 235e27a..efbb8f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -323,7 +323,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
         EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool,
         int numWriters, int operationTimeout) {
       super(controller, entryBuffers, numWriters);
-      this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout);
+      this.sinkWriter =
+          new RegionReplicaSinkWriter(this, connection, pool, operationTimeout, tableDescriptors);
       this.tableDescriptors = tableDescriptors;
 
       // A cache for the table "memstore replication enabled" flag.
@@ -437,9 +438,10 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     int operationTimeout;
     ExecutorService pool;
     Cache<TableName, Boolean> disabledAndDroppedTables;
+    TableDescriptors tableDescriptors;
 
     public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection,
-        ExecutorService pool, int operationTimeout) {
+        ExecutorService pool, int operationTimeout, TableDescriptors tableDescriptors) {
       this.sink = sink;
       this.connection = connection;
       this.operationTimeout = operationTimeout;
@@ -447,6 +449,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
         = RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
       this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
       this.pool = pool;
+      this.tableDescriptors = tableDescriptors;
 
       int nonExistentTableCacheExpiryMs = connection.getConfiguration()
         .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
@@ -555,13 +558,14 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
       }
 
       boolean tasksCancelled = false;
-      for (Future<ReplicateWALEntryResponse> task : tasks) {
+      for (int replicaId = 0; replicaId < tasks.size(); replicaId++) {
         try {
-          task.get();
+          tasks.get(replicaId).get();
         } catch (InterruptedException e) {
           throw new InterruptedIOException(e.getMessage());
         } catch (ExecutionException e) {
           Throwable cause = e.getCause();
+          boolean canBeSkipped = false;
           if (cause instanceof IOException) {
             // The table can be disabled or dropped at this time. For disabled tables, we have no
             // cheap mechanism to detect this case because meta does not contain this information.
@@ -570,14 +574,26 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
             // check whether the table is dropped or disabled which might cause
             // SocketTimeoutException, or RetriesExhaustedException or similar if we get IOE.
             if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) {
+              disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
+              canBeSkipped = true;
+            } else if (tableDescriptors != null) {
+              HTableDescriptor tableDescriptor = tableDescriptors.get(tableName);
+              if (tableDescriptor != null
+                  // (replicaId + 1) as no task is added for primary replica for replication
+                  && tableDescriptor.getRegionReplication() <= (replicaId + 1)) {
+                canBeSkipped = true;
+              }
+            }
+            if (canBeSkipped) {
               if (LOG.isTraceEnabled()) {
                 LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
-                  + " because received exception for dropped or disabled table", cause);
+                    + " because received exception for dropped or disabled table",
+                  cause);
                 for (Entry entry : entries) {
                   LOG.trace("Skipping : " + entry);
                 }
               }
-              disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
+
               if (!tasksCancelled) {
                 sink.getSkippedEditsCounter().addAndGet(entries.size());
                 tasksCancelled = true; // so that we do not add to skipped counter again

http://git-wip-us.apache.org/repos/asf/hbase/blob/3d378b2a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
index 2c8119a..7e09969 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
@@ -18,41 +18,49 @@
 
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.RpcRetryingCaller;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.RpcRetryingCaller;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.log4j.Level;
 import org.junit.AfterClass;
@@ -60,8 +68,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import com.google.common.collect.Lists;
-
 /**
  * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying
  * async wal replication replays the edits to the secondary region in various scenarios.
@@ -229,7 +235,7 @@ public class TestRegionReplicaReplicationEndpoint {
     for (int i = 1; i < regionReplication; i++) {
       final Region region = regions[i];
       // wait until all the data is replicated to all secondary regions
-      Waiter.waitFor(HTU.getConfiguration(), 90000, new Waiter.Predicate<Exception>() {
+      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
         @Override
         public boolean evaluate() throws Exception {
           LOG.info("verifying replication for region replica:" + region.getRegionInfo());
@@ -307,7 +313,6 @@ public class TestRegionReplicaReplicationEndpoint {
 
     Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
     Table table = connection.getTable(tableName);
-
     try {
       // load the data to the table
 
@@ -327,29 +332,35 @@ public class TestRegionReplicaReplicationEndpoint {
     }
   }
 
-  @Test (timeout = 240000)
+  @Test(timeout = 240000)
   public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception {
-    testRegionReplicaReplicationIgnoresDisabledTables(false);
+    testRegionReplicaReplicationIgnores(false, false);
   }
 
-  @Test (timeout = 240000)
+  @Test(timeout = 240000)
   public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception {
-    testRegionReplicaReplicationIgnoresDisabledTables(true);
+    testRegionReplicaReplicationIgnores(true, false);
   }
 
-  public void testRegionReplicaReplicationIgnoresDisabledTables(boolean dropTable)
+  @Test(timeout = 240000)
+  public void testRegionReplicaReplicationIgnoresNonReplicatedTables() throws Exception {
+    testRegionReplicaReplicationIgnores(false, true);
+  }
+
+  public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
       throws Exception {
     // tests having edits from a disabled or dropped table is handled correctly by skipping those
     // entries and further edits after the edits from dropped/disabled table can be replicated
     // without problems.
     TableName tableName = TableName.valueOf("testRegionReplicaReplicationIgnoresDisabledTables"
-      + dropTable);
+        + "_drop_" + dropTable + "_disabledReplication_" + disableReplication);
     HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
     int regionReplication = 3;
     htd.setRegionReplication(regionReplication);
     HTU.deleteTableIfAny(tableName);
     HTU.getHBaseAdmin().createTable(htd);
-    TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" : "disabledTable");
+    TableName toBeDisabledTable = TableName.valueOf(
+      dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable"));
     HTU.deleteTableIfAny(toBeDisabledTable);
     htd = HTU.createTableDescriptor(toBeDisabledTable.toString());
     htd.setRegionReplication(regionReplication);
@@ -371,14 +382,16 @@ public class TestRegionReplicaReplicationEndpoint {
     RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
         mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
     when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
+    FSTableDescriptors fstd = new FSTableDescriptors(HTU.getConfiguration(),
+      FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath());
     RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
         new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
           (ClusterConnection) connection,
-          Executors.newSingleThreadExecutor(), Integer.MAX_VALUE);
+          Executors.newSingleThreadExecutor(), Integer.MAX_VALUE, fstd);
+    
     RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
     HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
     byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();
-
     Entry entry = new Entry(
       new WALKey(encodedRegionName, toBeDisabledTable, 1),
       new WALEdit());
@@ -386,13 +399,23 @@ public class TestRegionReplicaReplicationEndpoint {
     HTU.getHBaseAdmin().disableTable(toBeDisabledTable); // disable the table
     if (dropTable) {
       HTU.getHBaseAdmin().deleteTable(toBeDisabledTable);
+    } else if (disableReplication) {
+      htd.setRegionReplication(regionReplication - 2);
+      HTU.getHBaseAdmin().modifyTable(toBeDisabledTable, htd);
+      HTU.getHBaseAdmin().enableTable(toBeDisabledTable);
     }
 
     sinkWriter.append(toBeDisabledTable, encodedRegionName,
       HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry));
 
     assertEquals(2, skippedEdits.get());
-
+    if (disableReplication) {
+      // enable replication again so that we can verify replication
+      HTU.getHBaseAdmin().disableTable(toBeDisabledTable); // disable the table
+      htd.setRegionReplication(regionReplication);
+      HTU.getHBaseAdmin().modifyTable(toBeDisabledTable, htd);
+      HTU.getHBaseAdmin().enableTable(toBeDisabledTable);
+    }
     try {
       // load some data to the to-be-dropped table