You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2020/09/24 09:25:57 UTC

[hbase] branch master updated: HBASE-25086 Refactor Replication: move the default ReplicationSinkService implementation out (#2444)

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 8828643  HBASE-25086 Refactor Replication: move the default ReplicationSinkService implementation out (#2444)
8828643 is described below

commit 8828643bb229469e2480741793b156e8dba4122b
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Thu Sep 24 17:25:34 2020 +0800

    HBASE-25086 Refactor Replication: move the default ReplicationSinkService implementation out (#2444)
    
    Signed-off-by: meiyi <my...@gmail.com>
---
 .../java/org/apache/hadoop/hbase/HConstants.java   |   6 +-
 .../hadoop/hbase/regionserver/HRegionServer.java   |  43 +++++---
 .../replication/ReplicationSinkServiceImpl.java    | 115 +++++++++++++++++++++
 .../replication/regionserver/Replication.java      |  90 +++-------------
 .../replication/regionserver/ReplicationLoad.java  |  23 ++---
 5 files changed, 173 insertions(+), 104 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index f6f00c5..5b4b6fb 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -981,10 +981,12 @@ public final class HConstants {
    */
   public static final String
       REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service";
-  public static final String
-      REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service";
   public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT =
     "org.apache.hadoop.hbase.replication.regionserver.Replication";
+  public static final String
+      REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service";
+  public static final String REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT =
+    "org.apache.hadoop.hbase.replication.ReplicationSinkServiceImpl";
   public static final String REPLICATION_BULKLOAD_ENABLE_KEY = "hbase.replication.bulkload.enabled";
   public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false;
   /** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index f14da2f..cd90fb8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -301,6 +301,7 @@ public class HRegionServer extends Thread implements
   // Replication services. If no replication, this handler will be null.
   private ReplicationSourceService replicationSourceHandler;
   private ReplicationSinkService replicationSinkHandler;
+  private boolean sameReplicationSourceAndSink;
 
   // Compactions
   public CompactSplit compactSplitThread;
@@ -1390,20 +1391,32 @@ public class HRegionServer extends Thread implements
         serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
       }
     }
-    // for the replicationLoad purpose. Only need to get from one executorService
-    // either source or sink will get the same info
-    ReplicationSourceService rsources = getReplicationSourceService();
 
-    if (rsources != null) {
+    if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
       // always refresh first to get the latest value
-      ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
+      ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
       if (rLoad != null) {
         serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
-        for (ClusterStatusProtos.ReplicationLoadSource rLS :
-            rLoad.getReplicationLoadSourceEntries()) {
+        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
+          .getReplicationLoadSourceEntries()) {
           serverLoad.addReplLoadSource(rLS);
         }
-
+      }
+    } else {
+      if (replicationSourceHandler != null) {
+        ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
+        if (rLoad != null) {
+          for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
+            .getReplicationLoadSourceEntries()) {
+            serverLoad.addReplLoadSource(rLS);
+          }
+        }
+      }
+      if (replicationSinkHandler != null) {
+        ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
+        if (rLoad != null) {
+          serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
+        }
       }
     }
 
@@ -1921,8 +1934,7 @@ public class HRegionServer extends Thread implements
    * Start up replication source and sink handlers.
    */
   private void startReplicationService() throws IOException {
-    if (this.replicationSourceHandler == this.replicationSinkHandler &&
-        this.replicationSourceHandler != null) {
+    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
       this.replicationSourceHandler.startReplicationService();
     } else {
       if (this.replicationSourceHandler != null) {
@@ -2628,9 +2640,10 @@ public class HRegionServer extends Thread implements
     if (this.compactSplitThread != null) {
       this.compactSplitThread.join();
     }
-    if (this.executorService != null) this.executorService.shutdown();
-    if (this.replicationSourceHandler != null &&
-        this.replicationSourceHandler == this.replicationSinkHandler) {
+    if (this.executorService != null) {
+      this.executorService.shutdown();
+    }
+    if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
       this.replicationSourceHandler.stopReplicationService();
     } else {
       if (this.replicationSourceHandler != null) {
@@ -3070,7 +3083,7 @@ public class HRegionServer extends Thread implements
 
     // read in the name of the sink replication class from the config file.
     String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
-      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+      HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);
 
     // If both the sink and the source class names are the same, then instantiate
     // only one object.
@@ -3078,11 +3091,13 @@ public class HRegionServer extends Thread implements
       server.replicationSourceHandler = newReplicationInstance(sourceClassname,
         ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
       server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
+      server.sameReplicationSourceAndSink = true;
     } else {
       server.replicationSourceHandler = newReplicationInstance(sourceClassname,
         ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
       server.replicationSinkHandler = newReplicationInstance(sinkClassname,
         ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
+      server.sameReplicationSourceAndSink = false;
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java
new file mode 100644
index 0000000..9b0e3f7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+
+/**
+ * Default {@link ReplicationSinkService} implementation. Hands WAL entries received from a source
+ * cluster to a {@link ReplicationSink}, exposes the sink metrics through {@link ReplicationLoad},
+ * and periodically logs the sink statistics.
+ */
+@InterfaceAudience.Private
+public class ReplicationSinkServiceImpl implements ReplicationSinkService {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSinkServiceImpl.class);
+
+  private Configuration conf;
+
+  private Server server;
+
+  // Created by startReplicationService(); null until the service has been started.
+  private ReplicationSink replicationSink;
+
+  // ReplicationLoad to access replication metrics; created by initialize().
+  private ReplicationLoad replicationLoad;
+
+  // Period of the statistics chore, in SECONDS (replication.stats.thread.period.seconds).
+  private int statsPeriodInSeconds;
+
+  /**
+   * Hands the given WAL entries, and the cells they describe, to the {@link ReplicationSink}.
+   * Must only be called after {@link #startReplicationService()} has created the sink.
+   * @param entries WAL entries to apply
+   * @param cells the cells described by {@code entries}, passed on the side
+   * @param replicationClusterId id identifying the source cluster's FS client configuration
+   * @param sourceBaseNamespaceDirPath source cluster base namespace dir (hfile replication)
+   * @param sourceHFileArchiveDirPath source cluster hfile archive dir (hfile replication)
+   * @throws IOException propagated from the {@link ReplicationSink}
+   */
+  @Override
+  public void replicateLogEntries(List<AdminProtos.WALEntry> entries, CellScanner cells,
+    String replicationClusterId, String sourceBaseNamespaceDirPath,
+    String sourceHFileArchiveDirPath) throws IOException {
+    this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
+      sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
+  }
+
+  /**
+   * Captures the hosting server and its configuration and creates the {@link ReplicationLoad}.
+   * The file system, WAL directories and WAL provider are not used by the sink.
+   */
+  @Override
+  public void initialize(Server server, FileSystem fs, Path logdir, Path oldLogDir,
+    WALProvider walProvider) throws IOException {
+    this.server = server;
+    this.conf = server.getConfiguration();
+    this.statsPeriodInSeconds =
+      this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
+    this.replicationLoad = new ReplicationLoad();
+  }
+
+  /**
+   * Creates the {@link ReplicationSink} and schedules the statistics chore on the hosting
+   * server's chore service.
+   */
+  @Override
+  public void startReplicationService() throws IOException {
+    this.replicationSink = new ReplicationSink(this.conf);
+    this.server.getChoreService().scheduleChore(
+      new ReplicationStatisticsChore("ReplicationSinkStatistics", server, statsPeriodInSeconds));
+  }
+
+  /**
+   * Stops the {@link ReplicationSink} if it was started.
+   */
+  @Override
+  public void stopReplicationService() {
+    if (this.replicationSink != null) {
+      this.replicationSink.stopReplicationSinkServices();
+    }
+  }
+
+  /**
+   * Rebuilds the {@link ReplicationLoad} from the current sink metrics. No replication sources
+   * are passed, so the returned load carries no source entries.
+   * @return the refreshed load, or {@code null} if the service has not been initialized or
+   *         started yet
+   */
+  @Override
+  public ReplicationLoad refreshAndGetReplicationLoad() {
+    // The sink only exists after startReplicationService(); guard callers that ask for the load
+    // earlier instead of failing with a NullPointerException.
+    if (replicationLoad == null || replicationSink == null) {
+      return null;
+    }
+    // always build for latest data
+    replicationLoad.buildReplicationLoad(Collections.emptyList(), replicationSink.getSinkMetrics());
+    return replicationLoad;
+  }
+
+  /**
+   * Periodically logs the statistics reported by the {@link ReplicationSink}.
+   */
+  private final class ReplicationStatisticsChore extends ScheduledChore {
+
+    ReplicationStatisticsChore(String name, Stoppable stopper, int periodInSeconds) {
+      // The three-argument ScheduledChore constructor interprets the period in milliseconds,
+      // but the configured period is in seconds, so pass TimeUnit.SECONDS explicitly. The
+      // period is reused as the initial delay.
+      super(name, stopper, periodInSeconds, periodInSeconds, TimeUnit.SECONDS);
+    }
+
+    @Override
+    protected void chore() {
+      printStats(replicationSink.getStats());
+    }
+
+    private void printStats(String stats) {
+      if (!stats.isEmpty()) {
+        LOG.info(stats);
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 195877b..33975ed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -22,18 +22,15 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.OptionalLong;
 import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
@@ -51,15 +48,11 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
-
 /**
  * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
  */
 @InterfaceAudience.Private
-public class Replication implements ReplicationSourceService, ReplicationSinkService {
+public class Replication implements ReplicationSourceService {
   private static final Logger LOG =
       LoggerFactory.getLogger(Replication.class);
   private boolean isReplicationForBulkLoadDataEnabled;
@@ -68,13 +61,10 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
   private ReplicationPeers replicationPeers;
   private ReplicationTracker replicationTracker;
   private Configuration conf;
-  private ReplicationSink replicationSink;
   private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider;
   // Hosting server
   private Server server;
-  /** Statistics thread schedule pool */
-  private ScheduledExecutorService scheduleThreadPool;
-  private int statsThreadPeriod;
+  private int statsPeriod;
   // ReplicationLoad to access replication metrics
   private ReplicationLoad replicationLoad;
   private MetricsReplicationGlobalSourceSource globalMetricsSource;
@@ -94,11 +84,6 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
     this.conf = this.server.getConfiguration();
     this.isReplicationForBulkLoadDataEnabled =
       ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf);
-    this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
-      new ThreadFactoryBuilder()
-        .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
-        .setDaemon(true)
-        .build());
     if (this.isReplicationForBulkLoadDataEnabled) {
       if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
           || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) {
@@ -154,9 +139,8 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
               p.getSyncReplicationState(), p.getNewSyncReplicationState(), 0));
       }
     }
-    this.statsThreadPeriod =
+    this.statsPeriod =
         this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
-    LOG.debug("Replication stats-in-log period={} seconds",  this.statsThreadPeriod);
     this.replicationLoad = new ReplicationLoad();
 
     this.peerProcedureHandler =
@@ -173,39 +157,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
    */
   @Override
   public void stopReplicationService() {
-    join();
-  }
-
-  /**
-   * Join with the replication threads
-   */
-  public void join() {
     this.replicationManager.join();
-    if (this.replicationSink != null) {
-      this.replicationSink.stopReplicationSinkServices();
-    }
-    scheduleThreadPool.shutdown();
-  }
-
-  /**
-   * Carry on the list of log entries down to the sink
-   * @param entries list of entries to replicate
-   * @param cells The data -- the cells -- that <code>entries</code> describes (the entries do not
-   *          contain the Cells we are replicating; they are passed here on the side in this
-   *          CellScanner).
-   * @param replicationClusterId Id which will uniquely identify source cluster FS client
-   *          configurations in the replication configuration directory
-   * @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
-   *          directory required for replicating hfiles
-   * @param sourceHFileArchiveDirPath Path that point to the source cluster hfile archive directory
-   * @throws IOException
-   */
-  @Override
-  public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
-      String replicationClusterId, String sourceBaseNamespaceDirPath,
-      String sourceHFileArchiveDirPath) throws IOException {
-    this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
-      sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
   }
 
   /**
@@ -216,10 +168,8 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
   @Override
   public void startReplicationService() throws IOException {
     this.replicationManager.init();
-    this.replicationSink = new ReplicationSink(this.conf);
-    this.scheduleThreadPool.scheduleAtFixedRate(
-      new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
-      statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
+    this.server.getChoreService().scheduleChore(
+      new ReplicationStatisticsChore("ReplicationSourceStatistics", server, statsPeriod));
     LOG.info("{} started", this.server.toString());
   }
 
@@ -244,21 +194,18 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
   /**
    * Statistics task. Periodically prints the cache statistics to the log.
    */
-  private final static class ReplicationStatisticsTask implements Runnable {
-
-    private final ReplicationSink replicationSink;
-    private final ReplicationSourceManager replicationManager;
+  private final class ReplicationStatisticsChore extends ScheduledChore {
 
-    public ReplicationStatisticsTask(ReplicationSink replicationSink,
-        ReplicationSourceManager replicationManager) {
-      this.replicationManager = replicationManager;
-      this.replicationSink = replicationSink;
+    ReplicationStatisticsChore(String name, Stoppable stopper, int periodInSeconds) {
+      // ScheduledChore's three-argument constructor takes its period in milliseconds, but the
+      // configured period is in seconds: pass the unit explicitly, reusing the period as delay.
+      super(name, stopper, periodInSeconds, periodInSeconds,
+        java.util.concurrent.TimeUnit.SECONDS);
     }
 
     @Override
-    public void run() {
-      printStats(this.replicationManager.getStats());
-      printStats(this.replicationSink.getStats());
+    protected void chore() {
+      printStats(replicationManager.getStats());
     }
 
     private void printStats(String stats) {
@@ -274,17 +218,11 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
       return null;
     }
     // always build for latest data
-    buildReplicationLoad();
-    return this.replicationLoad;
-  }
-
-  private void buildReplicationLoad() {
     List<ReplicationSourceInterface> allSources = new ArrayList<>();
     allSources.addAll(this.replicationManager.getSources());
     allSources.addAll(this.replicationManager.getOldSources());
-    // get sink
-    MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
-    this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics);
+    this.replicationLoad.buildReplicationLoad(allSources, null);
+    return this.replicationLoad;
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
index e011e0a..6fb21dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
@@ -36,7 +36,6 @@ public class ReplicationLoad {
 
   // Empty load instance.
   public static final ReplicationLoad EMPTY_REPLICATIONLOAD = new ReplicationLoad();
-  private MetricsSink sinkMetrics;
 
   private List<ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceEntries;
   private ClusterStatusProtos.ReplicationLoadSink replicationLoadSink;
@@ -49,21 +48,22 @@ public class ReplicationLoad {
   /**
    * buildReplicationLoad
    * @param sources List of ReplicationSource instances for which metrics should be reported
-   * @param skMetrics
+   * @param sinkMetrics metrics of the replication sink
    */
 
   public void buildReplicationLoad(final List<ReplicationSourceInterface> sources,
-      final MetricsSink skMetrics) {
-    this.sinkMetrics = skMetrics;
+      final MetricsSink sinkMetrics) {
 
-    // build the SinkLoad
-    ClusterStatusProtos.ReplicationLoadSink.Builder rLoadSinkBuild =
+    if (sinkMetrics != null) {
+      // build the SinkLoad
+      ClusterStatusProtos.ReplicationLoadSink.Builder rLoadSinkBuild =
         ClusterStatusProtos.ReplicationLoadSink.newBuilder();
-    rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp());
-    rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
-    rLoadSinkBuild.setTimestampStarted(sinkMetrics.getStartTimestamp());
-    rLoadSinkBuild.setTotalOpsProcessed(sinkMetrics.getAppliedOps());
-    this.replicationLoadSink = rLoadSinkBuild.build();
+      rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp());
+      rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
+      rLoadSinkBuild.setTimestampStarted(sinkMetrics.getStartTimestamp());
+      rLoadSinkBuild.setTotalOpsProcessed(sinkMetrics.getAppliedOps());
+      this.replicationLoadSink = rLoadSinkBuild.build();
+    }
 
     this.replicationLoadSourceEntries = new ArrayList<>();
     for (ReplicationSourceInterface source : sources) {
@@ -157,5 +157,4 @@ public class ReplicationLoad {
   public String toString() {
     return this.sourceToString() + System.getProperty("line.separator") + this.sinkToString();
   }
-
 }