You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mi...@apache.org on 2020/10/19 06:03:50 UTC
[geode] branch develop updated: GEODE-8497: added getTotalQueueSizeBytesInUse (#5514)
This is an automated email from the ASF dual-hosted git repository.
mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new c362f77 GEODE-8497: added getTotalQueueSizeBytesInUse (#5514)
c362f77 is described below
commit c362f77591fa83e5f87a056fe221d1241de87348
Author: Mario Ivanac <48...@users.noreply.github.com>
AuthorDate: Mon Oct 19 08:02:55 2020 +0200
GEODE-8497: added getTotalQueueSizeBytesInUse (#5514)
* GEODE-8497: added getTotalQueueSizeBytesInUse
* GEODE-8497: updated UT
* GEODE-8497: fix statistics failure after restart
---
.../geode/internal/cache/RegionMapFactory.java | 9 ++++++++-
.../geode/management/GatewaySenderMXBean.java | 5 +++++
.../internal/beans/GatewaySenderMBean.java | 5 +++++
.../internal/beans/GatewaySenderMBeanBridge.java | 6 ++++++
.../beans/stats/GatewaySenderOverflowMonitor.java | 21 +++++++++++++++++++++
.../management/internal/beans/stats/StatsKey.java | 2 ++
.../stats/GatewaySenderOverflowMonitorTest.java | 18 +++++++++++++++++-
7 files changed, 64 insertions(+), 2 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMapFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMapFactory.java
index 48e64fe..92c85fe 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMapFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMapFactory.java
@@ -15,7 +15,7 @@
package org.apache.geode.internal.cache;
-
+import org.apache.geode.internal.cache.eviction.EvictionController;
/**
* Used to produce instances of RegionMap
@@ -39,6 +39,13 @@ class RegionMapFactory {
// eviction tests to fail
return new ProxyRegionMap(owner, attrs, internalRegionArgs);
} else if (owner.isEntryEvictionPossible()) {
+ if (owner instanceof PartitionedRegion) {
+ PartitionedRegion pr = (PartitionedRegion) owner;
+ EvictionController evctrl = pr.getPREvictionControllerFromDiskInitialization();
+ if (evctrl != null) {
+ return new VMLRURegionMap(owner, attrs, internalRegionArgs, evctrl);
+ }
+ }
return new VMLRURegionMap(owner, attrs, internalRegionArgs);
} else {
return new VMRegionMap(owner, attrs, internalRegionArgs);
diff --git a/geode-core/src/main/java/org/apache/geode/management/GatewaySenderMXBean.java b/geode-core/src/main/java/org/apache/geode/management/GatewaySenderMXBean.java
index 67c1350..a889dd5 100644
--- a/geode-core/src/main/java/org/apache/geode/management/GatewaySenderMXBean.java
+++ b/geode-core/src/main/java/org/apache/geode/management/GatewaySenderMXBean.java
@@ -177,6 +177,11 @@ public interface GatewaySenderMXBean {
int getTotalBatchesRedistributed();
/**
+ * Returns the total number of bytes in heap occupied by the event queue.
+ */
+ long getTotalQueueSizeBytesInUse();
+
+ /**
* Starts this GatewaySender. Once the GatewaySender is running its configuration cannot be
* changed.
*
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBean.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBean.java
index a2d1251..1f422ff 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBean.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBean.java
@@ -134,6 +134,11 @@ public class GatewaySenderMBean extends NotificationBroadcasterSupport
}
@Override
+ public long getTotalQueueSizeBytesInUse() {
+ return bridge.getTotalQueueSizeBytesInUse();
+ }
+
+ @Override
public boolean isBatchConflationEnabled() {
return bridge.isBatchConflationEnabled();
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java
index 42b4bbf..7eda2b4 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java
@@ -291,6 +291,12 @@ public class GatewaySenderMBeanBridge {
.longValue();
}
+ public long getTotalQueueSizeBytesInUse() {
+ return overflowMonitor.getStatistic(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY)
+ .longValue();
+ }
+
+
private Number getStatistic(String statName) {
if (monitor != null) {
return monitor.getStatistic(statName);
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java
index 24055fa..fb38cd1 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java
@@ -47,6 +47,8 @@ public class GatewaySenderOverflowMonitor extends MBeanStatsMonitor {
private volatile long lruEvictions = 0;
private volatile long bytesOverflowedToDisk = 0;
private volatile long entriesOverflowedToDisk = 0;
+ private volatile long bytesInUse = 0;
+
private final Map<Statistics, ValueMonitor> monitors;
private final Map<Statistics, StatisticsListener> listeners;
@@ -58,6 +60,10 @@ public class GatewaySenderOverflowMonitor extends MBeanStatsMonitor {
return bytesOverflowedToDisk;
}
+ public long getTotalQueueSizeBytesInUse() {
+ return bytesInUse;
+ }
+
public long getEntriesOverflowedToDisk() {
return entriesOverflowedToDisk;
}
@@ -93,6 +99,11 @@ public class GatewaySenderOverflowMonitor extends MBeanStatsMonitor {
return currentValue.longValue() - prevValue.longValue();
}
+ if (name.equals(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY)) {
+ Number prevValue = statsMap.getOrDefault(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY, 0);
+ return currentValue.longValue() - prevValue.longValue();
+ }
+
return 0;
}
@@ -111,6 +122,12 @@ public class GatewaySenderOverflowMonitor extends MBeanStatsMonitor {
bytesOverflowedToDisk += value.longValue();
return;
}
+
+ if (name.equals(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY)) {
+ bytesInUse += value.longValue();
+ return;
+ }
+
}
@Override
@@ -127,6 +144,10 @@ public class GatewaySenderOverflowMonitor extends MBeanStatsMonitor {
return getBytesOverflowedToDisk();
}
+ if (name.equals(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY)) {
+ return getTotalQueueSizeBytesInUse();
+ }
+
return 0;
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/StatsKey.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/StatsKey.java
index d8c24e3..be65916 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/StatsKey.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/StatsKey.java
@@ -309,6 +309,8 @@ public class StatsKey {
public static final String GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK = "entriesOnlyOnDisk";
public static final String GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK = "bytesOnlyOnDisk";
+ public static final String GATEWAYSENDER_BYTES_IN_MEMORY = "byteCount";
+
/** AsyncEventQueue Stats **/
public static final String ASYNCEVENTQUEUE_EVENTS_QUEUE_SIZE = "eventQueueSize";
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitorTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitorTest.java
index aefdc18..dccabea 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitorTest.java
@@ -79,6 +79,7 @@ public class GatewaySenderOverflowMonitorTest {
assertThat(gatewaySenderOverflowMonitor.getLruEvictions()).isEqualTo(0);
assertThat(gatewaySenderOverflowMonitor.getBytesOverflowedToDisk()).isEqualTo(0);
assertThat(gatewaySenderOverflowMonitor.getEntriesOverflowedToDisk()).isEqualTo(0);
+ assertThat(gatewaySenderOverflowMonitor.getTotalQueueSizeBytesInUse()).isEqualTo(0);
}
@Test
@@ -94,6 +95,7 @@ public class GatewaySenderOverflowMonitorTest {
statsMap.put(StatsKey.GATEWAYSENDER_LRU_EVICTIONS, 50);
statsMap.put(StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK, 2048);
statsMap.put(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK, 100);
+ statsMap.put(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY, 1024);
assertThat(gatewaySenderOverflowMonitor.computeDelta(statsMap,
StatsKey.GATEWAYSENDER_LRU_EVICTIONS, 60)).isEqualTo(10L);
@@ -101,6 +103,9 @@ public class GatewaySenderOverflowMonitorTest {
StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK, 2100)).isEqualTo(52L);
assertThat(gatewaySenderOverflowMonitor.computeDelta(statsMap,
StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK, 150)).isEqualTo(50L);
+ assertThat(gatewaySenderOverflowMonitor.computeDelta(statsMap,
+ StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY, 2048)).isEqualTo(1024L);
+
}
@Test
@@ -110,6 +115,8 @@ public class GatewaySenderOverflowMonitorTest {
1024L);
gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK,
10000L);
+ gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY,
+ 4096L);
assertThat(gatewaySenderOverflowMonitor.getStatistic(StatsKey.GATEWAYSENDER_LRU_EVICTIONS))
.isEqualTo(5L);
assertThat(
@@ -117,12 +124,16 @@ public class GatewaySenderOverflowMonitorTest {
.isEqualTo(1024L);
assertThat(gatewaySenderOverflowMonitor
.getStatistic(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK)).isEqualTo(10000L);
+ assertThat(gatewaySenderOverflowMonitor
+ .getStatistic(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY)).isEqualTo(4096L);
gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_LRU_EVICTIONS, 5L);
gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK,
1024L);
gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK,
10000L);
+ gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY,
+ 2048L);
assertThat(gatewaySenderOverflowMonitor.getStatistic(StatsKey.GATEWAYSENDER_LRU_EVICTIONS))
.isEqualTo(10L);
assertThat(
@@ -130,6 +141,8 @@ public class GatewaySenderOverflowMonitorTest {
.isEqualTo(2048L);
assertThat(gatewaySenderOverflowMonitor
.getStatistic(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK)).isEqualTo(20000L);
+ assertThat(gatewaySenderOverflowMonitor
+ .getStatistic(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY)).isEqualTo(6144L);
}
@Test
@@ -144,7 +157,8 @@ public class GatewaySenderOverflowMonitorTest {
2048);
gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK,
10000);
-
+ gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY,
+ 2048);
assertThat(gatewaySenderOverflowMonitor.getStatistic(StatsKey.GATEWAYSENDER_LRU_EVICTIONS))
.isEqualTo(5L);
assertThat(
@@ -152,6 +166,8 @@ public class GatewaySenderOverflowMonitorTest {
.isEqualTo(2048L);
assertThat(gatewaySenderOverflowMonitor
.getStatistic(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK)).isEqualTo(10000L);
+ assertThat(gatewaySenderOverflowMonitor
+ .getStatistic(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY)).isEqualTo(2048L);
}
@Test