You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sa...@apache.org on 2020/07/22 12:40:14 UTC
[hadoop-ozone] branch master updated: HDDS-3933. Fix memory leak
because of too many Datanode State Machine Thread (#1185)
This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new ff7b5a3 HDDS-3933. Fix memory leak because of too many Datanode State Machine Thread (#1185)
ff7b5a3 is described below
commit ff7b5a3367eccc0969bfd92a2cafe48899a2aaa5
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Wed Jul 22 20:40:04 2020 +0800
HDDS-3933. Fix memory leak because of too many Datanode State Machine Thread (#1185)
---
.../common/statemachine/DatanodeStateMachine.java | 25 +++++-
.../common/statemachine/StateContext.java | 34 +++++++-
.../states/datanode/RunningDatanodeState.java | 14 +++-
.../common/statemachine/TestStateContext.java | 30 ++++++++
.../states/datanode/TestRunningDatanodeState.java | 90 ++++++++++++++++++++++
5 files changed, 184 insertions(+), 9 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 779b60a..27e814b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -19,11 +19,13 @@ package org.apache.hadoop.ozone.container.common.statemachine;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
@@ -50,7 +52,6 @@ import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.concurrent.HadoopExecutors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -103,9 +104,10 @@ public class DatanodeStateMachine implements Closeable {
this.hddsDatanodeStopService = hddsDatanodeStopService;
this.conf = conf;
this.datanodeDetails = datanodeDetails;
- executorService = HadoopExecutors.newCachedThreadPool(
- new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("Datanode State Machine Thread - %d").build());
+ executorService = Executors.newFixedThreadPool(
+ getEndPointTaskThreadPoolSize(),
+ new ThreadFactoryBuilder()
+ .setNameFormat("Datanode State Machine Task Thread - %d").build());
connectionManager = new SCMConnectionManager(conf);
context = new StateContext(this.conf, DatanodeStates.getInitState(), this);
// OzoneContainer instance is used in a non-thread safe way by the context
@@ -155,6 +157,21 @@ public class DatanodeStateMachine implements Closeable {
.build();
}
+ /**
+  * Computes the size of the endpoint task thread pool: the Recon server
+  * count plus the number of configured SCM addresses.
+  *
+  * @return the Recon count plus the number of SCM addresses, or only the
+  *         Recon count when the SCM addresses cannot be read.
+  */
+ private int getEndPointTaskThreadPoolSize() {
+   // TODO(runzhiwang): only one Recon is supported at present. If multiple
+   // Recon servers are supported in the future, reconServerCount should
+   // become the real number of Recon servers.
+   final int reconServerCount = 1;
+
+   int scmServerCount = 0;
+   try {
+     scmServerCount = HddsUtils.getSCMAddresses(conf).size();
+   } catch (Exception e) {
+     // Best effort: the failure is logged and only the Recon count is used.
+     LOG.error("Fail to get scm addresses", e);
+   }
+
+   return reconServerCount + scmServerCount;
+ }
+
/**
*
* Return DatanodeDetails if set, return null otherwise.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index f3a599d..51262c3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
@@ -35,6 +36,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
@@ -51,6 +53,8 @@ import com.google.common.base.Preconditions;
import com.google.protobuf.GeneratedMessage;
import static java.lang.Math.min;
import org.apache.commons.collections.CollectionUtils;
+
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.getLogWarnInterval;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmHeartbeatInterval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,6 +78,7 @@ public class StateContext {
private DatanodeStateMachine.DatanodeStates state;
private boolean shutdownOnError = false;
private boolean shutdownGracefully = false;
+ private final AtomicLong threadPoolNotAvailableCount;
/**
* Starting with a 2 sec heartbeat frequency which will be updated to the
@@ -103,6 +108,7 @@ public class StateContext {
pipelineActions = new HashMap<>();
lock = new ReentrantLock();
stateExecutionCount = new AtomicLong(0);
+ threadPoolNotAvailableCount = new AtomicLong(0);
}
/**
@@ -393,6 +399,20 @@ public class StateContext {
}
}
+ /**
+  * Reports whether the given executor has no tasks waiting in its queue.
+  *
+  * @param executor the executor to inspect.
+  * @return true when the executor is not a {@link ThreadPoolExecutor}, or
+  *         when its work queue is empty; false otherwise.
+  */
+ @VisibleForTesting
+ public boolean isThreadPoolAvailable(ExecutorService executor) {
+   if (executor instanceof ThreadPoolExecutor) {
+     // A non-empty queue means submitted tasks are still waiting to run.
+     ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
+     return pool.getQueue().size() == 0;
+   }
+
+   // The queue of other executor types cannot be inspected here.
+   return true;
+ }
+
/**
* Executes the required state function.
*
@@ -415,7 +435,19 @@ public class StateContext {
if (this.isEntering()) {
task.onEnter();
}
- task.execute(service);
+
+ if (isThreadPoolAvailable(service)) {
+ task.execute(service);
+ threadPoolNotAvailableCount.set(0);
+ } else {
+ if (threadPoolNotAvailableCount.get()
+ % getLogWarnInterval(conf) == 0) {
+ LOG.warn("No available thread in pool for past {} seconds.",
+ unit.toSeconds(time) * (threadPoolNotAvailableCount.get() + 1));
+ }
+ threadPoolNotAvailableCount.incrementAndGet();
+ }
+
DatanodeStateMachine.DatanodeStates newState = task.await(time, unit);
if (this.state != newState) {
if (LOG.isDebugEnabled()) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
index 8a9bcaf..b0cfb4c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
@@ -16,6 +16,7 @@
*/
package org.apache.hadoop.ozone.container.common.states.datanode;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
@@ -42,7 +43,6 @@ import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
/**
* Class that implements handshake with SCM.
@@ -152,6 +152,11 @@ public class RunningDatanodeState implements DatanodeState {
}
}
+ /**
+  * Replaces the completion service stored in {@code ecs}. Exposed so that
+  * tests can supply a completion service backed by their own executor.
+  *
+  * @param e the completion service to use from now on.
+  */
+ @VisibleForTesting
+ public void setExecutorCompletionService(
+     ExecutorCompletionService<EndPointStates> e) {
+   this.ecs = e;
+ }
+
private Callable<EndPointStates> getEndPointTask(
EndpointStateMachine endpoint) {
if (endpointTasks.containsKey(endpoint)) {
@@ -200,10 +205,11 @@ public class RunningDatanodeState implements DatanodeState {
@Override
public DatanodeStateMachine.DatanodeStates
await(long duration, TimeUnit timeUnit)
- throws InterruptedException, ExecutionException, TimeoutException {
+ throws InterruptedException {
int count = connectionManager.getValues().size();
int returned = 0;
- long timeLeft = timeUnit.toMillis(duration);
+ long durationMS = timeUnit.toMillis(duration);
+ long timeLeft = durationMS;
long startTime = Time.monotonicNow();
List<Future<EndPointStates>> results = new LinkedList<>();
@@ -214,7 +220,7 @@ public class RunningDatanodeState implements DatanodeState {
results.add(result);
returned++;
}
- timeLeft = timeLeft - (Time.monotonicNow() - startTime);
+ timeLeft = durationMS - (Time.monotonicNow() - startTime);
}
return computeNextContainerState(results);
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index 545d670..c3fd310 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -28,6 +28,8 @@ import static org.mockito.Mockito.mock;
import java.net.InetSocketAddress;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -39,6 +41,8 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.Assert;
import org.junit.Test;
import com.google.protobuf.GeneratedMessage;
@@ -182,4 +186,30 @@ public class TestStateContext {
assertEquals(DatanodeStates.SHUTDOWN, subject.getState());
}
+ /**
+  * Checks that isThreadPoolAvailable() returns false while a task is still
+  * queued behind busy workers, and true once the queue has drained.
+  */
+ @Test
+ public void testIsThreadPoolAvailable() throws Exception {
+   StateContext stateContext = new StateContext(null, null, null);
+
+   int threadPoolSize = 2;
+   ExecutorService executorService = Executors.newFixedThreadPool(
+       threadPoolSize);
+
+   CompletableFuture<String> futureOne = new CompletableFuture<>();
+   CompletableFuture<String> futureTwo = new CompletableFuture<>();
+
+   try {
+     // task num greater than pool size
+     for (int i = 0; i < threadPoolSize; i++) {
+       executorService.submit(() -> futureOne.get());
+     }
+     executorService.submit(() -> futureTwo.get());
+
+     Assert.assertFalse(stateContext.isThreadPoolAvailable(executorService));
+
+     // Releasing the first two tasks lets a worker pick up the queued one.
+     futureOne.complete("futureOne");
+     LambdaTestUtils.await(1000, 100, () ->
+         stateContext.isThreadPoolAvailable(executorService));
+   } finally {
+     // Always unblock the workers and stop the pool: its threads are
+     // non-daemon and would otherwise stay blocked after a failed assertion.
+     // complete() is a no-op on a future that is already completed.
+     futureOne.complete("futureOne");
+     futureTwo.complete("futureTwo");
+     executorService.shutdown();
+   }
+ }
}
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java
new file mode 100644
index 0000000..9fb4307
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.datanode;
+
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine.EndPointStates.SHUTDOWN;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test class for RunningDatanodeState.
+ */
+public class TestRunningDatanodeState {
+  /**
+   * Checks the timing of await(): with every endpoint task still blocked it
+   * must take at least the full 500 ms timeout, and with results already
+   * completed it must return in under 500 ms.
+   */
+  @Test
+  public void testAwait() throws InterruptedException {
+    SCMConnectionManager connectionManager =
+        Mockito.mock(SCMConnectionManager.class);
+    List<EndpointStateMachine> stateMachines = new ArrayList<>();
+    when(connectionManager.getValues()).thenReturn(stateMachines);
+
+    RunningDatanodeState state =
+        new RunningDatanodeState(null, connectionManager, null);
+
+    int threadPoolSize = 2;
+    ExecutorService executorService = Executors.newFixedThreadPool(
+        threadPoolSize);
+
+    ExecutorCompletionService<EndpointStateMachine.EndPointStates> ecs =
+        new ExecutorCompletionService<>(executorService);
+    state.setExecutorCompletionService(ecs);
+
+    for (int i = 0; i < threadPoolSize; i++) {
+      stateMachines.add(new EndpointStateMachine(null, null, null));
+    }
+
+    CompletableFuture<EndpointStateMachine.EndPointStates> futureOne =
+        new CompletableFuture<>();
+    CompletableFuture<EndpointStateMachine.EndPointStates> futureTwo =
+        new CompletableFuture<>();
+
+    try {
+      // Phase one: every task blocks on futureOne, so await() must time out.
+      for (int i = 0; i < threadPoolSize; i++) {
+        ecs.submit(() -> futureOne.get());
+      }
+
+      long startTime = Time.monotonicNow();
+      state.await(500, TimeUnit.MILLISECONDS);
+      long endTime = Time.monotonicNow();
+      Assert.assertTrue((endTime - startTime) >= 500);
+
+      futureOne.complete(SHUTDOWN);
+
+      // Phase two: futureTwo is completed right after submission, so
+      // await() should return before the timeout.
+      for (int i = 0; i < threadPoolSize; i++) {
+        ecs.submit(() -> futureTwo.get());
+      }
+      futureTwo.complete(SHUTDOWN);
+
+      startTime = Time.monotonicNow();
+      state.await(500, TimeUnit.MILLISECONDS);
+      endTime = Time.monotonicNow();
+      Assert.assertTrue((endTime - startTime) < 500);
+    } finally {
+      // Always unblock the workers and stop the pool: its threads are
+      // non-daemon and would otherwise stay blocked after a failed assertion.
+      // complete() is a no-op on a future that is already completed.
+      futureOne.complete(SHUTDOWN);
+      futureTwo.complete(SHUTDOWN);
+      executorService.shutdown();
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org