You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2020/11/13 23:06:07 UTC
[geode] 01/03: Add test for newFunctionThreadPoolWithFeedStatistics
This is an automated email from the ASF dual-hosted git repository.
klund pushed a commit to branch GEODE-8521-CoreLoggingExecutors-tests
in repository https://gitbox.apache.org/repos/asf/geode.git
commit a1a7f003b5fc117237059fd560099eff227b2751
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Thu Oct 29 17:29:57 2020 -0700
Add test for newFunctionThreadPoolWithFeedStatistics
---
.../internal/FunctionExecutionPooledExecutor.java | 6 +
.../internal/OverflowQueueWithDMStats.java | 7 ++
.../internal/logging/CoreLoggingExecutorsTest.java | 122 +++++++++++++++++++++
3 files changed, 135 insertions(+)
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/FunctionExecutionPooledExecutor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/FunctionExecutionPooledExecutor.java
index 45b8333..7d485b8 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/FunctionExecutionPooledExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/FunctionExecutionPooledExecutor.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.apache.logging.log4j.Logger;
import org.apache.geode.SystemFailure;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;
import org.apache.geode.logging.internal.log4j.api.LogService;
@@ -284,6 +285,11 @@ public class FunctionExecutionPooledExecutor extends ThreadPoolExecutor {
}
}
+  /**
+   * Returns the buffer queue referenced by this executor's {@code bufferQueue} field.
+   *
+   * <p>
+   * Marked {@link VisibleForTesting}: the accessor exists so tests can inspect the queue.
+   *
+   * @return the {@code bufferQueue} field, exactly as stored (no copy is made)
+   */
+  @VisibleForTesting
+  public BlockingQueue<Runnable> getBufferQueue() {
+    return this.bufferQueue;
+  }
+
private static int getCorePoolSize(int maxSize) {
if (maxSize == Integer.MAX_VALUE) {
return 0;
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/OverflowQueueWithDMStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/OverflowQueueWithDMStats.java
index 4958928..7b5bdbd 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/OverflowQueueWithDMStats.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/OverflowQueueWithDMStats.java
@@ -19,6 +19,8 @@ import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.geode.annotations.VisibleForTesting;
+
/**
* A LinkedBlockingQueue that supports stats. Named OverflowQueue for historical reasons.
*
@@ -183,4 +185,9 @@ public class OverflowQueueWithDMStats<E> extends LinkedBlockingQueue<E> {
protected void postDrain(Collection<? super E> c) {
// do nothing in this class. sub-classes can override
}
+
+  /**
+   * Returns the {@link QueueStatHelper} referenced by this queue's {@code stats} field.
+   *
+   * <p>
+   * Marked {@link VisibleForTesting}: the accessor exists so tests can verify which helper the
+   * queue was given.
+   *
+   * @return the {@code stats} field, exactly as stored
+   */
+  @VisibleForTesting
+  public QueueStatHelper getQueueStatHelper() {
+    return this.stats;
+  }
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/logging/CoreLoggingExecutorsTest.java b/geode-core/src/test/java/org/apache/geode/internal/logging/CoreLoggingExecutorsTest.java
new file mode 100644
index 0000000..8ccf4d0
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/logging/CoreLoggingExecutorsTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.logging;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.FunctionExecutionPooledExecutor;
+import org.apache.geode.distributed.internal.OverflowQueueWithDMStats;
+import org.apache.geode.distributed.internal.PoolStatHelper;
+import org.apache.geode.distributed.internal.QueueStatHelper;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.logging.internal.executors.LoggingThread;
+import org.apache.geode.logging.internal.executors.LoggingThreadFactory;
+import org.apache.geode.logging.internal.executors.LoggingThreadFactory.CommandWrapper;
+import org.apache.geode.logging.internal.executors.LoggingThreadFactory.ThreadInitializer;
+
+public class CoreLoggingExecutorsTest {
+
+ private CommandWrapper commandWrapper;
+ private PoolStatHelper poolStatHelper;
+ private QueueStatHelper queueStatHelper;
+ private Runnable runnable;
+ private ThreadInitializer threadInitializer;
+ private ThreadsMonitoring threadsMonitoring;
+
+ @Before
+ public void setUp() {
+ commandWrapper = mock(CommandWrapper.class);
+ poolStatHelper = mock(PoolStatHelper.class);
+ queueStatHelper = mock(QueueStatHelper.class);
+ runnable = mock(Runnable.class);
+ threadInitializer = mock(ThreadInitializer.class);
+ threadsMonitoring = mock(ThreadsMonitoring.class);
+ }
+
+ @Test
+ public void newFixedThreadPoolWithTimeout() {
+ int poolSize = 5;
+ int keepAliveTime = 2;
+ String threadName = "thread";
+
+ ExecutorService executorService = CoreLoggingExecutors
+ .newFixedThreadPoolWithTimeout(poolSize, keepAliveTime, MINUTES, queueStatHelper,
+ threadName);
+
+ assertThat(executorService).isInstanceOf(ThreadPoolExecutor.class);
+
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) executorService;
+
+ assertThat(executor.getCorePoolSize()).isEqualTo(poolSize);
+ assertThat(executor.getKeepAliveTime(MINUTES)).isEqualTo(keepAliveTime);
+ assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
+ assertThat(executor.getQueue()).isInstanceOf(OverflowQueueWithDMStats.class);
+ assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
+
+ OverflowQueueWithDMStats overflowQueueWithDMStats =
+ (OverflowQueueWithDMStats) executor.getQueue();
+
+ assertThat(overflowQueueWithDMStats.getQueueStatHelper()).isSameAs(queueStatHelper);
+
+ ThreadFactory threadFactory = executor.getThreadFactory();
+ Thread thread = threadFactory.newThread(runnable);
+
+ assertThat(thread).isInstanceOf(LoggingThread.class);
+ assertThat(thread.getName()).contains(threadName);
+ }
+
+ @Test
+ public void newFunctionThreadPoolWithFeedStatistics() {
+ int poolSize = 5;
+ int workQueueSize = 2;
+ String threadName = "thread";
+
+ ExecutorService executorService = CoreLoggingExecutors
+ .newFunctionThreadPoolWithFeedStatistics(poolSize, workQueueSize, queueStatHelper,
+ threadName, threadInitializer, commandWrapper, poolStatHelper, threadsMonitoring);
+
+ assertThat(executorService).isInstanceOf(FunctionExecutionPooledExecutor.class);
+
+ FunctionExecutionPooledExecutor executor = (FunctionExecutionPooledExecutor) executorService;
+
+ assertThat(executor.getCorePoolSize()).isEqualTo(1);
+ assertThat(executor.getKeepAliveTime(MINUTES)).isEqualTo(30);
+ assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
+ assertThat(executor.getBufferQueue()).isInstanceOf(OverflowQueueWithDMStats.class);
+ assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
+ assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
+
+ OverflowQueueWithDMStats overflowQueueWithDMStats =
+ (OverflowQueueWithDMStats) executor.getBufferQueue();
+
+ assertThat(overflowQueueWithDMStats.getQueueStatHelper()).isSameAs(queueStatHelper);
+
+ ThreadFactory threadFactory = executor.getThreadFactory();
+ Thread thread = threadFactory.newThread(runnable);
+
+ assertThat(thread).isInstanceOf(LoggingThread.class);
+ assertThat(thread.getName()).contains(threadName);
+ }
+}