You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by al...@apache.org on 2022/06/22 16:01:24 UTC
[geode] branch develop updated: GEODE-10323: Remove schedule threads in MemoryAllocatorImpl constructor (#7715)
This is an automated email from the ASF dual-hosted git repository.
alberto pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 8b85b2e07d GEODE-10323: Remove schedule threads in MemoryAllocatorImpl constructor (#7715)
8b85b2e07d is described below
commit 8b85b2e07d2da39e82bd9ca4817b44bc24d37876
Author: Alberto Gomez <al...@est.tech>
AuthorDate: Wed Jun 22 18:01:10 2022 +0200
GEODE-10323: Remove schedule threads in MemoryAllocatorImpl constructor (#7715)
* GEODE-10323: Remove schedule threads in MemoryAllocatorImpl
The periodic stats-update task run by the scheduled executor in
MemoryAllocatorImpl was scheduled in the constructor. This caused
intermittent failures in the OffHeapStorageJUnitTest testCreateOffHeapStorage
test cases due to a race condition.
The scheduling has been moved to a new method (start())
in the MemoryAllocatorImpl class that is in turn
invoked in the create() static method.
* GEODE-10323: Extract update stats code to new class
---
.../internal/offheap/MemoryAllocatorImpl.java | 70 +++++++++++-----------
.../internal/offheap/NonRealTimeStatsUpdater.java | 47 +++++++++++++++
.../geode/internal/offheap/OffHeapStorage.java | 19 ++----
.../internal/offheap/MemoryAllocatorJUnitTest.java | 13 ++--
.../internal/offheap/OffHeapHelperJUnitTest.java | 2 +-
.../OffHeapRegionEntryHelperInstanceTest.java | 2 +-
.../internal/offheap/OffHeapStorageJUnitTest.java | 3 +-
.../OffHeapStorageNonRuntimeStatsJUnitTest.java | 3 +-
.../offheap/OffHeapStoredObjectJUnitTest.java | 2 +-
9 files changed, 102 insertions(+), 59 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java
index 4e433e4b10..70f8f4ab0e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java
@@ -20,10 +20,8 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
import org.apache.logging.log4j.Logger;
@@ -39,7 +37,6 @@ import org.apache.geode.internal.cache.RegionEntry;
import org.apache.geode.internal.lang.SystemProperty;
import org.apache.geode.internal.offheap.annotations.OffHeapIdentifier;
import org.apache.geode.internal.offheap.annotations.Unretained;
-import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.util.internal.GeodeGlossary;
@@ -64,14 +61,14 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
SystemProperty.getProductIntegerProperty(
"off-heap-stats-update-frequency-ms").orElse(3600000);
- private final ScheduledExecutorService updateNonRealTimeStatsExecutor;
-
- private final ScheduledFuture<?> updateNonRealTimeStatsFuture;
+ private final NonRealTimeStatsUpdater nonRealTimeStatsUpdater;
private volatile OffHeapMemoryStats stats;
private volatile OutOfOffHeapMemoryListener ooohml;
+ private final int updateOffHeapStatsFrequencyMs;
+
OutOfOffHeapMemoryListener getOutOfOffHeapMemoryListener() {
return ooohml;
}
@@ -98,20 +95,17 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
public static MemoryAllocator create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats,
int slabCount, long offHeapMemorySize, long maxSlabSize,
- int updateOffHeapStatsFrequencyMs) {
+ Supplier<Integer> updateOffHeapStatsFrequencyMsSupplier,
+ Supplier<NonRealTimeStatsUpdater> nonRealTimeStatsUpdaterSupplier) {
return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null,
- SlabImpl::new, updateOffHeapStatsFrequencyMs);
+ SlabImpl::new, updateOffHeapStatsFrequencyMsSupplier, nonRealTimeStatsUpdaterSupplier);
}
- public static MemoryAllocator create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats,
- int slabCount, long offHeapMemorySize, long maxSlabSize) {
- return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null,
- SlabImpl::new, UPDATE_OFF_HEAP_STATS_FREQUENCY_MS);
- }
-
- private static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml,
+ static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml,
OffHeapMemoryStats stats, int slabCount, long offHeapMemorySize, long maxSlabSize,
- Slab[] slabs, SlabFactory slabFactory, int updateOffHeapStatsFrequencyMs) {
+ Slab[] slabs, SlabFactory slabFactory,
+ Supplier<Integer> updateOffHeapStatsFrequencyMsSupplier,
+ Supplier<NonRealTimeStatsUpdater> nonRealTimeStatsUpdaterSupplier) {
MemoryAllocatorImpl result = singleton;
boolean created = false;
try {
@@ -155,7 +149,10 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
}
}
- result = new MemoryAllocatorImpl(ooohml, stats, slabs, updateOffHeapStatsFrequencyMs);
+ result = new MemoryAllocatorImpl(ooohml, stats, slabs,
+ updateOffHeapStatsFrequencyMsSupplier == null ? UPDATE_OFF_HEAP_STATS_FREQUENCY_MS
+ : updateOffHeapStatsFrequencyMsSupplier.get(),
+ nonRealTimeStatsUpdaterSupplier);
singleton = result;
LifecycleListener.invokeAfterCreate(result);
created = true;
@@ -170,16 +167,10 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
}
}
}
+ result.start();
return result;
}
- static MemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener ooohml,
- OffHeapMemoryStats stats, int slabCount, long offHeapMemorySize, long maxSlabSize,
- SlabFactory memChunkFactory) {
- return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null, memChunkFactory,
- UPDATE_OFF_HEAP_STATS_FREQUENCY_MS);
- }
-
public static MemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener oooml,
OffHeapMemoryStats stats, Slab[] slabs) {
int slabCount = 0;
@@ -196,10 +187,9 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
}
}
return create(oooml, stats, slabCount, offHeapMemorySize, maxSlabSize, slabs, null,
- UPDATE_OFF_HEAP_STATS_FREQUENCY_MS);
+ null, () -> null);
}
-
private void reuse(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats newStats,
long offHeapMemorySize, Slab[] slabs) {
if (isClosed()) {
@@ -223,7 +213,8 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
private MemoryAllocatorImpl(final OutOfOffHeapMemoryListener oooml,
final OffHeapMemoryStats stats, final Slab[] slabs,
- int updateOffHeapStatsFrequencyMs) {
+ int updateOffHeapStatsFrequencyMs,
+ Supplier<NonRealTimeStatsUpdater> nonRealTimeStatsUpdaterSupplier) {
if (oooml == null) {
throw new IllegalArgumentException("OutOfOffHeapMemoryListener is null");
}
@@ -239,11 +230,19 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
this.stats.incMaxMemory(freeList.getTotalMemory());
this.stats.incFreeMemory(freeList.getTotalMemory());
- updateNonRealTimeStatsExecutor =
- LoggingExecutors.newSingleThreadScheduledExecutor("Update Freelist Stats thread");
- updateNonRealTimeStatsFuture =
- updateNonRealTimeStatsExecutor.scheduleAtFixedRate(freeList::updateNonRealTimeStats, 0,
- updateOffHeapStatsFrequencyMs, TimeUnit.MILLISECONDS);
+ this.updateOffHeapStatsFrequencyMs = updateOffHeapStatsFrequencyMs;
+
+ if (nonRealTimeStatsUpdaterSupplier == null) {
+ nonRealTimeStatsUpdater = new NonRealTimeStatsUpdater(freeList::updateNonRealTimeStats);
+ } else {
+ nonRealTimeStatsUpdater = nonRealTimeStatsUpdaterSupplier.get();
+ }
+ }
+
+ void start() {
+ if (nonRealTimeStatsUpdater != null) {
+ nonRealTimeStatsUpdater.start(updateOffHeapStatsFrequencyMs);
+ }
}
public List<OffHeapStoredObject> getLostChunks(InternalCache cache) {
@@ -407,8 +406,9 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
if (setClosed()) {
freeList.freeSlabs();
stats.close();
- updateNonRealTimeStatsFuture.cancel(true);
- updateNonRealTimeStatsExecutor.shutdown();
+ if (nonRealTimeStatsUpdater != null) {
+ nonRealTimeStatsUpdater.stop();
+ }
singleton = null;
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/offheap/NonRealTimeStatsUpdater.java b/geode-core/src/main/java/org/apache/geode/internal/offheap/NonRealTimeStatsUpdater.java
new file mode 100644
index 0000000000..933b28ba35
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/offheap/NonRealTimeStatsUpdater.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.offheap;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.geode.logging.internal.executors.LoggingExecutors;
+
+/**
+ * Periodically runs a stats-update task on a dedicated single-thread scheduled executor.
+ *
+ * <p>
+ * The executor is created in the constructor, but nothing is scheduled until
+ * {@link #start(int)} is called, so construction alone does not run the task.
+ */
+public class NonRealTimeStatsUpdater {
+  private final Runnable updateTask;
+  private final ScheduledExecutorService updateNonRealTimeStatsExecutor;
+  // Handle of the currently scheduled periodic task; null while nothing is scheduled.
+  private final AtomicReference<ScheduledFuture<?>> updateNonRealTimeStatsFuture =
+      new AtomicReference<>();
+
+  /**
+   * @param updateTask the task to run periodically once {@link #start(int)} is called
+   */
+  NonRealTimeStatsUpdater(Runnable updateTask) {
+    this.updateTask = updateTask;
+    updateNonRealTimeStatsExecutor =
+        LoggingExecutors.newSingleThreadScheduledExecutor("Update Freelist Stats thread");
+  }
+
+  /**
+   * Schedules the update task at a fixed rate with an initial delay of zero.
+   * If a task was already scheduled by an earlier call, it is cancelled so that at most one
+   * periodic task stays active.
+   *
+   * @param updateOffHeapStatsFrequencyMs period between runs, in milliseconds
+   */
+  void start(int updateOffHeapStatsFrequencyMs) {
+    ScheduledFuture<?> previous = updateNonRealTimeStatsFuture
+        .getAndSet(updateNonRealTimeStatsExecutor.scheduleAtFixedRate(updateTask, 0,
+            updateOffHeapStatsFrequencyMs, TimeUnit.MILLISECONDS));
+    if (previous != null) {
+      previous.cancel(true);
+    }
+  }
+
+  /**
+   * Cancels the scheduled task, if any, and shuts down the executor. Safe to call when
+   * {@link #start(int)} was never invoked.
+   */
+  void stop() {
+    ScheduledFuture<?> scheduled = updateNonRealTimeStatsFuture.getAndSet(null);
+    if (scheduled != null) {
+      scheduled.cancel(true);
+    }
+    updateNonRealTimeStatsExecutor.shutdown();
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapStorage.java b/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapStorage.java
index 755fef9fc1..2bbd5876cb 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapStorage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapStorage.java
@@ -15,6 +15,7 @@
package org.apache.geode.internal.offheap;
import java.lang.reflect.Method;
+import java.util.function.Supplier;
import org.apache.geode.StatisticDescriptor;
import org.apache.geode.Statistics;
@@ -219,22 +220,12 @@ public class OffHeapStorage implements OffHeapMemoryStats {
// ooohml provides the hook for disconnecting and closing cache on OutOfOffHeapMemoryException
OutOfOffHeapMemoryListener ooohml =
new DisconnectingOutOfOffHeapMemoryListener((InternalDistributedSystem) system);
- return basicCreateOffHeapStorage(sf, offHeapMemorySize, ooohml);
+ return basicCreateOffHeapStorage(sf, offHeapMemorySize, ooohml, null, null);
}
static MemoryAllocator basicCreateOffHeapStorage(StatisticsFactory sf, long offHeapMemorySize,
- OutOfOffHeapMemoryListener ooohml) {
- final OffHeapMemoryStats stats = new OffHeapStorage(sf);
-
- final long maxSlabSize = calcMaxSlabSize(offHeapMemorySize);
-
- final int slabCount = calcSlabCount(maxSlabSize, offHeapMemorySize);
-
- return MemoryAllocatorImpl.create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize);
- }
-
- static MemoryAllocator basicCreateOffHeapStorage(StatisticsFactory sf, long offHeapMemorySize,
- OutOfOffHeapMemoryListener ooohml, int updateOffHeapStatsFrequencyMs) {
+ OutOfOffHeapMemoryListener ooohml, Supplier<Integer> updateOffHeapStatsFrequencyMsSupplier,
+ Supplier<NonRealTimeStatsUpdater> nonRealTimeStatsUpdaterSupplier) {
final OffHeapMemoryStats stats = new OffHeapStorage(sf);
final long maxSlabSize = calcMaxSlabSize(offHeapMemorySize);
@@ -242,7 +233,7 @@ public class OffHeapStorage implements OffHeapMemoryStats {
final int slabCount = calcSlabCount(maxSlabSize, offHeapMemorySize);
return MemoryAllocatorImpl.create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize,
- updateOffHeapStatsFrequencyMs);
+ updateOffHeapStatsFrequencyMsSupplier, nonRealTimeStatsUpdaterSupplier);
}
private static final long MAX_SLAB_SIZE = Integer.MAX_VALUE;
diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/MemoryAllocatorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/MemoryAllocatorJUnitTest.java
index 6de0312f41..2626fd051b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/offheap/MemoryAllocatorJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/MemoryAllocatorJUnitTest.java
@@ -72,9 +72,9 @@ public class MemoryAllocatorJUnitTest {
NullOutOfOffHeapMemoryListener listener = new NullOutOfOffHeapMemoryListener();
NullOffHeapMemoryStats stats = new NullOffHeapMemoryStats();
try {
- MemoryAllocatorImpl.createForUnitTest(listener, stats, 10, 950, 100, size -> {
+ MemoryAllocatorImpl.create(listener, stats, 10, 950, 100, null, size -> {
throw new OutOfMemoryError("expected");
- });
+ }, null, () -> null);
} catch (OutOfMemoryError expected) {
}
assertTrue(listener.isClosed());
@@ -98,7 +98,8 @@ public class MemoryAllocatorJUnitTest {
}
}
};
- MemoryAllocatorImpl.createForUnitTest(listener, stats, 10, 950, MAX_SLAB_SIZE, factory);
+ MemoryAllocatorImpl.create(listener, stats, 10, 950, MAX_SLAB_SIZE, null, factory, null,
+ () -> null);
} catch (OutOfMemoryError expected) {
}
assertTrue(listener.isClosed());
@@ -109,7 +110,8 @@ public class MemoryAllocatorJUnitTest {
NullOffHeapMemoryStats stats = new NullOffHeapMemoryStats();
SlabFactory factory = SlabImpl::new;
MemoryAllocator ma =
- MemoryAllocatorImpl.createForUnitTest(listener, stats, 10, 950, 100, factory);
+ MemoryAllocatorImpl.create(listener, stats, 10, 950, 100, null, factory, null,
+ () -> null);
try {
assertFalse(listener.isClosed());
assertFalse(stats.isClosed());
@@ -135,7 +137,8 @@ public class MemoryAllocatorJUnitTest {
listener = new NullOutOfOffHeapMemoryListener();
stats2 = new NullOffHeapMemoryStats();
MemoryAllocator ma2 =
- MemoryAllocatorImpl.createForUnitTest(listener, stats2, 10, 950, 100, factory);
+ MemoryAllocatorImpl.create(listener, stats2, 10, 950, 100, null, factory, null,
+ () -> null);
assertSame(ma, ma2);
assertTrue(stats.isClosed());
assertFalse(listener.isClosed());
diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapHelperJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapHelperJUnitTest.java
index 7f5fcdcce6..7019848647 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapHelperJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapHelperJUnitTest.java
@@ -46,7 +46,7 @@ public class OffHeapHelperJUnitTest extends AbstractStoredObjectTestBase {
OffHeapMemoryStats stats = mock(OffHeapMemoryStats.class);
ma = MemoryAllocatorImpl.create(ooohml, stats, 3, OffHeapStorage.MIN_SLAB_SIZE * 3,
- OffHeapStorage.MIN_SLAB_SIZE);
+ OffHeapStorage.MIN_SLAB_SIZE, null, () -> null);
}
/**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperInstanceTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperInstanceTest.java
index d32cb8b261..87786d1df5 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperInstanceTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperInstanceTest.java
@@ -81,7 +81,7 @@ public class OffHeapRegionEntryHelperInstanceTest {
memoryAllocator =
MemoryAllocatorImpl.create(listener, stats, 1, OffHeapStorage.MIN_SLAB_SIZE,
- OffHeapStorage.MIN_SLAB_SIZE);
+ OffHeapStorage.MIN_SLAB_SIZE, null, () -> null);
offHeapRegionEntryHelperInstance =
spy(new OffHeapRegionEntryHelperInstance(ohAddress -> offHeapStoredObject,
diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageJUnitTest.java
index f940bca6f8..d30a4feeca 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageJUnitTest.java
@@ -167,7 +167,8 @@ public class OffHeapStorageJUnitTest {
StatisticsFactory localStatsFactory = new LocalStatisticsFactory(null);
OutOfOffHeapMemoryListener ooohml = mock(OutOfOffHeapMemoryListener.class);
MemoryAllocator ma =
- OffHeapStorage.basicCreateOffHeapStorage(localStatsFactory, 1024 * 1024, ooohml);
+ OffHeapStorage.basicCreateOffHeapStorage(localStatsFactory, 1024 * 1024, ooohml, null,
+ () -> null);
try {
OffHeapMemoryStats stats = ma.getStats();
assertNotNull(stats.getStats());
diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageNonRuntimeStatsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageNonRuntimeStatsJUnitTest.java
index 2aecc7b703..0dd651f641 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageNonRuntimeStatsJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageNonRuntimeStatsJUnitTest.java
@@ -30,7 +30,8 @@ public class OffHeapStorageNonRuntimeStatsJUnitTest {
StatisticsFactory localStatsFactory = new LocalStatisticsFactory(null);
OutOfOffHeapMemoryListener ooohml = mock(OutOfOffHeapMemoryListener.class);
MemoryAllocator ma =
- OffHeapStorage.basicCreateOffHeapStorage(localStatsFactory, 1024 * 1024, ooohml, 100);
+ OffHeapStorage.basicCreateOffHeapStorage(localStatsFactory, 1024 * 1024, ooohml, () -> 100,
+ null);
try {
OffHeapMemoryStats stats = ma.getStats();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStoredObjectJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStoredObjectJUnitTest.java
index 2801c6dfc1..d3e1b90bbc 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStoredObjectJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStoredObjectJUnitTest.java
@@ -74,7 +74,7 @@ public class OffHeapStoredObjectJUnitTest extends AbstractStoredObjectTestBase {
OffHeapMemoryStats stats = mock(OffHeapMemoryStats.class);
ma = MemoryAllocatorImpl.create(ooohml, stats, 3, OffHeapStorage.MIN_SLAB_SIZE * 3,
- OffHeapStorage.MIN_SLAB_SIZE);
+ OffHeapStorage.MIN_SLAB_SIZE, null, () -> null);
}
@After