You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/08/24 15:05:35 UTC

hbase git commit: HBASE-15982 Interface ReplicationEndpoint extends Guava's Service

Repository: hbase
Updated Branches:
  refs/heads/master d12eb7a4a -> 6e7baa07f


HBASE-15982 Interface ReplicationEndpoint extends Guava's Service

Breaking change to our ReplicationEndpoint and BaseReplicationEndpoint.

ReplicationEndpoint extended the Guava 0.12 Service interface. An
abstract implementation, BaseReplicationEndpoint, provided default
method implementations and other facilities by extending Guava's
AbstractService class.

Both of these HBase classes were marked LimitedPrivate for
REPLICATION so these classes were semi-public and made it so
Guava 0.12 was part of our API.

Having Guava in our API was a mistake. It anchors us and the
implementation of the Interface to Guava 0.12. This is untenable
given Guava changes and that the Service Interface in particular
has had extensive revamp and improvement done. We can't hold to
the Guava Interface. It changed. We can't stay on Guava 0.12;
implementors and others on our CLASSPATH won't abide being stuck
on an old Guava.

So this patch makes breaking changes. The unhitching of our Interface
from Guava could only be done in a breaking manner. It undoes the
LimitedPrivate on BaseReplicationEndpoint while keeping it for the RE
Interface. It means consumers will have to copy/paste the
AbstractService-based BRE into their own codebase also supplying their
own Guava; HBase no longer 'supplies' this (our Guava usage has
been internalized, relocated).

This patch then adds into RE the basic methods RE needs of the old
Guava Service rather than return a Service to start/stop only to go
back to the RE instance to do actual work. A few method names had to
be changed so that we could make implementations with Guava Service
internally and not have RE method names and types clash. Semantics
remained the same otherwise. For example, startAsync and stopAsync in
Guava are start and stop in RE.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6e7baa07
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6e7baa07
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6e7baa07

Branch: refs/heads/master
Commit: 6e7baa07f0b1f5841379545acaf23d36f50de2c2
Parents: d12eb7a
Author: Michael Stack <st...@apache.org>
Authored: Tue Aug 8 21:55:47 2017 +0800
Committer: Michael Stack <st...@apache.org>
Committed: Thu Aug 24 08:05:27 2017 -0700

----------------------------------------------------------------------
 .../replication/BaseReplicationEndpoint.java    | 16 ++--
 .../replication/HBaseReplicationEndpoint.java   | 10 +++
 .../hbase/replication/ReplicationEndpoint.java  | 88 +++++++++++++++++++-
 .../regionserver/ReplicationSource.java         | 38 ++++-----
 .../VisibilityReplicationEndpoint.java          | 40 ++++-----
 .../TestReplicationAdminWithClusters.java       | 10 +++
 .../replication/TestReplicationEndpoint.java    | 10 +++
 .../replication/TestReplicationSource.java      |  2 +-
 8 files changed, 160 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6e7baa07/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
index ae4e7cc..5b9cef7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
@@ -24,15 +24,16 @@ import java.util.ArrayList;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.AbstractService;
+
 /**
- * A Base implementation for {@link ReplicationEndpoint}s. Users should consider extending this
- * class rather than implementing {@link ReplicationEndpoint} directly for better backwards
- * compatibility.
+ * A Base implementation for {@link ReplicationEndpoint}s. For internal use. Uses our internal
+ * Guava.
  */
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+// This class has been made InterfaceAudience.Private in 2.0.0. It used to be
+// LimitedPrivate. See HBASE-15982.
+@InterfaceAudience.Private
 public abstract class BaseReplicationEndpoint extends AbstractService
   implements ReplicationEndpoint {
 
@@ -109,4 +110,9 @@ public abstract class BaseReplicationEndpoint extends AbstractService
   public boolean canReplicateToSameCluster() {
     return false;
   }
+
+  @Override
+  public boolean isStarting() {
+    return state() == State.STARTING;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e7baa07/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
index 1bc18a9..42667e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -79,6 +79,16 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
   }
 
   @Override
+  public void start() {
+    startAsync();
+  }
+
+  @Override
+  public void stop() {
+    stopAsync();
+  }
+
+  @Override
   protected void doStart() {
     try {
       reloadZkWatcher();

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e7baa07/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index 6bf696b..f23276c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.replication;
 import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -31,8 +33,6 @@ import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service;
-
 /**
  * ReplicationEndpoint is a plugin which implements replication
  * to other HBase clusters, or other systems. ReplicationEndpoint implementation
@@ -47,7 +47,9 @@ import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service;
  * and persisting of the WAL entries in the other cluster.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
-public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListener {
+public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
+  // TODO: This class needs doc. Has a Context and a ReplicationContext. Then has #start, #stop.
+  // How they relate? Do we #start before #init(Context)? We fail fast if you don't?
 
   @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
   class Context {
@@ -176,4 +178,82 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
    * parameters can be obtained.
    */
   boolean replicate(ReplicateContext replicateContext);
-}
+
+
+  // The below methods are inspired by Guava Service. See
+  // https://github.com/google/guava/wiki/ServiceExplained for overview of Guava Service.
+  // Below we implement a subset only with different names on some methods so we can implement
+  // the below internally using Guava (without exposing our implementation to
+  // ReplicationEndpoint implementors).
+
+  /**
+   * Returns {@code true} if this service is RUNNING.
+   */
+  boolean isRunning();
+
+  /**
+   * @return {@code true} if this service is STARTING (but not yet RUNNING).
+   */
+  boolean isStarting();
+
+  /**
+   * Initiates service startup and returns immediately. A stopped service may not be restarted.
+   * Equivalent of startAsync call in Guava Service.
+   * @throws IllegalStateException if the service is not new, if it has been run already.
+   */
+  void start();
+
+  /**
+   * Waits for the {@link ReplicationEndpoint} to be up and running.
+   *
+   * @throws IllegalStateException if the service reaches a state from which it is not possible to
+   *     enter the (internal) running state. e.g. if the state is terminated when this method is
+   *     called then this will throw an IllegalStateException.
+   */
+  void awaitRunning();
+
+  /**
+   * Waits for the {@link ReplicationEndpoint} to be up and running for no more
+   * than the given time.
+   *
+   * @param timeout the maximum time to wait
+   * @param unit the time unit of the timeout argument
+   * @throws TimeoutException if the service has not reached the given state within the deadline
+   * @throws IllegalStateException if the service reaches a state from which it is not possible to
+   *     enter the (internal) running state. e.g. if the state is terminated when this method is
+   *     called then this will throw an IllegalStateException.
+   */
+  void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException;
+
+  /**
+   * If the service is starting or running, this initiates service shutdown and returns immediately.
+   * If the service has already been stopped, this method returns immediately without taking action.
+   * Equivalent of stopAsync call in Guava Service.
+   */
+  void stop();
+
+  /**
+   * Waits for the {@link ReplicationEndpoint} to reach the terminated (internal) state.
+   *
+   * @throws IllegalStateException if the service FAILED.
+   */
+  void awaitTerminated();
+
+  /**
+   * Waits for the {@link ReplicationEndpoint} to reach a terminal state for no
+   * more than the given time.
+   *
+   * @param timeout the maximum time to wait
+   * @param unit the time unit of the timeout argument
+   * @throws TimeoutException if the service has not reached the given state within the deadline
+   * @throws IllegalStateException if the service FAILED.
+   */
+  void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException;
+
+  /**
+   * Returns the {@link Throwable} that caused this service to fail.
+   *
+   * @throws IllegalStateException if this service's state isn't FAILED.
+   */
+  Throwable failureCause();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e7baa07/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 1d3e4fb..f3a37dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -19,8 +19,6 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ListenableFuture;
-import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -130,6 +128,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
 
   private AtomicLong totalBufferUsed;
 
+  public static final String WAIT_ON_ENDPOINT_SECONDS =
+    "hbase.replication.wait.on.endpoint.seconds";
+  public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
+  private int waitOnEndpointSeconds = -1;
+
   /**
    * Instantiation method used by region servers
    *
@@ -152,6 +155,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
           throws IOException {
     this.stopper = stopper;
     this.conf = HBaseConfiguration.create(conf);
+    this.waitOnEndpointSeconds =
+      this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
     decorateConf();
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
@@ -245,17 +250,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     this.sourceRunning = true;
     try {
       // start the endpoint, connect to the cluster
-      Service service = replicationEndpoint.startAsync();
-      final int waitTime = 10;
-      service.awaitRunning(waitTime, TimeUnit.SECONDS);
-      if (!service.isRunning()) {
-        LOG.warn("ReplicationEndpoint was not started after waiting " + waitTime +
-          " + seconds. Exiting");
-        uninitialize();
-        return;
-      }
+      this.replicationEndpoint.start();
+      this.replicationEndpoint.awaitRunning(this.waitOnEndpointSeconds, TimeUnit.SECONDS);
     } catch (Exception ex) {
       LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
+      uninitialize();
       throw new RuntimeException(ex);
     }
 
@@ -383,14 +382,12 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   private void uninitialize() {
     LOG.debug("Source exiting " + this.peerId);
     metrics.clear();
-    if (replicationEndpoint.state() == Service.State.STARTING
-        || replicationEndpoint.state() == Service.State.RUNNING) {
-      replicationEndpoint.stopAsync();
-      final int waitTime = 10;
+    if (this.replicationEndpoint.isRunning() || this.replicationEndpoint.isStarting()) {
+      this.replicationEndpoint.stop();
       try {
-        replicationEndpoint.awaitTerminated(waitTime, TimeUnit.SECONDS);
+        this.replicationEndpoint.awaitTerminated(this.waitOnEndpointSeconds, TimeUnit.SECONDS);
       } catch (TimeoutException e) {
-        LOG.warn("Failed termination after " + waitTime + " seconds.");
+        LOG.warn("Failed termination after " + this.waitOnEndpointSeconds + " seconds.");
       }
     }
   }
@@ -463,18 +460,17 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       worker.entryReader.interrupt();
       worker.interrupt();
     }
-    Service service = null;
     if (this.replicationEndpoint != null) {
-      service = this.replicationEndpoint.stopAsync();
+      this.replicationEndpoint.stop();
     }
     if (join) {
       for (ReplicationSourceShipper worker : workers) {
         Threads.shutdown(worker, this.sleepForRetries);
         LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
       }
-      if (service != null) {
+      if (this.replicationEndpoint != null) {
         try {
-          service.awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
+          this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
         } catch (TimeoutException te) {
           LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :"
               + this.peerClusterZnode,

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e7baa07/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
index 51655a1..1ce2b3d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
@@ -46,8 +46,9 @@ import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service;
 public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
 
   private static final Log LOG = LogFactory.getLog(VisibilityReplicationEndpoint.class);
-  private ReplicationEndpoint delegator;
-  private VisibilityLabelService visibilityLabelsService;
+
+  private final ReplicationEndpoint delegator;
+  private final VisibilityLabelService visibilityLabelsService;
 
   public VisibilityReplicationEndpoint(ReplicationEndpoint endpoint,
       VisibilityLabelService visibilityLabelsService) {
@@ -62,7 +63,7 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
 
   @Override
   public void peerConfigUpdated(ReplicationPeerConfig rpc){
-
+    delegator.peerConfigUpdated(rpc);
   }
 
   @Override
@@ -138,23 +139,16 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
   }
 
   @Override
-  public Service startAsync() {
-    return this.delegator.startAsync();
-  }
-
-  @Override
   public boolean isRunning() {
-    return delegator.isRunning();
+    return this.delegator.isRunning();
   }
 
   @Override
-  public State state() {
-    return delegator.state();
-  }
+  public boolean isStarting() {return this.delegator.isStarting();}
 
   @Override
-  public Service stopAsync() {
-    return this.delegator.stopAsync();
+  public void start() {
+    this.delegator.start();
   }
 
   @Override
@@ -163,8 +157,13 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
   }
 
   @Override
-  public void awaitRunning(long l, TimeUnit timeUnit) throws TimeoutException {
-    this.delegator.awaitRunning(l, timeUnit);
+  public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
+    this.delegator.awaitRunning(timeout, unit);
+  }
+
+  @Override
+  public void stop() {
+    this.delegator.stop();
   }
 
   @Override
@@ -173,17 +172,12 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
   }
 
   @Override
-  public void awaitTerminated(long l, TimeUnit timeUnit) throws TimeoutException {
-    this.delegator.awaitTerminated(l, timeUnit);
+  public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
+    this.delegator.awaitTerminated(timeout, unit);
   }
 
   @Override
   public Throwable failureCause() {
     return this.delegator.failureCause();
   }
-
-  @Override
-  public void addListener(Listener listener, Executor executor) {
-    this.delegator.addListener(listener, executor);
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e7baa07/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
index 3b5522b..2610313 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
@@ -250,6 +250,16 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
     }
 
     @Override
+    public void start() {
+      startAsync();
+    }
+
+    @Override
+    public void stop() {
+      stopAsync();
+    }
+
+    @Override
     protected void doStart() {
       notifyStarted();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e7baa07/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index c63a69b..a0562bf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -396,6 +396,16 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     }
 
     @Override
+    public void start() {
+      startAsync();
+    }
+
+    @Override
+    public void stop() {
+      stopAsync();
+    }
+
+    @Override
     protected void doStart() {
       startedCount.incrementAndGet();
       notifyStarted();

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e7baa07/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index 7ea698c..c3b7eaf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -158,7 +158,7 @@ public class TestReplicationSource {
         // completes
       }
     };
-    replicationEndpoint.startAsync();
+    replicationEndpoint.start();
     ReplicationPeers mockPeers = Mockito.mock(ReplicationPeers.class);
     ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
     Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);