You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2020/08/19 03:25:41 UTC

[GitHub] [hbase] Apache9 commented on a change in pull request #2255: HBASE-24877 Add option to avoid aborting RS process upon uncaught exc…

Apache9 commented on a change in pull request #2255:
URL: https://github.com/apache/hbase/pull/2255#discussion_r472636537



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -290,7 +290,22 @@ private boolean updateLogPosition(WALEntryBatch batch) {
   public void startup(UncaughtExceptionHandler handler) {
     String name = Thread.currentThread().getName();
     Threads.setDaemonThreadRunning(this,
-      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
+      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(),
+      (t,e) -> {

Review comment:
       OK, the code is almost the same... Then I think we could move the logic into uncaughtException method? If abortOnError is true, we about, otherwise we will try to refresh the source.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -373,7 +389,21 @@ private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> q
         Threads.setDaemonThreadRunning(
             walReader, Thread.currentThread().getName()
                 + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
-            this::uncaughtException);
+                (t,e) -> {

Review comment:
       So here it is for wal reader. I think refreshSources and retry is an acceptable way. Then let's just test the abortOnError flag here? If it is true, we will call uncaughtException, otherwise we will try to refresh the replication source.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -583,16 +617,27 @@ private void initialize() {
       PriorityBlockingQueue<Path> queue = entry.getValue();
       tryStartNewShipper(walGroupId, queue);
     }
+    this.startupOngoing.set(false);
   }
 
   @Override
   public void startup() {
     // mark we are running now
     this.sourceRunning = true;
-    initThread = new Thread(this::initialize);
-    Threads.setDaemonThreadRunning(initThread,
-      Thread.currentThread().getName() + ".replicationSource," + this.queueId,
-      this::uncaughtException);
+    this.retryStartup.set(true);

Review comment:
       This flag is only used in this method? Let's use a local var instead of a class member field?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -583,16 +617,27 @@ private void initialize() {
       PriorityBlockingQueue<Path> queue = entry.getValue();
       tryStartNewShipper(walGroupId, queue);
     }
+    this.startupOngoing.set(false);
   }
 
   @Override
   public void startup() {
     // mark we are running now
     this.sourceRunning = true;
-    initThread = new Thread(this::initialize);
-    Threads.setDaemonThreadRunning(initThread,
-      Thread.currentThread().getName() + ".replicationSource," + this.queueId,
-      this::uncaughtException);
+    this.retryStartup.set(true);
+    do {
+      if(retryStartup.get()) {
+        retryStartup.set(false);
+        startupOngoing.set(true);

Review comment:
       So this one is exactly the same with source.isActive? Can we just make use of that flag instead of introducing a new one?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org