You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by wc...@apache.org on 2020/06/03 15:11:47 UTC
[hbase] branch branch-2.3 updated: HBASE-21406 "status 'replication'" should not show SINK if the cluste… (#1761)
This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
new cf9f752 HBASE-21406 "status 'replication'" should not show SINK if the cluste… (#1761)
cf9f752 is described below
commit cf9f752f9cebbf3f6f5fd81a605ebe09e4c8c26a
Author: Wellington Ramos Chevreuil <wc...@apache.org>
AuthorDate: Tue Jun 2 09:27:16 2020 +0100
HBASE-21406 "status 'replication'" should not show SINK if the cluste… (#1761)
Signed-off-by: Jan Hentschel <ja...@ultratendency.com>
Signed-off-by: Viraj Jasani <vj...@apache.org>
Signed-off-by: Josh Elser <el...@apache.org>
(Cherry picked from commit e5345b3a7c32c6a80394319c17540b84c8fe66ba)
---
.../hbase/replication/ReplicationLoadSink.java | 15 +++++++-
.../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 7 +++-
.../regionserver/MetricsReplicationSinkSource.java | 1 +
.../MetricsReplicationSinkSourceImpl.java | 4 ++
.../src/main/protobuf/ClusterStatus.proto | 2 +
.../replication/regionserver/MetricsSink.java | 18 +++++++++
.../replication/regionserver/ReplicationLoad.java | 2 +
.../hbase/replication/TestReplicationStatus.java | 45 +++++++++++++++++++---
hbase-shell/src/main/ruby/hbase/admin.rb | 20 ++++++----
9 files changed, 99 insertions(+), 15 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSink.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSink.java
index d9698ca..f9afddd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSink.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSink.java
@@ -19,12 +19,17 @@ import org.apache.yetus.audience.InterfaceAudience;
public class ReplicationLoadSink {
private final long ageOfLastAppliedOp;
private final long timestampsOfLastAppliedOp;
+ private final long timestampStarted;
+ private final long totalOpsProcessed;
// TODO: add the builder for this class
@InterfaceAudience.Private
- public ReplicationLoadSink(long age, long timestamp) {
+ public ReplicationLoadSink(long age, long timestamp, long timestampStarted,
+ long totalOpsProcessed) {
this.ageOfLastAppliedOp = age;
this.timestampsOfLastAppliedOp = timestamp;
+ this.timestampStarted = timestampStarted;
+ this.totalOpsProcessed = totalOpsProcessed;
}
public long getAgeOfLastAppliedOp() {
@@ -43,4 +48,12 @@ public class ReplicationLoadSink {
public long getTimestampsOfLastAppliedOp() {
return this.timestampsOfLastAppliedOp;
}
+
+ public long getTimestampStarted() {
+ return timestampStarted;
+ }
+
+ public long getTotalOpsProcessed() {
+ return totalOpsProcessed;
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 1c3a217..563ec95 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2801,7 +2801,10 @@ public final class ProtobufUtil {
public static ReplicationLoadSink toReplicationLoadSink(
ClusterStatusProtos.ReplicationLoadSink rls) {
- return new ReplicationLoadSink(rls.getAgeOfLastAppliedOp(), rls.getTimeStampsOfLastAppliedOp());
+ return new ReplicationLoadSink(rls.getAgeOfLastAppliedOp(),
+ rls.getTimeStampsOfLastAppliedOp(),
+ rls.getTimestampStarted(),
+ rls.getTotalOpsProcessed());
}
public static ReplicationLoadSource toReplicationLoadSource(
@@ -3394,6 +3397,8 @@ public final class ProtobufUtil {
return ClusterStatusProtos.ReplicationLoadSink.newBuilder()
.setAgeOfLastAppliedOp(rls.getAgeOfLastAppliedOp())
.setTimeStampsOfLastAppliedOp(rls.getTimestampsOfLastAppliedOp())
+ .setTimestampStarted(rls.getTimestampStarted())
+ .setTotalOpsProcessed(rls.getTotalOpsProcessed())
.build();
}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
index 1d6251b..2498e34 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
@@ -32,4 +32,5 @@ public interface MetricsReplicationSinkSource {
void incrAppliedOps(long batchsize);
long getLastAppliedOpAge();
void incrAppliedHFiles(long hfileSize);
+ long getSinkAppliedOps();
}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
index 485764e..18addb0 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
@@ -58,4 +58,8 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
public void incrAppliedHFiles(long hfiles) {
hfilesCounter.incr(hfiles);
}
+
+ @Override public long getSinkAppliedOps() {
+ return opsCounter.value();
+ }
}
diff --git a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
index 65c2268..d708674 100644
--- a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
@@ -185,6 +185,8 @@ message ClientMetrics {
message ReplicationLoadSink {
required uint64 ageOfLastAppliedOp = 1;
required uint64 timeStampsOfLastAppliedOp = 2;
+ optional uint64 timestampStarted = 3; // optional, not required: senders built before this field existed never set it
+ optional uint64 totalOpsProcessed = 4; // optional, not required: senders built before this field existed never set it
}
message ReplicationLoadSource {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
index eafb54c..6817058 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
public class MetricsSink {
private long lastTimestampForAge = System.currentTimeMillis();
+ private long startTimestamp = lastTimestampForAge; // same instant as lastTimestampForAge, so equality reliably means "no op applied yet"
private final MetricsReplicationSinkSource mss;
public MetricsSink() {
@@ -110,4 +111,21 @@ public class MetricsSink {
public long getTimestampOfLastAppliedOp() {
return this.lastTimestampForAge;
}
+
+ /**
+ * Gets the wall-clock time, in milliseconds since the epoch, captured when this sink was created.
+ * @return startTimestamp, in milliseconds
+ */
+ public long getStartTimestamp() {
+ return this.startTimestamp;
+ }
+
+ /**
+ * Gets the total number of OPs applied by this sink, as counted by the sink metrics source.
+ * @return totalAppliedOps, i.e. the value of the sink metrics source's applied-ops counter
+ */
+ public long getAppliedOps() {
+ return this.mss.getSinkAppliedOps();
+ }
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
index fe4086b..e011e0a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
@@ -61,6 +61,8 @@ public class ReplicationLoad {
ClusterStatusProtos.ReplicationLoadSink.newBuilder();
rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp());
rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
+ rLoadSinkBuild.setTimestampStarted(sinkMetrics.getStartTimestamp());
+ rLoadSinkBuild.setTotalOpsProcessed(sinkMetrics.getAppliedOps());
this.replicationLoadSink = rLoadSinkBuild.build();
this.replicationLoadSourceEntries = new ArrayList<>();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
index ca6680e..9f704ef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
@@ -50,6 +50,15 @@ public class TestReplicationStatus extends TestReplicationBase {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationStatus.class);
+ private void insertRowsOnSource() throws IOException {
+ final byte[] qualName = Bytes.toBytes("q");
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+ htable1.put(p);
+ }
+ }
+
/**
* Test for HBASE-9531.
* <p/>
@@ -70,12 +79,7 @@ public class TestReplicationStatus extends TestReplicationBase {
Admin hbaseAdmin = UTIL1.getAdmin();
// disable peer <= WHY? I DON'T GET THIS DISABLE BUT TEST FAILS W/O IT.
hbaseAdmin.disableReplicationPeer(PEER_ID2);
- final byte[] qualName = Bytes.toBytes("q");
- for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
- Put p = new Put(Bytes.toBytes("row" + i));
- p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
- htable1.put(p);
- }
+ insertRowsOnSource();
LOG.info("AFTER PUTS");
// TODO: Change this wait to a barrier. I tried waiting on replication stats to
// change but sleeping in main thread seems to mess up background replication.
@@ -120,6 +124,35 @@ public class TestReplicationStatus extends TestReplicationBase {
assertEquals(PEER_ID2, rLoadSourceList.get(0).getPeerID());
}
+ @Test
+ public void testReplicationStatusSink() throws Exception {
+ try (Admin hbaseAdmin = UTIL2.getConnection().getAdmin()) {
+ ServerName server = UTIL2.getHBaseCluster().getRegionServer(0).getServerName();
+ ReplicationLoadSink loadSink = getLatestSinkMetric(hbaseAdmin, server);
+ //First checks if status of timestamp of last applied op is same as RS start, since no edits
+ //were replicated yet
+ assertEquals(loadSink.getTimestampStarted(), loadSink.getTimestampsOfLastAppliedOp());
+ //now insert some rows on source, so that it gets delivered to target
+ insertRowsOnSource();
+ long wait = Waiter.waitFor(UTIL2.getConfiguration(),
+ 10000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ ReplicationLoadSink loadSink = getLatestSinkMetric(hbaseAdmin, server);
+ return loadSink.getTimestampsOfLastAppliedOp()>loadSink.getTimestampStarted();
+ }
+ });
+ //If wait is -1, we know predicate condition was never true
+ assertTrue(wait>=0);
+ }
+ }
+
+ private ReplicationLoadSink getLatestSinkMetric(Admin admin, ServerName server)
+ throws IOException {
+ ClusterMetrics metrics = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
+ ServerMetrics sm = metrics.getLiveServerMetrics().get(server);
+ return sm.getReplicationLoadSink();
+ }
/**
* Wait until Master shows metrics counts for ReplicationLoadSourceList that are
* greater than <code>greaterThan</code> for <code>serverName</code> before
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index b970c36..1507c7a 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -812,12 +812,18 @@ module Hbase
r_source_string = ' SOURCE:'
r_load_sink = sl.getReplicationLoadSink
next if r_load_sink.nil?
+ if r_load_sink.getTimestampsOfLastAppliedOp() == r_load_sink.getTimestampStarted()
+ # If we have applied no operations since we've started replication,
+ # assume that we're not acting as a sink and don't print the normal information
+ r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s
+ r_sink_string << ", Waiting for OPs... "
+ else
+ r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s
+ r_sink_string << ", AgeOfLastAppliedOp=" + r_load_sink.getAgeOfLastAppliedOp().to_s
+ r_sink_string << ", TimeStampsOfLastAppliedOp=" +
+ (java.util.Date.new(r_load_sink.getTimestampsOfLastAppliedOp())).toString()
+ end
- r_sink_string << ' AgeOfLastAppliedOp=' +
- r_load_sink.getAgeOfLastAppliedOp.to_s
- r_sink_string << ', TimeStampsOfLastAppliedOp=' +
- java.util.Date.new(r_load_sink
- .getTimeStampsOfLastAppliedOp).toString
r_load_source_map = sl.getReplicationLoadSourceMap
build_source_string(r_load_source_map, r_source_string)
puts(format(' %<host>s:', host: server_status.getHostname))
@@ -888,7 +894,7 @@ module Hbase
end
def build_shipped_stats(source_load, r_source_string)
- r_source_string << if source_load.getTimeStampOfLastShippedOp.zero?
+ r_source_string << if source_load.getTimestampOfLastShippedOp.zero?
"\n " \
'No Ops shipped since last restart'
else
@@ -896,7 +902,7 @@ module Hbase
source_load.getAgeOfLastShippedOp.to_s +
', TimeStampOfLastShippedOp=' +
java.util.Date.new(source_load
- .getTimeStampOfLastShippedOp).toString
+ .getTimestampOfLastShippedOp).toString
end
end