You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by al...@apache.org on 2022/06/22 16:01:24 UTC

[geode] branch develop updated: GEODE-10323: Remove schedule threads in MemoryAllocatorImpl constructor (#7715)

This is an automated email from the ASF dual-hosted git repository.

alberto pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8b85b2e07d GEODE-10323: Remove schedule threads in MemoryAllocatorImpl constructor (#7715)
8b85b2e07d is described below

commit 8b85b2e07d2da39e82bd9ca4817b44bc24d37876
Author: Alberto Gomez <al...@est.tech>
AuthorDate: Wed Jun 22 18:01:10 2022 +0200

    GEODE-10323: Remove schedule threads in MemoryAllocatorImpl constructor (#7715)
    
    * GEODE-10323: Remove schedule threads in MemoryAllocatorImpl
    
    The stats update task run by the scheduled executor in
    MemoryAllocatorImpl was scheduled in the constructor. This caused
    intermittent failures in the OffHeapStorageJUnitTest
    testCreateOffHeapStorage test cases due to a race condition.
    
    The scheduling has been moved to a new method (start())
    in the MemoryAllocatorImpl class that is in turn
    invoked in the create() static method.
    
    * GEODE-10323: Extract update stats code to new class
---
 .../internal/offheap/MemoryAllocatorImpl.java      | 70 +++++++++++-----------
 .../internal/offheap/NonRealTimeStatsUpdater.java  | 47 +++++++++++++++
 .../geode/internal/offheap/OffHeapStorage.java     | 19 ++----
 .../internal/offheap/MemoryAllocatorJUnitTest.java | 13 ++--
 .../internal/offheap/OffHeapHelperJUnitTest.java   |  2 +-
 .../OffHeapRegionEntryHelperInstanceTest.java      |  2 +-
 .../internal/offheap/OffHeapStorageJUnitTest.java  |  3 +-
 .../OffHeapStorageNonRuntimeStatsJUnitTest.java    |  3 +-
 .../offheap/OffHeapStoredObjectJUnitTest.java      |  2 +-
 9 files changed, 102 insertions(+), 59 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java
index 4e433e4b10..70f8f4ab0e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/offheap/MemoryAllocatorImpl.java
@@ -20,10 +20,8 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 import org.apache.logging.log4j.Logger;
 
@@ -39,7 +37,6 @@ import org.apache.geode.internal.cache.RegionEntry;
 import org.apache.geode.internal.lang.SystemProperty;
 import org.apache.geode.internal.offheap.annotations.OffHeapIdentifier;
 import org.apache.geode.internal.offheap.annotations.Unretained;
-import org.apache.geode.logging.internal.executors.LoggingExecutors;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.util.internal.GeodeGlossary;
 
@@ -64,14 +61,14 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
       SystemProperty.getProductIntegerProperty(
           "off-heap-stats-update-frequency-ms").orElse(3600000);
 
-  private final ScheduledExecutorService updateNonRealTimeStatsExecutor;
-
-  private final ScheduledFuture<?> updateNonRealTimeStatsFuture;
+  private final NonRealTimeStatsUpdater nonRealTimeStatsUpdater;
 
   private volatile OffHeapMemoryStats stats;
 
   private volatile OutOfOffHeapMemoryListener ooohml;
 
+  private final int updateOffHeapStatsFrequencyMs;
+
   OutOfOffHeapMemoryListener getOutOfOffHeapMemoryListener() {
     return ooohml;
   }
@@ -98,20 +95,17 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
 
   public static MemoryAllocator create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats,
       int slabCount, long offHeapMemorySize, long maxSlabSize,
-      int updateOffHeapStatsFrequencyMs) {
+      Supplier<Integer> updateOffHeapStatsFrequencyMsSupplier,
+      Supplier<NonRealTimeStatsUpdater> nonRealTimeStatsUpdaterSupplier) {
     return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null,
-        SlabImpl::new, updateOffHeapStatsFrequencyMs);
+        SlabImpl::new, updateOffHeapStatsFrequencyMsSupplier, nonRealTimeStatsUpdaterSupplier);
   }
 
-  public static MemoryAllocator create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats,
-      int slabCount, long offHeapMemorySize, long maxSlabSize) {
-    return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null,
-        SlabImpl::new, UPDATE_OFF_HEAP_STATS_FREQUENCY_MS);
-  }
-
-  private static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml,
+  static MemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml,
       OffHeapMemoryStats stats, int slabCount, long offHeapMemorySize, long maxSlabSize,
-      Slab[] slabs, SlabFactory slabFactory, int updateOffHeapStatsFrequencyMs) {
+      Slab[] slabs, SlabFactory slabFactory,
+      Supplier<Integer> updateOffHeapStatsFrequencyMsSupplier,
+      Supplier<NonRealTimeStatsUpdater> nonRealTimeStatsUpdaterSupplier) {
     MemoryAllocatorImpl result = singleton;
     boolean created = false;
     try {
@@ -155,7 +149,10 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
           }
         }
 
-        result = new MemoryAllocatorImpl(ooohml, stats, slabs, updateOffHeapStatsFrequencyMs);
+        result = new MemoryAllocatorImpl(ooohml, stats, slabs,
+            updateOffHeapStatsFrequencyMsSupplier == null ? UPDATE_OFF_HEAP_STATS_FREQUENCY_MS
+                : updateOffHeapStatsFrequencyMsSupplier.get(),
+            nonRealTimeStatsUpdaterSupplier);
         singleton = result;
         LifecycleListener.invokeAfterCreate(result);
         created = true;
@@ -170,16 +167,10 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
         }
       }
     }
+    result.start();
     return result;
   }
 
-  static MemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener ooohml,
-      OffHeapMemoryStats stats, int slabCount, long offHeapMemorySize, long maxSlabSize,
-      SlabFactory memChunkFactory) {
-    return create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize, null, memChunkFactory,
-        UPDATE_OFF_HEAP_STATS_FREQUENCY_MS);
-  }
-
   public static MemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener oooml,
       OffHeapMemoryStats stats, Slab[] slabs) {
     int slabCount = 0;
@@ -196,10 +187,9 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
       }
     }
     return create(oooml, stats, slabCount, offHeapMemorySize, maxSlabSize, slabs, null,
-        UPDATE_OFF_HEAP_STATS_FREQUENCY_MS);
+        null, () -> null);
   }
 
-
   private void reuse(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats newStats,
       long offHeapMemorySize, Slab[] slabs) {
     if (isClosed()) {
@@ -223,7 +213,8 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
 
   private MemoryAllocatorImpl(final OutOfOffHeapMemoryListener oooml,
       final OffHeapMemoryStats stats, final Slab[] slabs,
-      int updateOffHeapStatsFrequencyMs) {
+      int updateOffHeapStatsFrequencyMs,
+      Supplier<NonRealTimeStatsUpdater> nonRealTimeStatsUpdaterSupplier) {
     if (oooml == null) {
       throw new IllegalArgumentException("OutOfOffHeapMemoryListener is null");
     }
@@ -239,11 +230,19 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
     this.stats.incMaxMemory(freeList.getTotalMemory());
     this.stats.incFreeMemory(freeList.getTotalMemory());
 
-    updateNonRealTimeStatsExecutor =
-        LoggingExecutors.newSingleThreadScheduledExecutor("Update Freelist Stats thread");
-    updateNonRealTimeStatsFuture =
-        updateNonRealTimeStatsExecutor.scheduleAtFixedRate(freeList::updateNonRealTimeStats, 0,
-            updateOffHeapStatsFrequencyMs, TimeUnit.MILLISECONDS);
+    this.updateOffHeapStatsFrequencyMs = updateOffHeapStatsFrequencyMs;
+
+    if (nonRealTimeStatsUpdaterSupplier == null) {
+      nonRealTimeStatsUpdater = new NonRealTimeStatsUpdater(freeList::updateNonRealTimeStats);
+    } else {
+      nonRealTimeStatsUpdater = nonRealTimeStatsUpdaterSupplier.get();
+    }
+  }
+
+  void start() {
+    if (nonRealTimeStatsUpdater != null) {
+      nonRealTimeStatsUpdater.start(updateOffHeapStatsFrequencyMs);
+    }
   }
 
   public List<OffHeapStoredObject> getLostChunks(InternalCache cache) {
@@ -407,8 +406,9 @@ public class MemoryAllocatorImpl implements MemoryAllocator {
     if (setClosed()) {
       freeList.freeSlabs();
       stats.close();
-      updateNonRealTimeStatsFuture.cancel(true);
-      updateNonRealTimeStatsExecutor.shutdown();
+      if (nonRealTimeStatsUpdater != null) {
+        nonRealTimeStatsUpdater.stop();
+      }
       singleton = null;
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/offheap/NonRealTimeStatsUpdater.java b/geode-core/src/main/java/org/apache/geode/internal/offheap/NonRealTimeStatsUpdater.java
new file mode 100644
index 0000000000..933b28ba35
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/offheap/NonRealTimeStatsUpdater.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.offheap;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.geode.logging.internal.executors.LoggingExecutors;
+
+/**
+ * Runs a stats-update task at a fixed rate on a scheduled executor obtained from
+ * {@link LoggingExecutors#newSingleThreadScheduledExecutor(String)}.
+ *
+ * <p>
+ * The executor is created in the constructor, but nothing is scheduled until
+ * {@link #start(int)} is called.
+ */
+public class NonRealTimeStatsUpdater {
+  private final Runnable updateTask;
+  private final ScheduledExecutorService updateNonRealTimeStatsExecutor;
+  private final AtomicReference<ScheduledFuture<?>> updateNonRealTimeStatsFuture =
+      new AtomicReference<>();
+
+  /**
+   * @param updateTask the task to run at a fixed rate once {@link #start(int)} has been called
+   */
+  NonRealTimeStatsUpdater(Runnable updateTask) {
+    this.updateTask = updateTask;
+    updateNonRealTimeStatsExecutor =
+        LoggingExecutors.newSingleThreadScheduledExecutor("Update Freelist Stats thread");
+  }
+
+  /**
+   * Schedules the update task to run with no initial delay and then at a fixed rate.
+   *
+   * <p>
+   * Calling this method again replaces the previous schedule instead of adding a second one, so
+   * repeated calls do not accumulate duplicate periodic tasks.
+   *
+   * @param updateOffHeapStatsFrequencyMs period between runs, in milliseconds
+   */
+  void start(int updateOffHeapStatsFrequencyMs) {
+    ScheduledFuture<?> previousFuture = updateNonRealTimeStatsFuture
+        .getAndSet(updateNonRealTimeStatsExecutor.scheduleAtFixedRate(updateTask, 0,
+            updateOffHeapStatsFrequencyMs, TimeUnit.MILLISECONDS));
+    if (previousFuture != null) {
+      // Do not interrupt a run that is already in progress; only prevent further runs.
+      previousFuture.cancel(false);
+    }
+  }
+
+  /**
+   * Cancels the scheduled task, if any, and shuts down the executor. Safe to call even if
+   * {@link #start(int)} was never called.
+   */
+  void stop() {
+    ScheduledFuture<?> future = updateNonRealTimeStatsFuture.getAndSet(null);
+    if (future != null) {
+      future.cancel(true);
+    }
+    updateNonRealTimeStatsExecutor.shutdown();
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapStorage.java b/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapStorage.java
index 755fef9fc1..2bbd5876cb 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapStorage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapStorage.java
@@ -15,6 +15,7 @@
 package org.apache.geode.internal.offheap;
 
 import java.lang.reflect.Method;
+import java.util.function.Supplier;
 
 import org.apache.geode.StatisticDescriptor;
 import org.apache.geode.Statistics;
@@ -219,22 +220,12 @@ public class OffHeapStorage implements OffHeapMemoryStats {
     // ooohml provides the hook for disconnecting and closing cache on OutOfOffHeapMemoryException
     OutOfOffHeapMemoryListener ooohml =
         new DisconnectingOutOfOffHeapMemoryListener((InternalDistributedSystem) system);
-    return basicCreateOffHeapStorage(sf, offHeapMemorySize, ooohml);
+    return basicCreateOffHeapStorage(sf, offHeapMemorySize, ooohml, null, null);
   }
 
   static MemoryAllocator basicCreateOffHeapStorage(StatisticsFactory sf, long offHeapMemorySize,
-      OutOfOffHeapMemoryListener ooohml) {
-    final OffHeapMemoryStats stats = new OffHeapStorage(sf);
-
-    final long maxSlabSize = calcMaxSlabSize(offHeapMemorySize);
-
-    final int slabCount = calcSlabCount(maxSlabSize, offHeapMemorySize);
-
-    return MemoryAllocatorImpl.create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize);
-  }
-
-  static MemoryAllocator basicCreateOffHeapStorage(StatisticsFactory sf, long offHeapMemorySize,
-      OutOfOffHeapMemoryListener ooohml, int updateOffHeapStatsFrequencyMs) {
+      OutOfOffHeapMemoryListener ooohml, Supplier<Integer> updateOffHeapStatsFrequencyMsSupplier,
+      Supplier<NonRealTimeStatsUpdater> nonRealTimeStatsUpdaterSupplier) {
     final OffHeapMemoryStats stats = new OffHeapStorage(sf);
 
     final long maxSlabSize = calcMaxSlabSize(offHeapMemorySize);
@@ -242,7 +233,7 @@ public class OffHeapStorage implements OffHeapMemoryStats {
     final int slabCount = calcSlabCount(maxSlabSize, offHeapMemorySize);
 
     return MemoryAllocatorImpl.create(ooohml, stats, slabCount, offHeapMemorySize, maxSlabSize,
-        updateOffHeapStatsFrequencyMs);
+        updateOffHeapStatsFrequencyMsSupplier, nonRealTimeStatsUpdaterSupplier);
   }
 
   private static final long MAX_SLAB_SIZE = Integer.MAX_VALUE;
diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/MemoryAllocatorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/MemoryAllocatorJUnitTest.java
index 6de0312f41..2626fd051b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/offheap/MemoryAllocatorJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/MemoryAllocatorJUnitTest.java
@@ -72,9 +72,9 @@ public class MemoryAllocatorJUnitTest {
       NullOutOfOffHeapMemoryListener listener = new NullOutOfOffHeapMemoryListener();
       NullOffHeapMemoryStats stats = new NullOffHeapMemoryStats();
       try {
-        MemoryAllocatorImpl.createForUnitTest(listener, stats, 10, 950, 100, size -> {
+        MemoryAllocatorImpl.create(listener, stats, 10, 950, 100, null, size -> {
           throw new OutOfMemoryError("expected");
-        });
+        }, null, () -> null);
       } catch (OutOfMemoryError expected) {
       }
       assertTrue(listener.isClosed());
@@ -98,7 +98,8 @@ public class MemoryAllocatorJUnitTest {
             }
           }
         };
-        MemoryAllocatorImpl.createForUnitTest(listener, stats, 10, 950, MAX_SLAB_SIZE, factory);
+        MemoryAllocatorImpl.create(listener, stats, 10, 950, MAX_SLAB_SIZE, null, factory, null,
+            () -> null);
       } catch (OutOfMemoryError expected) {
       }
       assertTrue(listener.isClosed());
@@ -109,7 +110,8 @@ public class MemoryAllocatorJUnitTest {
       NullOffHeapMemoryStats stats = new NullOffHeapMemoryStats();
       SlabFactory factory = SlabImpl::new;
       MemoryAllocator ma =
-          MemoryAllocatorImpl.createForUnitTest(listener, stats, 10, 950, 100, factory);
+          MemoryAllocatorImpl.create(listener, stats, 10, 950, 100, null, factory, null,
+              () -> null);
       try {
         assertFalse(listener.isClosed());
         assertFalse(stats.isClosed());
@@ -135,7 +137,8 @@ public class MemoryAllocatorJUnitTest {
         listener = new NullOutOfOffHeapMemoryListener();
         stats2 = new NullOffHeapMemoryStats();
         MemoryAllocator ma2 =
-            MemoryAllocatorImpl.createForUnitTest(listener, stats2, 10, 950, 100, factory);
+            MemoryAllocatorImpl.create(listener, stats2, 10, 950, 100, null, factory, null,
+                () -> null);
         assertSame(ma, ma2);
         assertTrue(stats.isClosed());
         assertFalse(listener.isClosed());
diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapHelperJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapHelperJUnitTest.java
index 7f5fcdcce6..7019848647 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapHelperJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapHelperJUnitTest.java
@@ -46,7 +46,7 @@ public class OffHeapHelperJUnitTest extends AbstractStoredObjectTestBase {
     OffHeapMemoryStats stats = mock(OffHeapMemoryStats.class);
 
     ma = MemoryAllocatorImpl.create(ooohml, stats, 3, OffHeapStorage.MIN_SLAB_SIZE * 3,
-        OffHeapStorage.MIN_SLAB_SIZE);
+        OffHeapStorage.MIN_SLAB_SIZE, null, () -> null);
   }
 
   /**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperInstanceTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperInstanceTest.java
index d32cb8b261..87786d1df5 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperInstanceTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperInstanceTest.java
@@ -81,7 +81,7 @@ public class OffHeapRegionEntryHelperInstanceTest {
 
     memoryAllocator =
         MemoryAllocatorImpl.create(listener, stats, 1, OffHeapStorage.MIN_SLAB_SIZE,
-            OffHeapStorage.MIN_SLAB_SIZE);
+            OffHeapStorage.MIN_SLAB_SIZE, null, () -> null);
 
     offHeapRegionEntryHelperInstance =
         spy(new OffHeapRegionEntryHelperInstance(ohAddress -> offHeapStoredObject,
diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageJUnitTest.java
index f940bca6f8..d30a4feeca 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageJUnitTest.java
@@ -167,7 +167,8 @@ public class OffHeapStorageJUnitTest {
     StatisticsFactory localStatsFactory = new LocalStatisticsFactory(null);
     OutOfOffHeapMemoryListener ooohml = mock(OutOfOffHeapMemoryListener.class);
     MemoryAllocator ma =
-        OffHeapStorage.basicCreateOffHeapStorage(localStatsFactory, 1024 * 1024, ooohml);
+        OffHeapStorage.basicCreateOffHeapStorage(localStatsFactory, 1024 * 1024, ooohml, null,
+            () -> null);
     try {
       OffHeapMemoryStats stats = ma.getStats();
       assertNotNull(stats.getStats());
diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageNonRuntimeStatsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageNonRuntimeStatsJUnitTest.java
index 2aecc7b703..0dd651f641 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageNonRuntimeStatsJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStorageNonRuntimeStatsJUnitTest.java
@@ -30,7 +30,8 @@ public class OffHeapStorageNonRuntimeStatsJUnitTest {
     StatisticsFactory localStatsFactory = new LocalStatisticsFactory(null);
     OutOfOffHeapMemoryListener ooohml = mock(OutOfOffHeapMemoryListener.class);
     MemoryAllocator ma =
-        OffHeapStorage.basicCreateOffHeapStorage(localStatsFactory, 1024 * 1024, ooohml, 100);
+        OffHeapStorage.basicCreateOffHeapStorage(localStatsFactory, 1024 * 1024, ooohml, () -> 100,
+            null);
     try {
       OffHeapMemoryStats stats = ma.getStats();
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStoredObjectJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStoredObjectJUnitTest.java
index 2801c6dfc1..d3e1b90bbc 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStoredObjectJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/offheap/OffHeapStoredObjectJUnitTest.java
@@ -74,7 +74,7 @@ public class OffHeapStoredObjectJUnitTest extends AbstractStoredObjectTestBase {
     OffHeapMemoryStats stats = mock(OffHeapMemoryStats.class);
 
     ma = MemoryAllocatorImpl.create(ooohml, stats, 3, OffHeapStorage.MIN_SLAB_SIZE * 3,
-        OffHeapStorage.MIN_SLAB_SIZE);
+        OffHeapStorage.MIN_SLAB_SIZE, null, () -> null);
   }
 
   @After