You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2020/09/22 19:07:46 UTC

[hbase] branch branch-2 updated: HBASE-25068 Pass WALFactory to Replication so it knows of all WALProviders, not just default/user-space

This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new b5a242f  HBASE-25068 Pass WALFactory to Replication so it knows of all WALProviders, not just default/user-space
b5a242f is described below

commit b5a242f42a919e85a72406964db95d430dd6e881
Author: stack <st...@apache.org>
AuthorDate: Fri Sep 18 17:29:23 2020 -0700

    HBASE-25068 Pass WALFactory to Replication so it knows of all WALProviders, not just default/user-space
    
    Pass WALFactory to Replication instead of WALProvider. WALFactory has all
    WALProviders in it, not just the user-space WALProvider. Do this so
    ReplicationService has access to all WALProviders in the Server (To be
    exploited by the follow-on patch in HBASE-25055)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../hadoop/hbase/regionserver/HRegionServer.java       | 18 ++++++++----------
 .../hadoop/hbase/regionserver/ReplicationService.java  | 11 ++++-------
 .../hbase/replication/regionserver/Replication.java    | 15 +++++++--------
 .../replication/regionserver/ReplicationSyncUp.java    |  6 ++++--
 .../hadoop/hbase/replication/TestReplicationBase.java  |  2 --
 .../regionserver/TestReplicationSourceManager.java     |  3 ++-
 6 files changed, 25 insertions(+), 30 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index a80881e..2a638e3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -245,7 +245,7 @@ public class HRegionServer extends Thread implements
   /**
    * For testing only!  Set to true to skip notifying region assignment to master .
    */
-  @VisibleForTesting
+  @SuppressWarnings("checkstyle:VisibilityModifier") @VisibleForTesting
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
   public static boolean TEST_SKIP_REPORTING_TRANSITION = false;
 
@@ -1915,8 +1915,7 @@ public class HRegionServer extends Thread implements
       throw new IOException("Can not create wal directory " + logDir);
     }
     // Instantiate replication if replication enabled. Pass it the log directories.
-    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir,
-      factory.getWALProvider());
+    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
     this.walFactory = factory;
   }
 
@@ -3056,12 +3055,11 @@ public class HRegionServer extends Thread implements
    * Load the replication executorService objects, if any
    */
   private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
-      FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException {
+      FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
     if ((server instanceof HMaster) &&
       (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) {
       return;
     }
-
     // read in the name of the source replication class from the config file.
     String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
       HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
@@ -3074,19 +3072,19 @@ public class HRegionServer extends Thread implements
     // only one object.
     if (sourceClassname.equals(sinkClassname)) {
       server.replicationSourceHandler = newReplicationInstance(sourceClassname,
-        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
+        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
       server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
     } else {
       server.replicationSourceHandler = newReplicationInstance(sourceClassname,
-        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
+        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
       server.replicationSinkHandler = newReplicationInstance(sinkClassname,
-        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
+        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
     }
   }
 
   private static <T extends ReplicationService> T newReplicationInstance(String classname,
       Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
-      Path oldLogDir, WALProvider walProvider) throws IOException {
+      Path oldLogDir, WALFactory walFactory) throws IOException {
     final Class<? extends T> clazz;
     try {
       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
@@ -3095,7 +3093,7 @@ public class HRegionServer extends Thread implements
       throw new IOException("Could not find class for " + classname);
     }
     T service = ReflectionUtils.newInstance(clazz, conf);
-    service.initialize(server, walFs, logDir, oldLogDir, walProvider);
+    service.initialize(server, walFs, logDir, oldLogDir, walFactory);
     return service;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
index e9bbaea..33b3321 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
-import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -32,14 +32,11 @@ import org.apache.yetus.audience.InterfaceAudience;
  */
 @InterfaceAudience.Private
 public interface ReplicationService {
-
   /**
    * Initializes the replication service object.
-   * @param walProvider can be null if not initialized inside a live region server environment, for
-   *          example, {@code ReplicationSyncUp}.
    */
-  void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALProvider walProvider)
-      throws IOException;
+  void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALFactory walFactory)
+    throws IOException;
 
   /**
    * Start replication services.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 10ddd0c..c636105 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -42,16 +42,16 @@ import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
+
 /**
  * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
  */
@@ -85,7 +85,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
 
   @Override
   public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
-      WALProvider walProvider) throws IOException {
+      WALFactory walFactory) throws IOException {
     this.server = server;
     this.conf = this.server.getConfiguration();
     this.isReplicationForBulkLoadDataEnabled =
@@ -123,8 +123,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
     }
     this.globalMetricsSource = CompatibilitySingletonFactory
         .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
-    this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf,
-      this.server, fs, logDir, oldLogDir, clusterId,
+    WALProvider walProvider = walFactory.getWALProvider();
+    this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
+        replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
         walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
         globalMetricsSource);
     if (walProvider != null) {
@@ -174,7 +175,6 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
    * @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
    *          directory required for replicating hfiles
    * @param sourceHFileArchiveDirPath Path that point to the source cluster hfile archive directory
-   * @throws IOException
    */
   @Override
   public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
@@ -187,7 +187,6 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
   /**
    * If replication is enabled and this cluster is a master,
    * it starts
-   * @throws IOException
    */
   @Override
   public void startReplicationService() throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index 5e3a09f..af4720b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -82,7 +83,8 @@ public class ReplicationSyncUp extends Configured implements Tool {
 
       System.out.println("Start Replication Server start");
       Replication replication = new Replication();
-      replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, null);
+      replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir,
+        new WALFactory(conf, "test"));
       ReplicationSourceManager manager = replication.getReplicationManager();
       manager.init().get();
       while (manager.activeFailoverTaskCount() > 0) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index e76c222..657edbd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,7 +20,6 @@ package org.apache.hadoop.hbase.replication;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 1e40457..94374f7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -182,7 +182,8 @@ public abstract class TestReplicationSourceManager {
     logDir = new Path(utility.getDataTestDir(),
         HConstants.HREGION_LOGDIR_NAME);
     replication = new Replication();
-    replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null);
+    replication.initialize(new DummyServer(), fs, logDir, oldLogDir,
+      new WALFactory(conf, "test"));
     managerOfCluster = getManagerFromCluster();
     if (managerOfCluster != null) {
       // After replication procedure, we need to add peer by hand (other than by receiving