You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2020/11/13 23:06:07 UTC

[geode] 01/03: Add test for newFunctionThreadPoolWithFeedStatistics

This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch GEODE-8521-CoreLoggingExecutors-tests
in repository https://gitbox.apache.org/repos/asf/geode.git

commit a1a7f003b5fc117237059fd560099eff227b2751
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Thu Oct 29 17:29:57 2020 -0700

    Add test for newFunctionThreadPoolWithFeedStatistics
---
 .../internal/FunctionExecutionPooledExecutor.java  |   6 +
 .../internal/OverflowQueueWithDMStats.java         |   7 ++
 .../internal/logging/CoreLoggingExecutorsTest.java | 122 +++++++++++++++++++++
 3 files changed, 135 insertions(+)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/FunctionExecutionPooledExecutor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/FunctionExecutionPooledExecutor.java
index 45b8333..7d485b8 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/FunctionExecutionPooledExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/FunctionExecutionPooledExecutor.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.SystemFailure;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.internal.monitoring.ThreadsMonitoring;
 import org.apache.geode.logging.internal.log4j.api.LogService;
@@ -284,6 +285,11 @@ public class FunctionExecutionPooledExecutor extends ThreadPoolExecutor {
     }
   }
 
+  @VisibleForTesting // accessor exists so tests can inspect the buffer queue
+  public BlockingQueue<Runnable> getBufferQueue() {
+    return bufferQueue;
+  }
+
   private static int getCorePoolSize(int maxSize) {
     if (maxSize == Integer.MAX_VALUE) {
       return 0;
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/OverflowQueueWithDMStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/OverflowQueueWithDMStats.java
index 4958928..7b5bdbd 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/OverflowQueueWithDMStats.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/OverflowQueueWithDMStats.java
@@ -19,6 +19,8 @@ import java.util.Collection;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.geode.annotations.VisibleForTesting;
+
 /**
  * A LinkedBlockingQueue that supports stats. Named OverflowQueue for historical reasons.
  *
@@ -183,4 +185,9 @@ public class OverflowQueueWithDMStats<E> extends LinkedBlockingQueue<E> {
   protected void postDrain(Collection<? super E> c) {
     // do nothing in this class. sub-classes can override
   }
+
+  @VisibleForTesting // accessor exists so tests can inspect the stats helper
+  public QueueStatHelper getQueueStatHelper() {
+    return stats;
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/logging/CoreLoggingExecutorsTest.java b/geode-core/src/test/java/org/apache/geode/internal/logging/CoreLoggingExecutorsTest.java
new file mode 100644
index 0000000..8ccf4d0
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/logging/CoreLoggingExecutorsTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.logging;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.FunctionExecutionPooledExecutor;
+import org.apache.geode.distributed.internal.OverflowQueueWithDMStats;
+import org.apache.geode.distributed.internal.PoolStatHelper;
+import org.apache.geode.distributed.internal.QueueStatHelper;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.logging.internal.executors.LoggingThread;
+import org.apache.geode.logging.internal.executors.LoggingThreadFactory;
+import org.apache.geode.logging.internal.executors.LoggingThreadFactory.CommandWrapper;
+import org.apache.geode.logging.internal.executors.LoggingThreadFactory.ThreadInitializer;
+
+public class CoreLoggingExecutorsTest {
+
+  private CommandWrapper commandWrapper;
+  private PoolStatHelper poolStatHelper;
+  private QueueStatHelper queueStatHelper;
+  private Runnable runnable;
+  private ThreadInitializer threadInitializer;
+  private ThreadsMonitoring threadsMonitoring;
+
+  @Before
+  public void setUp() {
+    commandWrapper = mock(CommandWrapper.class);
+    poolStatHelper = mock(PoolStatHelper.class);
+    queueStatHelper = mock(QueueStatHelper.class);
+    runnable = mock(Runnable.class);
+    threadInitializer = mock(ThreadInitializer.class);
+    threadsMonitoring = mock(ThreadsMonitoring.class);
+  }
+
+  @Test
+  public void newFixedThreadPoolWithTimeout() {
+    int poolSize = 5;
+    int keepAliveTime = 2;
+    String threadName = "thread";
+
+    ExecutorService executorService = CoreLoggingExecutors
+        .newFixedThreadPoolWithTimeout(poolSize, keepAliveTime, MINUTES, queueStatHelper,
+            threadName);
+
+    assertThat(executorService).isInstanceOf(ThreadPoolExecutor.class);
+
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) executorService;
+
+    assertThat(executor.getCorePoolSize()).isEqualTo(poolSize);
+    assertThat(executor.getKeepAliveTime(MINUTES)).isEqualTo(keepAliveTime);
+    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
+    assertThat(executor.getQueue()).isInstanceOf(OverflowQueueWithDMStats.class);
+    assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
+
+    OverflowQueueWithDMStats overflowQueueWithDMStats =
+        (OverflowQueueWithDMStats) executor.getQueue();
+
+    assertThat(overflowQueueWithDMStats.getQueueStatHelper()).isSameAs(queueStatHelper);
+
+    ThreadFactory threadFactory = executor.getThreadFactory();
+    Thread thread = threadFactory.newThread(runnable);
+
+    assertThat(thread).isInstanceOf(LoggingThread.class);
+    assertThat(thread.getName()).contains(threadName);
+  }
+
+  @Test
+  public void newFunctionThreadPoolWithFeedStatistics() {
+    int poolSize = 5;
+    int workQueueSize = 2;
+    String threadName = "thread";
+
+    ExecutorService executorService = CoreLoggingExecutors
+        .newFunctionThreadPoolWithFeedStatistics(poolSize, workQueueSize, queueStatHelper,
+            threadName, threadInitializer, commandWrapper, poolStatHelper, threadsMonitoring);
+
+    assertThat(executorService).isInstanceOf(FunctionExecutionPooledExecutor.class);
+
+    FunctionExecutionPooledExecutor executor = (FunctionExecutionPooledExecutor) executorService;
+
+    assertThat(executor.getCorePoolSize()).isEqualTo(1);
+    assertThat(executor.getKeepAliveTime(MINUTES)).isEqualTo(30);
+    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
+    assertThat(executor.getBufferQueue()).isInstanceOf(OverflowQueueWithDMStats.class);
+    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
+    assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
+
+    OverflowQueueWithDMStats overflowQueueWithDMStats =
+        (OverflowQueueWithDMStats) executor.getBufferQueue();
+
+    assertThat(overflowQueueWithDMStats.getQueueStatHelper()).isSameAs(queueStatHelper);
+
+    ThreadFactory threadFactory = executor.getThreadFactory();
+    Thread thread = threadFactory.newThread(runnable);
+
+    assertThat(thread).isInstanceOf(LoggingThread.class);
+    assertThat(thread.getName()).contains(threadName);
+  }
+}