You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2020/06/03 08:34:13 UTC

[hbase] branch branch-2 updated: HBASE-21406 "status 'replication'" should not show SINK if the cluste… (#1761)

This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 11d093b  HBASE-21406 "status 'replication'" should not show SINK if the cluste… (#1761)
11d093b is described below

commit 11d093bc399662c31fb31083f18f95f6460725df
Author: Wellington Ramos Chevreuil <wc...@apache.org>
AuthorDate: Tue Jun 2 09:27:16 2020 +0100

    HBASE-21406 "status 'replication'" should not show SINK if the cluste… (#1761)
    
    Signed-off-by: Jan Hentschel <ja...@ultratendency.com>
    Signed-off-by: Viraj Jasani <vj...@apache.org>
    Signed-off-by: Josh Elser <el...@apache.org>
    
    (Cherry picked from commit e5345b3a7c32c6a80394319c17540b84c8fe66ba)
---
 .../hbase/replication/ReplicationLoadSink.java     | 15 +++++++-
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java |  7 +++-
 .../regionserver/MetricsReplicationSinkSource.java |  1 +
 .../MetricsReplicationSinkSourceImpl.java          |  4 ++
 .../src/main/protobuf/ClusterStatus.proto          |  2 +
 .../replication/regionserver/MetricsSink.java      | 18 +++++++++
 .../replication/regionserver/ReplicationLoad.java  |  2 +
 .../hbase/replication/TestReplicationStatus.java   | 45 +++++++++++++++++++---
 hbase-shell/src/main/ruby/hbase/admin.rb           | 20 ++++++----
 9 files changed, 99 insertions(+), 15 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSink.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSink.java
index d9698ca..f9afddd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSink.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSink.java
@@ -19,12 +19,17 @@ import org.apache.yetus.audience.InterfaceAudience;
 public class ReplicationLoadSink {
   private final long ageOfLastAppliedOp;
   private final long timestampsOfLastAppliedOp;
+  private final long timestampStarted;
+  private final long totalOpsProcessed;
 
   // TODO: add the builder for this class
   @InterfaceAudience.Private
-  public ReplicationLoadSink(long age, long timestamp) {
+  public ReplicationLoadSink(long age, long timestamp, long timestampStarted,
+      long totalOpsProcessed) {
     this.ageOfLastAppliedOp = age;
     this.timestampsOfLastAppliedOp = timestamp;
+    this.timestampStarted = timestampStarted;
+    this.totalOpsProcessed = totalOpsProcessed;
   }
 
   public long getAgeOfLastAppliedOp() {
@@ -43,4 +48,12 @@ public class ReplicationLoadSink {
   public long getTimestampsOfLastAppliedOp() {
     return this.timestampsOfLastAppliedOp;
   }
+
+  public long getTimestampStarted() {
+    return timestampStarted;
+  }
+
+  public long getTotalOpsProcessed() {
+    return totalOpsProcessed;
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 1c3a217..563ec95 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2801,7 +2801,10 @@ public final class ProtobufUtil {
 
   public static ReplicationLoadSink toReplicationLoadSink(
       ClusterStatusProtos.ReplicationLoadSink rls) {
-    return new ReplicationLoadSink(rls.getAgeOfLastAppliedOp(), rls.getTimeStampsOfLastAppliedOp());
+    return new ReplicationLoadSink(rls.getAgeOfLastAppliedOp(),
+        rls.getTimeStampsOfLastAppliedOp(),
+        rls.getTimestampStarted(),
+        rls.getTotalOpsProcessed());
   }
 
   public static ReplicationLoadSource toReplicationLoadSource(
@@ -3394,6 +3397,8 @@ public final class ProtobufUtil {
     return ClusterStatusProtos.ReplicationLoadSink.newBuilder()
         .setAgeOfLastAppliedOp(rls.getAgeOfLastAppliedOp())
         .setTimeStampsOfLastAppliedOp(rls.getTimestampsOfLastAppliedOp())
+        .setTimestampStarted(rls.getTimestampStarted())
+        .setTotalOpsProcessed(rls.getTotalOpsProcessed())
         .build();
   }
 
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
index 1d6251b..2498e34 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
@@ -32,4 +32,5 @@ public interface MetricsReplicationSinkSource {
   void incrAppliedOps(long batchsize);
   long getLastAppliedOpAge();
   void incrAppliedHFiles(long hfileSize);
+  long getSinkAppliedOps();
 }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
index 485764e..18addb0 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
@@ -58,4 +58,8 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
   public void incrAppliedHFiles(long hfiles) {
     hfilesCounter.incr(hfiles);
   }
+
+  @Override public long getSinkAppliedOps() {
+    return opsCounter.value();
+  }
 }
diff --git a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
index 65c2268..d708674 100644
--- a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
@@ -185,6 +185,8 @@ message ClientMetrics {
 message ReplicationLoadSink {
   required uint64 ageOfLastAppliedOp = 1;
   required uint64 timeStampsOfLastAppliedOp = 2;
+  required uint64 timestampStarted = 3;
+  required uint64 totalOpsProcessed = 4;
 }
 
 message ReplicationLoadSource {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
index eafb54c..6817058 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 public class MetricsSink {
 
   private long lastTimestampForAge = System.currentTimeMillis();
+  private long startTimestamp = System.currentTimeMillis();
   private final MetricsReplicationSinkSource mss;
 
   public MetricsSink() {
@@ -110,4 +111,21 @@ public class MetricsSink {
   public long getTimestampOfLastAppliedOp() {
     return this.lastTimestampForAge;
   }
+
+  /**
+   * Gets the time stamp from when the Sink was initialized.
+   * @return startTimestamp
+   */
+  public long getStartTimestamp() {
+    return this.startTimestamp;
+  }
+
+  /**
+   * Gets the total number of OPs delivered to this sink.
+   * @return totalAppliedOps
+   */
+  public long getAppliedOps() {
+    return this.mss.getSinkAppliedOps();
+  }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
index fe4086b..e011e0a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
@@ -61,6 +61,8 @@ public class ReplicationLoad {
         ClusterStatusProtos.ReplicationLoadSink.newBuilder();
     rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp());
     rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
+    rLoadSinkBuild.setTimestampStarted(sinkMetrics.getStartTimestamp());
+    rLoadSinkBuild.setTotalOpsProcessed(sinkMetrics.getAppliedOps());
     this.replicationLoadSink = rLoadSinkBuild.build();
 
     this.replicationLoadSourceEntries = new ArrayList<>();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
index ca6680e..9f704ef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
@@ -50,6 +50,15 @@ public class TestReplicationStatus extends TestReplicationBase {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestReplicationStatus.class);
 
+  private void insertRowsOnSource() throws IOException {
+    final byte[] qualName = Bytes.toBytes("q");
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+      htable1.put(p);
+    }
+  }
+
   /**
    * Test for HBASE-9531.
    * <p/>
@@ -70,12 +79,7 @@ public class TestReplicationStatus extends TestReplicationBase {
     Admin hbaseAdmin = UTIL1.getAdmin();
     // disable peer <= WHY? I DON'T GET THIS DISABLE BUT TEST FAILS W/O IT.
     hbaseAdmin.disableReplicationPeer(PEER_ID2);
-    final byte[] qualName = Bytes.toBytes("q");
-    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      Put p = new Put(Bytes.toBytes("row" + i));
-      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
-      htable1.put(p);
-    }
+    insertRowsOnSource();
     LOG.info("AFTER PUTS");
     // TODO: Change this wait to a barrier. I tried waiting on replication stats to
     // change but sleeping in main thread seems to mess up background replication.
@@ -120,6 +124,35 @@ public class TestReplicationStatus extends TestReplicationBase {
     assertEquals(PEER_ID2, rLoadSourceList.get(0).getPeerID());
   }
 
+  @Test
+  public void testReplicationStatusSink() throws Exception {
+    try (Admin hbaseAdmin = UTIL2.getConnection().getAdmin()) {
+      ServerName server = UTIL2.getHBaseCluster().getRegionServer(0).getServerName();
+      ReplicationLoadSink loadSink = getLatestSinkMetric(hbaseAdmin, server);
+      //First checks if status of timestamp of last applied op is same as RS start, since no edits
+      //were replicated yet
+      assertEquals(loadSink.getTimestampStarted(), loadSink.getTimestampsOfLastAppliedOp());
+      //now insert some rows on source, so that it gets delivered to target
+      insertRowsOnSource();
+      long wait = Waiter.waitFor(UTIL2.getConfiguration(),
+        10000, new Waiter.Predicate<Exception>() {
+          @Override
+          public boolean evaluate() throws Exception {
+            ReplicationLoadSink loadSink = getLatestSinkMetric(hbaseAdmin, server);
+            return loadSink.getTimestampsOfLastAppliedOp()>loadSink.getTimestampStarted();
+          }
+        });
+      //If wait is -1, we know predicate condition was never true
+      assertTrue(wait>=0);
+    }
+  }
+
+  private ReplicationLoadSink getLatestSinkMetric(Admin admin, ServerName server)
+      throws IOException {
+    ClusterMetrics metrics = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
+    ServerMetrics sm = metrics.getLiveServerMetrics().get(server);
+    return sm.getReplicationLoadSink();
+  }
   /**
    * Wait until Master shows metrics counts for ReplicationLoadSourceList that are
    * greater than <code>greaterThan</code> for <code>serverName</code> before
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index b970c36..1507c7a 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -812,12 +812,18 @@ module Hbase
           r_source_string = '       SOURCE:'
           r_load_sink = sl.getReplicationLoadSink
           next if r_load_sink.nil?
+          if r_load_sink.getTimestampsOfLastAppliedOp() == r_load_sink.getTimestampStarted()
+          # If we have applied no operations since we've started replication,
+          # assume that we're not acting as a sink and don't print the normal information
+            r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s
+            r_sink_string << ", Waiting for OPs... "
+          else
+            r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s
+            r_sink_string << ", AgeOfLastAppliedOp=" + r_load_sink.getAgeOfLastAppliedOp().to_s
+            r_sink_string << ", TimeStampsOfLastAppliedOp=" +
+               (java.util.Date.new(r_load_sink.getTimestampsOfLastAppliedOp())).toString()
+          end
 
-          r_sink_string << ' AgeOfLastAppliedOp=' +
-                           r_load_sink.getAgeOfLastAppliedOp.to_s
-          r_sink_string << ', TimeStampsOfLastAppliedOp=' +
-                           java.util.Date.new(r_load_sink
-                             .getTimeStampsOfLastAppliedOp).toString
           r_load_source_map = sl.getReplicationLoadSourceMap
           build_source_string(r_load_source_map, r_source_string)
           puts(format('    %<host>s:', host: server_status.getHostname))
@@ -888,7 +894,7 @@ module Hbase
     end
 
     def build_shipped_stats(source_load, r_source_string)
-      r_source_string << if source_load.getTimeStampOfLastShippedOp.zero?
+      r_source_string << if source_load.getTimestampOfLastShippedOp.zero?
                            "\n           " \
                            'No Ops shipped since last restart'
                          else
@@ -896,7 +902,7 @@ module Hbase
                            source_load.getAgeOfLastShippedOp.to_s +
                            ', TimeStampOfLastShippedOp=' +
                            java.util.Date.new(source_load
-                             .getTimeStampOfLastShippedOp).toString
+                             .getTimestampOfLastShippedOp).toString
                          end
     end