You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/06/16 15:40:30 UTC

[hbase] 30/31: HBASE-22553 NPE in RegionReplicaReplicationEndpoint

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 2fa5fc85e30cbf803e31812e95094ceb40b16d35
Author: zhangduo <zh...@apache.org>
AuthorDate: Sat Jun 8 20:40:23 2019 +0800

    HBASE-22553 NPE in RegionReplicaReplicationEndpoint
---
 .../RegionReplicaReplicationEndpoint.java          | 51 +++++++++++++---------
 1 file changed, 30 insertions(+), 21 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index cc2650f..2c3b19b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -32,12 +32,15 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
@@ -162,9 +165,9 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
           return;
         }
         // check if the number of region replicas is correct, and also the primary region name
-        // matches, and also there is no null elements in the returned RegionLocations
+        // matches.
         if (locs.size() == tableDesc.getRegionReplication() &&
-          locs.size() == locs.numNonNullElements() &&
+          locs.getDefaultRegionLocation() != null &&
           Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(),
             encodedRegionName)) {
           future.complete(locs);
@@ -182,8 +185,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
       future.complete(Long.valueOf(entries.size()));
       return;
     }
-    if (!Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(),
-      encodedRegionName)) {
+    RegionInfo defaultReplica = locs.getDefaultRegionLocation().getRegion();
+    if (!Bytes.equals(defaultReplica.getEncodedNameAsBytes(), encodedRegionName)) {
       // the region name is not equal, this usually means the region has been split or merged, so
       // give up replicating as the new region(s) should already have all the data of the parent
       // region(s).
@@ -191,7 +194,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
         LOG.trace(
           "Skipping {} entries in table {} because located region {} is different than" +
             " the original region {} from WALEdit",
-          tableDesc.getTableName(), locs.getDefaultRegionLocation().getRegion().getEncodedName(),
+          tableDesc.getTableName(), defaultReplica.getEncodedName(),
           Bytes.toStringBinary(encodedRegionName));
       }
       future.complete(Long.valueOf(entries.size()));
@@ -202,24 +205,26 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     AtomicLong skippedEdits = new AtomicLong(0);
 
     for (int i = 1, n = locs.size(); i < n; i++) {
-      final int replicaId = i;
-      FutureUtils.addListener(connection.replay(tableDesc.getTableName(),
-        locs.getRegionLocation(replicaId).getRegion().getEncodedNameAsBytes(), row, entries,
-        replicaId, numRetries, operationTimeoutNs), (r, e) -> {
-          if (e != null) {
-            LOG.warn("Failed to replicate to {}", locs.getRegionLocation(replicaId), e);
-            error.compareAndSet(null, e);
-          } else {
-            AtomicUtils.updateMax(skippedEdits, r.longValue());
-          }
-          if (remainingTasks.decrementAndGet() == 0) {
-            if (error.get() != null) {
-              future.completeExceptionally(error.get());
+      // Do not use the elements other than the default replica as they may be null. We will fail
+      // earlier if the location for default replica is null.
+      final RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(defaultReplica, i);
+      FutureUtils
+        .addListener(connection.replay(tableDesc.getTableName(), replica.getEncodedNameAsBytes(),
+          row, entries, replica.getReplicaId(), numRetries, operationTimeoutNs), (r, e) -> {
+            if (e != null) {
+              LOG.warn("Failed to replicate to {}", replica, e);
+              error.compareAndSet(null, e);
             } else {
-              future.complete(skippedEdits.get());
+              AtomicUtils.updateMax(skippedEdits, r.longValue());
             }
-          }
-        });
+            if (remainingTasks.decrementAndGet() == 0) {
+              if (error.get() != null) {
+                future.completeExceptionally(error.get());
+              } else {
+                future.complete(skippedEdits.get());
+              }
+            }
+          });
     }
   }
 
@@ -245,6 +250,10 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     FutureUtils.addListener(locateFuture, (locs, error) -> {
       if (error != null) {
         future.completeExceptionally(error);
+      } else if (locs.getDefaultRegionLocation() == null) {
+        future.completeExceptionally(
+          new HBaseIOException("No location found for default replica of table=" +
+            tableDesc.getTableName() + " row='" + Bytes.toStringBinary(row) + "'"));
       } else {
         replicate(future, locs, tableDesc, encodedRegionName, row, entries);
       }