You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2020/09/23 04:02:08 UTC

[hbase] branch HBASE-18070 created (now 03e8e5b)

This is an automated email from the ASF dual-hosted git repository.

stack pushed a change to branch HBASE-18070
in repository https://gitbox.apache.org/repos/asf/hbase.git.


      at 03e8e5b  HBASE-25068 Pass WALFactory to Replication so it knows of all WALProviders, not just default/user-space

This branch includes the following new commits:

     new 03e8e5b  HBASE-25068 Pass WALFactory to Replication so it knows of all WALProviders, not just default/user-space

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[hbase] 01/01: HBASE-25068 Pass WALFactory to Replication so it knows of all WALProviders, not just default/user-space

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch HBASE-18070
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 03e8e5b517870f8dcd2adeb6e79bc4cff8111c75
Author: stack <st...@apache.org>
AuthorDate: Fri Sep 18 17:29:23 2020 -0700

    HBASE-25068 Pass WALFactory to Replication so it knows of all WALProviders, not just default/user-space
    
    Pass WALFactory to Replication instead of WALProvider. WALFactory has all
    WALProviders in it, not just the user-space WALProvider. Do this so
    ReplicationService has access to all WALProviders in the Server (To be
    exploited by the follow-on patch in HBASE-25055)
---
 .../apache/hadoop/hbase/regionserver/HRegionServer.java   | 15 +++++++--------
 .../hadoop/hbase/regionserver/ReplicationService.java     | 11 ++++-------
 .../hbase/replication/regionserver/Replication.java       |  8 ++++----
 .../hbase/replication/regionserver/ReplicationSyncUp.java |  6 ++++--
 .../hadoop/hbase/replication/TestReplicationBase.java     |  2 +-
 .../regionserver/TestReplicationSourceManager.java        |  3 ++-
 6 files changed, 22 insertions(+), 23 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index f14da2f..8abede5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1911,8 +1911,7 @@ public class HRegionServer extends Thread implements
         throw new IOException("Can not create wal directory " + logDir);
       }
       // Instantiate replication if replication enabled. Pass it the log directories.
-      createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir,
-        factory.getWALProvider());
+      createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
     }
     this.walFactory = factory;
   }
@@ -3063,7 +3062,7 @@ public class HRegionServer extends Thread implements
    * Load the replication executorService objects, if any
    */
   private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
-      FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException {
+      FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
     // read in the name of the source replication class from the config file.
     String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
       HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
@@ -3076,19 +3075,19 @@ public class HRegionServer extends Thread implements
     // only one object.
     if (sourceClassname.equals(sinkClassname)) {
       server.replicationSourceHandler = newReplicationInstance(sourceClassname,
-        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
+        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
       server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
     } else {
       server.replicationSourceHandler = newReplicationInstance(sourceClassname,
-        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
+        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
       server.replicationSinkHandler = newReplicationInstance(sinkClassname,
-        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
+        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
     }
   }
 
   private static <T extends ReplicationService> T newReplicationInstance(String classname,
       Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
-      Path oldLogDir, WALProvider walProvider) throws IOException {
+      Path oldLogDir, WALFactory walFactory) throws IOException {
     final Class<? extends T> clazz;
     try {
       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
@@ -3097,7 +3096,7 @@ public class HRegionServer extends Thread implements
       throw new IOException("Could not find class for " + classname);
     }
     T service = ReflectionUtils.newInstance(clazz, conf);
-    service.initialize(server, walFs, logDir, oldLogDir, walProvider);
+    service.initialize(server, walFs, logDir, oldLogDir, walFactory);
     return service;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
index e9bbaea..33b3321 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
-import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -32,14 +32,11 @@ import org.apache.yetus.audience.InterfaceAudience;
  */
 @InterfaceAudience.Private
 public interface ReplicationService {
-
   /**
    * Initializes the replication service object.
-   * @param walProvider can be null if not initialized inside a live region server environment, for
-   *          example, {@code ReplicationSyncUp}.
    */
-  void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALProvider walProvider)
-      throws IOException;
+  void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALFactory walFactory)
+    throws IOException;
 
   /**
    * Start replication services.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 195877b..d8a696c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -89,7 +90,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
 
   @Override
   public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
-      WALProvider walProvider) throws IOException {
+      WALFactory walFactory) throws IOException {
     this.server = server;
     this.conf = this.server.getConfiguration();
     this.isReplicationForBulkLoadDataEnabled =
@@ -128,6 +129,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
     SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager();
     this.globalMetricsSource = CompatibilitySingletonFactory
         .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
+    WALProvider walProvider = walFactory.getWALProvider();
     this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
         replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
         walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
@@ -198,7 +200,6 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
    * @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
    *          directory required for replicating hfiles
    * @param sourceHFileArchiveDirPath Path that point to the source cluster hfile archive directory
-   * @throws IOException
    */
   @Override
   public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
@@ -211,7 +212,6 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
   /**
    * If replication is enabled and this cluster is a master,
    * it starts
-   * @throws IOException
    */
   @Override
   public void startReplicationService() throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index 98490f1..b04c7eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -82,7 +83,8 @@ public class ReplicationSyncUp extends Configured implements Tool {
 
       System.out.println("Start Replication Server start");
       Replication replication = new Replication();
-      replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, null);
+      replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir,
+        new WALFactory(conf, "test", false));
       ReplicationSourceManager manager = replication.getReplicationManager();
       manager.init().get();
       while (manager.activeFailoverTaskCount() > 0) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 6e1692a..455b272 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 8e38114..4abb00f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -194,7 +194,8 @@ public abstract class TestReplicationSourceManager {
     logDir = utility.getDataTestDir(HConstants.HREGION_LOGDIR_NAME);
     remoteLogDir = utility.getDataTestDir(ReplicationUtils.REMOTE_WAL_DIR_NAME);
     replication = new Replication();
-    replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null);
+    replication.initialize(new DummyServer(), fs, logDir, oldLogDir,
+      new WALFactory(conf, "test", false));
     managerOfCluster = getManagerFromCluster();
     if (managerOfCluster != null) {
       // After replication procedure, we need to add peer by hand (other than by receiving