You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sa...@apache.org on 2020/07/22 13:00:48 UTC

[hadoop-ozone] 38/39: HDDS-3933. Fix memory leak because of too many Datanode State Machine Thread (#1185)

This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch ozone-0.6.0
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit 507bba57626fd37c51d50a8cb08784622d37e432
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Wed Jul 22 20:40:04 2020 +0800

    HDDS-3933. Fix memory leak because of too many Datanode State Machine Thread (#1185)
    
    (cherry picked from commit ff7b5a3367eccc0969bfd92a2cafe48899a2aaa5)
---
 .../common/statemachine/DatanodeStateMachine.java  | 25 +++++-
 .../common/statemachine/StateContext.java          | 34 +++++++-
 .../states/datanode/RunningDatanodeState.java      | 14 +++-
 .../common/statemachine/TestStateContext.java      | 30 ++++++++
 .../states/datanode/TestRunningDatanodeState.java  | 90 ++++++++++++++++++++++
 5 files changed, 184 insertions(+), 9 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 779b60a..27e814b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -19,11 +19,13 @@ package org.apache.hadoop.ozone.container.common.statemachine;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
@@ -50,7 +52,6 @@ import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.concurrent.HadoopExecutors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -103,9 +104,10 @@ public class DatanodeStateMachine implements Closeable {
     this.hddsDatanodeStopService = hddsDatanodeStopService;
     this.conf = conf;
     this.datanodeDetails = datanodeDetails;
-    executorService = HadoopExecutors.newCachedThreadPool(
-                new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("Datanode State Machine Thread - %d").build());
+    executorService = Executors.newFixedThreadPool(
+        getEndPointTaskThreadPoolSize(),
+        new ThreadFactoryBuilder()
+            .setNameFormat("Datanode State Machine Task Thread - %d").build());
     connectionManager = new SCMConnectionManager(conf);
     context = new StateContext(this.conf, DatanodeStates.getInitState(), this);
     // OzoneContainer instance is used in a non-thread safe way by the context
@@ -155,6 +157,21 @@ public class DatanodeStateMachine implements Closeable {
         .build();
   }
 
+  /** Pool size: HddsUtils.getSCMAddresses(conf) count plus one for recon. */
+  private int getEndPointTaskThreadPoolSize() {
+    // TODO(runzhiwang): only one recon is supported today; once multiple
+    //  recon servers are supported, use their real number here instead.
+    final int reconServerCount = 1;
+    int scmServerCount = 0;
+    try {
+      scmServerCount = HddsUtils.getSCMAddresses(conf).size();
+    } catch (Exception e) {
+      // Best effort: keep going with the recon-only size.
+      LOG.error("Fail to get scm addresses", e);
+    }
+    return reconServerCount + scmServerCount;
+  }
+
   /**
    *
    * Return DatanodeDetails if set, return null otherwise.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index f3a599d..51262c3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
@@ -35,6 +36,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
@@ -51,6 +53,8 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage;
 import static java.lang.Math.min;
 import org.apache.commons.collections.CollectionUtils;
+
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.getLogWarnInterval;
 import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmHeartbeatInterval;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,6 +78,7 @@ public class StateContext {
   private DatanodeStateMachine.DatanodeStates state;
   private boolean shutdownOnError = false;
   private boolean shutdownGracefully = false;
+  private final AtomicLong threadPoolNotAvailableCount;
 
   /**
    * Starting with a 2 sec heartbeat frequency which will be updated to the
@@ -103,6 +108,7 @@ public class StateContext {
     pipelineActions = new HashMap<>();
     lock = new ReentrantLock();
     stateExecutionCount = new AtomicLong(0);
+    threadPoolNotAvailableCount = new AtomicLong(0);
   }
 
   /**
@@ -393,6 +399,20 @@ public class StateContext {
     }
   }
 
+  /**
+   * Checks whether the given executor has no task waiting in its queue.
+   *
+   * @param executor executor to inspect
+   * @return true if executor is not a ThreadPoolExecutor or its work queue
+   *         is empty, false if at least one task is queued
+   */
+  @VisibleForTesting
+  public boolean isThreadPoolAvailable(ExecutorService executor) {
+    boolean inspectable = executor instanceof ThreadPoolExecutor;
+    return !inspectable
+        || ((ThreadPoolExecutor) executor).getQueue().size() == 0;
+  }
+
   /**
    * Executes the required state function.
    *
@@ -415,7 +435,19 @@ public class StateContext {
       if (this.isEntering()) {
         task.onEnter();
       }
-      task.execute(service);
+
+      if (isThreadPoolAvailable(service)) {
+        task.execute(service);
+        threadPoolNotAvailableCount.set(0);
+      } else {
+        if (threadPoolNotAvailableCount.get()
+            % getLogWarnInterval(conf) == 0) {
+          LOG.warn("No available thread in pool for past {} seconds.",
+              unit.toSeconds(time) * (threadPoolNotAvailableCount.get() + 1));
+        }
+        threadPoolNotAvailableCount.incrementAndGet();
+      }
+
       DatanodeStateMachine.DatanodeStates newState = task.await(time, unit);
       if (this.state != newState) {
         if (LOG.isDebugEnabled()) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
index 8a9bcaf..b0cfb4c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.container.common.states.datanode;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
@@ -42,7 +43,6 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 /**
  * Class that implements handshake with SCM.
@@ -152,6 +152,11 @@ public class RunningDatanodeState implements DatanodeState {
     }
   }
 
+  @VisibleForTesting
+  public void setExecutorCompletionService(ExecutorCompletionService e) {
+    this.ecs = e; // test hook: lets a test supply its own completion service
+  }
+
   private Callable<EndPointStates> getEndPointTask(
       EndpointStateMachine endpoint) {
     if (endpointTasks.containsKey(endpoint)) {
@@ -200,10 +205,11 @@ public class RunningDatanodeState implements DatanodeState {
   @Override
   public DatanodeStateMachine.DatanodeStates
       await(long duration, TimeUnit timeUnit)
-      throws InterruptedException, ExecutionException, TimeoutException {
+      throws InterruptedException {
     int count = connectionManager.getValues().size();
     int returned = 0;
-    long timeLeft = timeUnit.toMillis(duration);
+    long durationMS = timeUnit.toMillis(duration);
+    long timeLeft = durationMS;
     long startTime = Time.monotonicNow();
     List<Future<EndPointStates>> results = new LinkedList<>();
 
@@ -214,7 +220,7 @@ public class RunningDatanodeState implements DatanodeState {
         results.add(result);
         returned++;
       }
-      timeLeft = timeLeft - (Time.monotonicNow() - startTime);
+      timeLeft = durationMS - (Time.monotonicNow() - startTime);
     }
     return computeNextContainerState(results);
   }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index 545d670..c3fd310 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -28,6 +28,8 @@ import static org.mockito.Mockito.mock;
 
 import java.net.InetSocketAddress;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -39,6 +41,8 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
 import org.apache.hadoop.ozone.container.common.states.DatanodeState;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.Assert;
 import org.junit.Test;
 
 import com.google.protobuf.GeneratedMessage;
@@ -182,4 +186,30 @@ public class TestStateContext {
     assertEquals(DatanodeStates.SHUTDOWN, subject.getState());
   }
 
+  @Test
+  public void testIsThreadPoolAvailable() throws Exception {
+    StateContext stateContext = new StateContext(null, null, null);
+    int threadPoolSize = 2;
+    ExecutorService executorService = Executors.newFixedThreadPool(
+        threadPoolSize);
+    CompletableFuture<String> futureOne = new CompletableFuture<>();
+    CompletableFuture<String> futureTwo = new CompletableFuture<>();
+
+    try {
+      // Block every worker on futureOne, then queue one more task.
+      for (int i = 0; i < threadPoolSize; i++) {
+        executorService.submit(() -> futureOne.get());
+      }
+      executorService.submit(() -> futureTwo.get());
+      Assert.assertFalse(stateContext.isThreadPoolAvailable(executorService));
+      // Releasing the workers lets one of them take the queued task.
+      futureOne.complete("futureOne");
+      LambdaTestUtils.await(1000, 100, () ->
+          stateContext.isThreadPoolAvailable(executorService));
+      futureTwo.complete("futureTwo");
+    } finally {
+      // Interrupt still-blocked tasks so a failed check cannot leak threads.
+      executorService.shutdownNow();
+    }
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java
new file mode 100644
index 0000000..9fb4307
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/datanode/TestRunningDatanodeState.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.datanode;
+
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine.EndPointStates.SHUTDOWN;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test class for RunningDatanodeState.
+ */
+public class TestRunningDatanodeState {
+  @Test
+  public void testAwait() throws InterruptedException {
+    SCMConnectionManager connectionManager =
+        Mockito.mock(SCMConnectionManager.class);
+    List<EndpointStateMachine> stateMachines = new ArrayList<>();
+    when(connectionManager.getValues()).thenReturn(stateMachines);
+
+    RunningDatanodeState state =
+        new RunningDatanodeState(null, connectionManager, null);
+
+    int threadPoolSize = 2;
+    ExecutorService executorService = Executors.newFixedThreadPool(
+        threadPoolSize);
+    try {
+      ExecutorCompletionService<EndpointStateMachine.EndPointStates> ecs =
+          new ExecutorCompletionService<>(executorService);
+      state.setExecutorCompletionService(ecs);
+      for (int i = 0; i < threadPoolSize; i++) {
+        stateMachines.add(new EndpointStateMachine(null, null, null));
+      }
+
+      // No task can finish yet, so await has to use its whole time budget.
+      CompletableFuture<EndpointStateMachine.EndPointStates> futureOne =
+          new CompletableFuture<>();
+      for (int i = 0; i < threadPoolSize; i++) {
+        ecs.submit(() -> futureOne.get());
+      }
+      long startTime = Time.monotonicNow();
+      state.await(500, TimeUnit.MILLISECONDS);
+      long endTime = Time.monotonicNow();
+      Assert.assertTrue((endTime - startTime) >= 500);
+      futureOne.complete(SHUTDOWN);
+
+      // Results are obtainable now, so await must return before the timeout.
+      CompletableFuture<EndpointStateMachine.EndPointStates> futureTwo =
+          new CompletableFuture<>();
+      for (int i = 0; i < threadPoolSize; i++) {
+        ecs.submit(() -> futureTwo.get());
+      }
+      futureTwo.complete(SHUTDOWN);
+      startTime = Time.monotonicNow();
+      state.await(500, TimeUnit.MILLISECONDS);
+      endTime = Time.monotonicNow();
+      Assert.assertTrue((endTime - startTime) < 500);
+    } finally {
+      // Interrupt any task still blocked so a failed assert leaks no thread.
+      executorService.shutdownNow();
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org