You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2018/10/02 01:51:22 UTC
[1/5] hbase git commit: HBASE-18549 Add metrics for failed
replication queue recovery
Repository: hbase
Updated Branches:
refs/heads/branch-1 113554ded -> 6e0ee4efa
refs/heads/branch-1.4 964a1d614 -> b22095eea
refs/heads/branch-2 79ec1a1fd -> 8a5537b5f
refs/heads/branch-2.1 f9d7ac2d5 -> 3df8b6f7b
refs/heads/master 79fe878a3 -> 42aa3dd46
HBASE-18549 Add metrics for failed replication queue recovery
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/42aa3dd4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/42aa3dd4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/42aa3dd4
Branch: refs/heads/master
Commit: 42aa3dd463c0d30a9b940d296b87316b5c67e1f5
Parents: 79fe878
Author: Xu Cang <xc...@salesforce.com>
Authored: Wed Aug 29 15:49:51 2018 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon Oct 1 18:38:55 2018 -0700
----------------------------------------------------------------------
.../regionserver/MetricsReplicationSourceSource.java | 2 ++
.../MetricsReplicationGlobalSourceSource.java | 8 +++++++-
.../MetricsReplicationSourceSourceImpl.java | 3 +++
.../hbase/replication/ReplicationQueueStorage.java | 7 +++++++
.../hbase/replication/ZKReplicationQueueStorage.java | 3 ++-
.../hbase/replication/regionserver/MetricsSource.java | 4 ++++
.../regionserver/ReplicationSourceManager.java | 10 +++++++++-
.../hbase/replication/TestReplicationEndpoint.java | 14 +++++++++++++-
8 files changed, 47 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/42aa3dd4/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 1045113..61e9431 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -53,6 +53,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_REPEATED_LOG_FILE_BYTES = "source.repeatedLogFileBytes";
public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
+ public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
void setLastShippedAge(long age);
void incrSizeOfLogQueue(int size);
@@ -76,4 +77,5 @@ public interface MetricsReplicationSourceSource extends BaseSource {
void incrRepeatedFileBytes(final long bytes);
void incrCompletedWAL();
void incrCompletedRecoveryQueue();
+ void incrFailedRecoveryQueue();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/42aa3dd4/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 9a86cf2..4e8c810 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -52,6 +52,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
private final MutableFastCounter repeatedFileBytes;
private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue;
+ private final MutableFastCounter failedRecoveryQueue;
public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
this.rms = rms;
@@ -89,6 +90,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L);
completedRecoveryQueue = rms.getMetricsRegistry()
.getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
+ failedRecoveryQueue = rms.getMetricsRegistry()
+ .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
}
@Override public void setLastShippedAge(long age) {
@@ -199,7 +202,10 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
public void incrCompletedRecoveryQueue() {
completedRecoveryQueue.incr(1L);
}
-
+ @Override
+ public void incrFailedRecoveryQueue() {
+ failedRecoveryQueue.incr(1L);
+ }
@Override
public void init() {
rms.init();
http://git-wip-us.apache.org/repos/asf/hbase/blob/42aa3dd4/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 719c916..0ad5052 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -258,6 +258,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
}
@Override
+ public void incrFailedRecoveryQueue() {/*no op*/}
+
+ @Override
public void init() {
rms.init();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/42aa3dd4/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index 84653ad..59278e9 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -202,4 +202,11 @@ public interface ReplicationQueueStorage {
* created hfile references during the call may not be included.
*/
Set<String> getAllHFileRefs() throws ReplicationException;
+
+ /**
+ * Get full znode name for given region server
+ * @param serverName the name of the region server
+ * @return full znode name
+ */
+ String getRsNode(ServerName serverName);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/42aa3dd4/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index cca8bfc..68f2adc 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -118,7 +118,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
.get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT));
}
- private String getRsNode(ServerName serverName) {
+ @Override
+ public String getRsNode(ServerName serverName) {
return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/42aa3dd4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 906f0c6..830ebe1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -326,6 +326,10 @@ public class MetricsSource implements BaseSource {
globalSourceSource.incrCompletedRecoveryQueue();
}
+ public void incrFailedRecoveryQueue() {
+ globalSourceSource.incrFailedRecoveryQueue();
+ }
+
@Override
public void init() {
singleSourceSource.init();
http://git-wip-us.apache.org/repos/asf/hbase/blob/42aa3dd4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 428ec98..5756cbc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -45,6 +45,7 @@ import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
@@ -820,6 +821,8 @@ public class ReplicationSourceManager implements ReplicationListener {
try {
this.executor.execute(transfer);
} catch (RejectedExecutionException ex) {
+ CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
+ .getGlobalSource().incrFailedRecoveryQueue();
LOG.info("Cancelling the transfer of " + deadRS + " because of " + ex.getMessage());
}
}
@@ -891,7 +894,12 @@ public class ReplicationSourceManager implements ReplicationListener {
queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
}
} catch (ReplicationException e) {
- server.abort("Failed to claim queue from dead regionserver", e);
+ LOG.error(String.format("ReplicationException: cannot claim dead region server (%s)'s " +
+ "replication queue. Znode : (%s)." +
+ " Possible solution: check if znode size exceeds jute.maxBuffer value." +
+ " If so, increase it for both client and server side.", deadRS,
+ queueStorage.getRsNode(deadRS)), e);
+ server.abort("Failed to claim queue from dead regionserver.", e);
return;
}
// Copying over the failed queue is completed.
http://git-wip-us.apache.org/repos/asf/hbase/blob/42aa3dd4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 5d833cc..03fbb59 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hbase.replication;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -316,11 +318,17 @@ public class TestReplicationEndpoint extends TestReplicationBase {
MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
+
MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
+ MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
+ doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
+
Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
- MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource,
+ MetricsSource source = new MetricsSource(id, singleSourceSource, spyglobalSourceSource,
singleSourceSourceByTable);
+
+
String gaugeName = "gauge";
String singleGaugeName = "source.id." + gaugeName;
String globalGaugeName = "source." + gaugeName;
@@ -340,6 +348,8 @@ public class TestReplicationEndpoint extends TestReplicationBase {
source.removeMetric(gaugeName);
source.setGauge(gaugeName, delta);
source.updateHistogram(counterName, count);
+ source.incrFailedRecoveryQueue();
+
verify(singleRms).decGauge(singleGaugeName, delta);
verify(globalRms).decGauge(globalGaugeName, delta);
@@ -357,6 +367,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
verify(globalRms).setGauge(globalGaugeName, delta);
verify(singleRms).updateHistogram(singleCounterName, count);
verify(globalRms).updateHistogram(globalCounterName, count);
+ verify(spyglobalSourceSource).incrFailedRecoveryQueue();
//check singleSourceSourceByTable metrics.
// singleSourceSourceByTable map entry will be created only
@@ -373,6 +384,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
// cannot put more concreate value here to verify because the age is arbitrary.
// as long as it's greater than 0, we see it as correct answer.
Assert.assertTrue(msr.getLastShippedAge() > 0);
+
}
private void doPut(byte[] row) throws IOException {
[2/5] hbase git commit: HBASE-18549 Add metrics for failed
replication queue recovery
Posted by ap...@apache.org.
HBASE-18549 Add metrics for failed replication queue recovery
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8a5537b5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8a5537b5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8a5537b5
Branch: refs/heads/branch-2
Commit: 8a5537b5f5a8eed1fd94830672b7df299a5e4d4f
Parents: 79ec1a1
Author: Xu Cang <xc...@salesforce.com>
Authored: Wed Aug 29 15:49:51 2018 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon Oct 1 18:39:03 2018 -0700
----------------------------------------------------------------------
.../regionserver/MetricsReplicationSourceSource.java | 2 ++
.../MetricsReplicationGlobalSourceSource.java | 8 +++++++-
.../MetricsReplicationSourceSourceImpl.java | 3 +++
.../hbase/replication/ReplicationQueueStorage.java | 7 +++++++
.../hbase/replication/ZKReplicationQueueStorage.java | 3 ++-
.../hbase/replication/regionserver/MetricsSource.java | 4 ++++
.../regionserver/ReplicationSourceManager.java | 10 +++++++++-
.../hbase/replication/TestReplicationEndpoint.java | 14 +++++++++++++-
8 files changed, 47 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8a5537b5/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index f56139b..1a17f37 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -51,6 +51,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_REPEATED_LOG_FILE_BYTES = "source.repeatedLogFileBytes";
public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
+ public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
void setLastShippedAge(long age);
void incrSizeOfLogQueue(int size);
@@ -74,4 +75,5 @@ public interface MetricsReplicationSourceSource extends BaseSource {
void incrRepeatedFileBytes(final long bytes);
void incrCompletedWAL();
void incrCompletedRecoveryQueue();
+ void incrFailedRecoveryQueue();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8a5537b5/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 9a86cf2..4e8c810 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -52,6 +52,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
private final MutableFastCounter repeatedFileBytes;
private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue;
+ private final MutableFastCounter failedRecoveryQueue;
public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
this.rms = rms;
@@ -89,6 +90,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L);
completedRecoveryQueue = rms.getMetricsRegistry()
.getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
+ failedRecoveryQueue = rms.getMetricsRegistry()
+ .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
}
@Override public void setLastShippedAge(long age) {
@@ -199,7 +202,10 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
public void incrCompletedRecoveryQueue() {
completedRecoveryQueue.incr(1L);
}
-
+ @Override
+ public void incrFailedRecoveryQueue() {
+ failedRecoveryQueue.incr(1L);
+ }
@Override
public void init() {
rms.init();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8a5537b5/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 719c916..0ad5052 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -258,6 +258,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
}
@Override
+ public void incrFailedRecoveryQueue() {/*no op*/}
+
+ @Override
public void init() {
rms.init();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8a5537b5/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index 84653ad..59278e9 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -202,4 +202,11 @@ public interface ReplicationQueueStorage {
* created hfile references during the call may not be included.
*/
Set<String> getAllHFileRefs() throws ReplicationException;
+
+ /**
+ * Get full znode name for given region server
+ * @param serverName the name of the region server
+ * @return full znode name
+ */
+ String getRsNode(ServerName serverName);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8a5537b5/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index cca8bfc..68f2adc 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -118,7 +118,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
.get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT));
}
- private String getRsNode(ServerName serverName) {
+ @Override
+ public String getRsNode(ServerName serverName) {
return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8a5537b5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 906f0c6..830ebe1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -326,6 +326,10 @@ public class MetricsSource implements BaseSource {
globalSourceSource.incrCompletedRecoveryQueue();
}
+ public void incrFailedRecoveryQueue() {
+ globalSourceSource.incrFailedRecoveryQueue();
+ }
+
@Override
public void init() {
singleSourceSource.init();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8a5537b5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index a370867..5d4f034 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -43,6 +43,7 @@ import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
@@ -635,6 +636,8 @@ public class ReplicationSourceManager implements ReplicationListener {
try {
this.executor.execute(transfer);
} catch (RejectedExecutionException ex) {
+ CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
+ .getGlobalSource().incrFailedRecoveryQueue();
LOG.info("Cancelling the transfer of " + deadRS + " because of " + ex.getMessage());
}
}
@@ -706,7 +709,12 @@ public class ReplicationSourceManager implements ReplicationListener {
queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
}
} catch (ReplicationException e) {
- server.abort("Failed to claim queue from dead regionserver", e);
+ LOG.error(String.format("ReplicationException: cannot claim dead region server (%s)'s " +
+ "replication queue. Znode : (%s)." +
+ " Possible solution: check if znode size exceeds jute.maxBuffer value." +
+ " If so, increase it for both client and server side.", deadRS,
+ queueStorage.getRsNode(deadRS)), e);
+ server.abort("Failed to claim queue from dead regionserver.", e);
return;
}
// Copying over the failed queue is completed.
http://git-wip-us.apache.org/repos/asf/hbase/blob/8a5537b5/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 5d833cc..03fbb59 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hbase.replication;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -316,11 +318,17 @@ public class TestReplicationEndpoint extends TestReplicationBase {
MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
+
MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
+ MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
+ doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
+
Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
- MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource,
+ MetricsSource source = new MetricsSource(id, singleSourceSource, spyglobalSourceSource,
singleSourceSourceByTable);
+
+
String gaugeName = "gauge";
String singleGaugeName = "source.id." + gaugeName;
String globalGaugeName = "source." + gaugeName;
@@ -340,6 +348,8 @@ public class TestReplicationEndpoint extends TestReplicationBase {
source.removeMetric(gaugeName);
source.setGauge(gaugeName, delta);
source.updateHistogram(counterName, count);
+ source.incrFailedRecoveryQueue();
+
verify(singleRms).decGauge(singleGaugeName, delta);
verify(globalRms).decGauge(globalGaugeName, delta);
@@ -357,6 +367,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
verify(globalRms).setGauge(globalGaugeName, delta);
verify(singleRms).updateHistogram(singleCounterName, count);
verify(globalRms).updateHistogram(globalCounterName, count);
+ verify(spyglobalSourceSource).incrFailedRecoveryQueue();
//check singleSourceSourceByTable metrics.
// singleSourceSourceByTable map entry will be created only
@@ -373,6 +384,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
// cannot put more concreate value here to verify because the age is arbitrary.
// as long as it's greater than 0, we see it as correct answer.
Assert.assertTrue(msr.getLastShippedAge() > 0);
+
}
private void doPut(byte[] row) throws IOException {
[4/5] hbase git commit: HBASE-18549 Add metrics for failed
replication queue recovery
Posted by ap...@apache.org.
HBASE-18549 Add metrics for failed replication queue recovery
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6e0ee4ef
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6e0ee4ef
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6e0ee4ef
Branch: refs/heads/branch-1
Commit: 6e0ee4efa7b01dc4b3e4a60698a4863332bbab57
Parents: 113554d
Author: Xu Cang <xc...@salesforce.com>
Authored: Wed Aug 29 16:39:57 2018 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon Oct 1 18:39:24 2018 -0700
----------------------------------------------------------------------
.../hadoop/hbase/replication/ReplicationQueuesZKImpl.java | 5 ++++-
.../regionserver/MetricsReplicationSourceSource.java | 3 +++
.../regionserver/MetricsReplicationGlobalSourceSource.java | 8 +++++++-
.../regionserver/MetricsReplicationSourceSourceImpl.java | 3 +++
.../hadoop/hbase/replication/regionserver/MetricsSource.java | 5 +++++
.../replication/regionserver/ReplicationSourceManager.java | 3 +++
.../hadoop/hbase/replication/TestReplicationEndpoint.java | 8 +++++++-
7 files changed, 32 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/6e0ee4ef/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index dda9adf..f88ae53 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -499,7 +499,10 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
}
return new Pair<>(newCluster, logQueue);
} catch (KeeperException e) {
- LOG.warn("Got exception in copyQueueFromLockedRS: ", e);
+ LOG.warn("Got exception in copyQueueFromLockedRS: "+
+ " Possible problem: check if znode size exceeds jute.maxBuffer value. "
+ + "If so, increase it for both client and server side." ,e);
+
} catch (InterruptedException e) {
LOG.warn(e);
Thread.currentThread().interrupt();
http://git-wip-us.apache.org/repos/asf/hbase/blob/6e0ee4ef/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 9075a68..25d72af 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -49,6 +49,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_REPEATED_LOG_FILE_BYTES = "source.repeatedLogFileBytes";
public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
+ public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
void setLastShippedAge(long age);
void incrSizeOfLogQueue(int size);
@@ -72,4 +73,6 @@ public interface MetricsReplicationSourceSource extends BaseSource {
void incrRepeatedFileBytes(final long bytes);
void incrCompletedWAL();
void incrCompletedRecoveryQueue();
+ void incrFailedRecoveryQueue();
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6e0ee4ef/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 0e5c07f..64585fa 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -45,6 +45,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
private final MutableFastCounter repeatedFileBytes;
private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue;
+ private final MutableFastCounter failedRecoveryQueue;
public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
this.rms = rms;
@@ -79,6 +80,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
repeatedFileBytes = rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L);
completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L);
completedRecoveryQueue = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
+ failedRecoveryQueue = rms.getMetricsRegistry()
+ .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
}
@Override public void setLastShippedAge(long age) {
@@ -189,7 +192,10 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
public void incrCompletedRecoveryQueue() {
completedRecoveryQueue.incr(1L);
}
-
+ @Override
+ public void incrFailedRecoveryQueue() {
+ failedRecoveryQueue.incr(1L);
+ }
@Override
public void init() {
rms.init();
http://git-wip-us.apache.org/repos/asf/hbase/blob/6e0ee4ef/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 9838e42..0078a97 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -254,6 +254,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
}
@Override
+ public void incrFailedRecoveryQueue() {/*no op*/}
+
+ @Override
public void init() {
rms.init();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6e0ee4ef/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index d122556..53e1074 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -296,6 +296,11 @@ public class MetricsSource implements BaseSource {
globalSourceSource.incrCompletedRecoveryQueue();
}
+ public void incrFailedRecoveryQueue() {
+ globalSourceSource.incrFailedRecoveryQueue();
+ }
+
+
@Override
public void init() {
singleSourceSource.init();
http://git-wip-us.apache.org/repos/asf/hbase/blob/6e0ee4ef/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 63bba8d..dbe9e63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -48,6 +48,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
@@ -544,6 +545,8 @@ public class ReplicationSourceManager implements ReplicationListener {
try {
this.executor.execute(transfer);
} catch (RejectedExecutionException ex) {
+ CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
+ .getGlobalSource().incrFailedRecoveryQueue();
LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6e0ee4ef/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 3b984ab..9914a20 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -54,7 +54,9 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -307,8 +309,10 @@ public class TestReplicationEndpoint extends TestReplicationBase {
MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
+ MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
+ doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
- MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource,
+ MetricsSource source = new MetricsSource(id, singleSourceSource, spyglobalSourceSource,
singleSourceSourceByTable);
String gaugeName = "gauge";
String singleGaugeName = "source.id." + gaugeName;
@@ -327,6 +331,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
source.removeMetric(gaugeName);
source.setGauge(gaugeName, delta);
source.updateHistogram(counterName, count);
+ source.incrFailedRecoveryQueue();
verify(singleRms).decGauge(singleGaugeName, delta);
verify(globalRms).decGauge(gaugeName, delta);
@@ -344,6 +349,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
verify(globalRms).setGauge(gaugeName, delta);
verify(singleRms).updateHistogram(singleCounterName, count);
verify(globalRms).updateHistogram(counterName, count);
+ verify(spyglobalSourceSource).incrFailedRecoveryQueue();
// check singleSourceSourceByTable metrics.
[3/5] hbase git commit: HBASE-18549 Add metrics for failed
replication queue recovery
Posted by ap...@apache.org.
HBASE-18549 Add metrics for failed replication queue recovery
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3df8b6f7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3df8b6f7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3df8b6f7
Branch: refs/heads/branch-2.1
Commit: 3df8b6f7bb3da34e2d8806916c82862b2441f59e
Parents: f9d7ac2
Author: Xu Cang <xc...@salesforce.com>
Authored: Wed Aug 29 15:49:51 2018 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon Oct 1 18:39:07 2018 -0700
----------------------------------------------------------------------
.../regionserver/MetricsReplicationSourceSource.java | 2 ++
.../MetricsReplicationGlobalSourceSource.java | 8 +++++++-
.../MetricsReplicationSourceSourceImpl.java | 3 +++
.../hbase/replication/ReplicationQueueStorage.java | 7 +++++++
.../hbase/replication/ZKReplicationQueueStorage.java | 3 ++-
.../hbase/replication/regionserver/MetricsSource.java | 4 ++++
.../regionserver/ReplicationSourceManager.java | 10 +++++++++-
.../hbase/replication/TestReplicationEndpoint.java | 14 +++++++++++++-
8 files changed, 47 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/3df8b6f7/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index f56139b..1a17f37 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -51,6 +51,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_REPEATED_LOG_FILE_BYTES = "source.repeatedLogFileBytes";
public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
+ public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
void setLastShippedAge(long age);
void incrSizeOfLogQueue(int size);
@@ -74,4 +75,5 @@ public interface MetricsReplicationSourceSource extends BaseSource {
void incrRepeatedFileBytes(final long bytes);
void incrCompletedWAL();
void incrCompletedRecoveryQueue();
+ void incrFailedRecoveryQueue();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3df8b6f7/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 9a86cf2..4e8c810 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -52,6 +52,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
private final MutableFastCounter repeatedFileBytes;
private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue;
+ private final MutableFastCounter failedRecoveryQueue;
public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
this.rms = rms;
@@ -89,6 +90,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L);
completedRecoveryQueue = rms.getMetricsRegistry()
.getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
+ failedRecoveryQueue = rms.getMetricsRegistry()
+ .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
}
@Override public void setLastShippedAge(long age) {
@@ -199,7 +202,10 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
public void incrCompletedRecoveryQueue() {
completedRecoveryQueue.incr(1L);
}
-
+ @Override
+ public void incrFailedRecoveryQueue() {
+ failedRecoveryQueue.incr(1L);
+ }
@Override
public void init() {
rms.init();
http://git-wip-us.apache.org/repos/asf/hbase/blob/3df8b6f7/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 719c916..0ad5052 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -258,6 +258,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
}
@Override
+ public void incrFailedRecoveryQueue() {/*no op*/}
+
+ @Override
public void init() {
rms.init();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3df8b6f7/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index 84653ad..59278e9 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -202,4 +202,11 @@ public interface ReplicationQueueStorage {
* created hfile references during the call may not be included.
*/
Set<String> getAllHFileRefs() throws ReplicationException;
+
+ /**
+ * Get full znode name for given region server
+ * @param serverName the name of the region server
+ * @return full znode name
+ */
+ String getRsNode(ServerName serverName);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3df8b6f7/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index cca8bfc..68f2adc 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -118,7 +118,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
.get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT));
}
- private String getRsNode(ServerName serverName) {
+ @Override
+ public String getRsNode(ServerName serverName) {
return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3df8b6f7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 906f0c6..830ebe1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -326,6 +326,10 @@ public class MetricsSource implements BaseSource {
globalSourceSource.incrCompletedRecoveryQueue();
}
+ public void incrFailedRecoveryQueue() {
+ globalSourceSource.incrFailedRecoveryQueue();
+ }
+
@Override
public void init() {
singleSourceSource.init();
http://git-wip-us.apache.org/repos/asf/hbase/blob/3df8b6f7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index a370867..5d4f034 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -43,6 +43,7 @@ import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
@@ -635,6 +636,8 @@ public class ReplicationSourceManager implements ReplicationListener {
try {
this.executor.execute(transfer);
} catch (RejectedExecutionException ex) {
+ CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
+ .getGlobalSource().incrFailedRecoveryQueue();
LOG.info("Cancelling the transfer of " + deadRS + " because of " + ex.getMessage());
}
}
@@ -706,7 +709,12 @@ public class ReplicationSourceManager implements ReplicationListener {
queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
}
} catch (ReplicationException e) {
- server.abort("Failed to claim queue from dead regionserver", e);
+ LOG.error(String.format("ReplicationException: cannot claim dead region (%s)'s " +
+ "replication queue. Znode : (%s)" +
+ " Possible solution: check if znode size exceeds jute.maxBuffer value. " +
+ " If so, increase it for both client and server side.", deadRS,
+ queueStorage.getRsNode(deadRS)), e); // both %s bound to String.format (else MissingFormatArgumentException skips abort); e logged with stack trace
+ server.abort("Failed to claim queue from dead regionserver.", e);
return;
}
// Copying over the failed queue is completed.
http://git-wip-us.apache.org/repos/asf/hbase/blob/3df8b6f7/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 5d833cc..03fbb59 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hbase.replication;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -316,11 +318,17 @@ public class TestReplicationEndpoint extends TestReplicationBase {
MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
+
MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
+ MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
+ doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
+
Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
- MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource,
+ MetricsSource source = new MetricsSource(id, singleSourceSource, spyglobalSourceSource,
singleSourceSourceByTable);
+
+
String gaugeName = "gauge";
String singleGaugeName = "source.id." + gaugeName;
String globalGaugeName = "source." + gaugeName;
@@ -340,6 +348,8 @@ public class TestReplicationEndpoint extends TestReplicationBase {
source.removeMetric(gaugeName);
source.setGauge(gaugeName, delta);
source.updateHistogram(counterName, count);
+ source.incrFailedRecoveryQueue();
+
verify(singleRms).decGauge(singleGaugeName, delta);
verify(globalRms).decGauge(globalGaugeName, delta);
@@ -357,6 +367,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
verify(globalRms).setGauge(globalGaugeName, delta);
verify(singleRms).updateHistogram(singleCounterName, count);
verify(globalRms).updateHistogram(globalCounterName, count);
+ verify(spyglobalSourceSource).incrFailedRecoveryQueue();
//check singleSourceSourceByTable metrics.
// singleSourceSourceByTable map entry will be created only
@@ -373,6 +384,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
// cannot put more concreate value here to verify because the age is arbitrary.
// as long as it's greater than 0, we see it as correct answer.
Assert.assertTrue(msr.getLastShippedAge() > 0);
+
}
private void doPut(byte[] row) throws IOException {
[5/5] hbase git commit: HBASE-18549 Add metrics for failed
replication queue recovery
Posted by ap...@apache.org.
HBASE-18549 Add metrics for failed replication queue recovery
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b22095ee
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b22095ee
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b22095ee
Branch: refs/heads/branch-1.4
Commit: b22095eea1590d6a59ca1430ffaaf1c1bff3b0c0
Parents: 964a1d6
Author: Xu Cang <xc...@salesforce.com>
Authored: Wed Aug 29 16:39:57 2018 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon Oct 1 18:43:01 2018 -0700
----------------------------------------------------------------------
.../hadoop/hbase/replication/ReplicationQueuesZKImpl.java | 5 ++++-
.../regionserver/MetricsReplicationSourceSource.java | 3 +++
.../regionserver/MetricsReplicationGlobalSourceSource.java | 8 +++++++-
.../regionserver/MetricsReplicationSourceSourceImpl.java | 3 +++
.../hbase/replication/regionserver/MetricsSource.java | 5 +++++
.../replication/regionserver/ReplicationSourceManager.java | 3 +++
.../hadoop/hbase/replication/TestReplicationEndpoint.java | 9 +++++++--
7 files changed, 32 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/b22095ee/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index dda9adf..f88ae53 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -499,7 +499,10 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
}
return new Pair<>(newCluster, logQueue);
} catch (KeeperException e) {
- LOG.warn("Got exception in copyQueueFromLockedRS: ", e);
+ LOG.warn("Got exception in copyQueueFromLockedRS: "+
+ " Possible problem: check if znode size exceeds jute.maxBuffer value. "
+ + "If so, increase it for both client and server side." ,e);
+
} catch (InterruptedException e) {
LOG.warn(e);
Thread.currentThread().interrupt();
http://git-wip-us.apache.org/repos/asf/hbase/blob/b22095ee/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 9075a68..25d72af 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -49,6 +49,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_REPEATED_LOG_FILE_BYTES = "source.repeatedLogFileBytes";
public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
+ public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
void setLastShippedAge(long age);
void incrSizeOfLogQueue(int size);
@@ -72,4 +73,6 @@ public interface MetricsReplicationSourceSource extends BaseSource {
void incrRepeatedFileBytes(final long bytes);
void incrCompletedWAL();
void incrCompletedRecoveryQueue();
+ void incrFailedRecoveryQueue();
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b22095ee/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 0e5c07f..64585fa 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -45,6 +45,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
private final MutableFastCounter repeatedFileBytes;
private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue;
+ private final MutableFastCounter failedRecoveryQueue;
public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
this.rms = rms;
@@ -79,6 +80,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
repeatedFileBytes = rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L);
completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L);
completedRecoveryQueue = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
+ failedRecoveryQueue = rms.getMetricsRegistry()
+ .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
}
@Override public void setLastShippedAge(long age) {
@@ -189,7 +192,10 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
public void incrCompletedRecoveryQueue() {
completedRecoveryQueue.incr(1L);
}
-
+ @Override
+ public void incrFailedRecoveryQueue() {
+ failedRecoveryQueue.incr(1L);
+ }
@Override
public void init() {
rms.init();
http://git-wip-us.apache.org/repos/asf/hbase/blob/b22095ee/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 9838e42..0078a97 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -254,6 +254,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
}
@Override
+ public void incrFailedRecoveryQueue() {/*no op*/}
+
+ @Override
public void init() {
rms.init();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b22095ee/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 2d99018..c08b187 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -278,6 +278,11 @@ public class MetricsSource implements BaseSource {
globalSourceSource.incrCompletedRecoveryQueue();
}
+ public void incrFailedRecoveryQueue() {
+ globalSourceSource.incrFailedRecoveryQueue();
+ }
+
+
@Override
public void init() {
singleSourceSource.init();
http://git-wip-us.apache.org/repos/asf/hbase/blob/b22095ee/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 5686fa6..d5b5c63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -47,6 +47,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
@@ -536,6 +537,8 @@ public class ReplicationSourceManager implements ReplicationListener {
try {
this.executor.execute(transfer);
} catch (RejectedExecutionException ex) {
+ CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
+ .getGlobalSource().incrFailedRecoveryQueue();
LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b22095ee/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 277d876..f2a5d58 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -53,7 +52,9 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -306,7 +307,9 @@ public class TestReplicationEndpoint extends TestReplicationBase {
MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
- MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource);
+ MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
+ doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
+ MetricsSource source = new MetricsSource(id, singleSourceSource, spyglobalSourceSource);
String gaugeName = "gauge";
String singleGaugeName = "source.id." + gaugeName;
long delta = 1;
@@ -324,6 +327,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
source.removeMetric(gaugeName);
source.setGauge(gaugeName, delta);
source.updateHistogram(counterName, count);
+ source.incrFailedRecoveryQueue();
verify(singleRms).decGauge(singleGaugeName, delta);
verify(globalRms).decGauge(gaugeName, delta);
@@ -341,6 +345,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
verify(globalRms).setGauge(gaugeName, delta);
verify(singleRms).updateHistogram(singleCounterName, count);
verify(globalRms).updateHistogram(counterName, count);
+ verify(spyglobalSourceSource).incrFailedRecoveryQueue();
}
private void doPut(byte[] row) throws IOException {