You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2020/10/14 15:29:22 UTC

[tez] branch branch-0.9 updated: TEZ-4070: SSLFactory not closed in DAGClientTimelineImpl caused native memory issues (László Bodor reviewed by Jonathan Turner Eagles)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 25c470a  TEZ-4070: SSLFactory not closed in DAGClientTimelineImpl caused native memory issues (László Bodor reviewed by Jonathan Turner Eagles)
25c470a is described below

commit 25c470a1d1518f1df6f0bc461689ef35fe1b1a6f
Author: László Bodor <bo...@gmail.com>
AuthorDate: Wed Oct 14 17:24:57 2020 +0200

    TEZ-4070: SSLFactory not closed in DAGClientTimelineImpl caused native memory issues (László Bodor reviewed by Jonathan Turner Eagles)
    
    Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
 tez-api/pom.xml                                    |  5 +++
 .../tez/dag/api/client/DAGClientTimelineImpl.java  |  3 ++
 .../tez/dag/api/client/TimelineReaderFactory.java  | 36 +++++++++++----
 .../tez/dag/api/client/rpc/TestDAGClient.java      | 52 ++++++++++++++++++++++
 4 files changed, 88 insertions(+), 8 deletions(-)

diff --git a/tez-api/pom.xml b/tez-api/pom.xml
index 13b6c01..6b84a88 100644
--- a/tez-api/pom.xml
+++ b/tez-api/pom.xml
@@ -110,6 +110,11 @@
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.bouncycastle</groupId>
+      <artifactId>bcprov-jdk15on</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
index d34dbf0..17d2386 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
@@ -213,6 +213,9 @@ public class DAGClientTimelineImpl extends DAGClientInternal {
       httpClient.destroy();
       httpClient = null;
     }
+    if (timelineReaderStrategy != null) {
+      timelineReaderStrategy.close();
+    }
   }
 
   private DAGStatusProto.Builder parseDagStatus(JSONObject jsonRoot, Set<StatusGetOpts> statusOptions)
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
index c0569dd..40340cc 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
@@ -133,6 +133,7 @@ public class TimelineReaderFactory {
 
   public interface TimelineReaderStrategy {
     Client getHttpClient() throws IOException;
+    void close();
   }
 
   /*
@@ -142,6 +143,8 @@ public class TimelineReaderFactory {
     private final Configuration conf;
     private final boolean useHttps;
     private final int connTimeout;
+    private ConnectionConfigurator connectionConfigurator;
+    private SSLFactory sslFactory;
 
     public TimelineReaderTokenAuthenticatedStrategy(final Configuration conf,
                                                     final boolean useHttps,
@@ -150,6 +153,7 @@ public class TimelineReaderFactory {
       this.conf = conf;
       this.useHttps = useHttps;
       this.connTimeout = connTimeout;
+      this.sslFactory = useHttps ? new SSLFactory(CLIENT, conf) : null;
     }
 
     @Override
@@ -160,8 +164,8 @@ public class TimelineReaderFactory {
       UserGroupInformation authUgi;
       String doAsUser;
       ClientConfig clientConfig = new DefaultClientConfig(JSONRootElementProvider.App.class);
-      ConnectionConfigurator connectionConfigurator = getNewConnectionConf(conf, useHttps,
-          connTimeout);
+      connectionConfigurator = getNewConnectionConf(conf, useHttps,
+          connTimeout, sslFactory);
 
       try {
         authenticator = getTokenAuthenticator();
@@ -238,6 +242,13 @@ public class TimelineReaderFactory {
         }
       }
     }
+
+    @Override
+    public void close() {
+      if (sslFactory != null) {
+        sslFactory.destroy();
+      }
+    }
   }
 
   /*
@@ -247,11 +258,13 @@ public class TimelineReaderFactory {
   protected static class TimelineReaderPseudoAuthenticatedStrategy implements TimelineReaderStrategy {
 
     private final ConnectionConfigurator connectionConf;
+    private final SSLFactory sslFactory;
 
     public TimelineReaderPseudoAuthenticatedStrategy(final Configuration conf,
                                                      final boolean useHttps,
                                                      final int connTimeout) {
-      connectionConf = getNewConnectionConf(conf, useHttps, connTimeout);
+      sslFactory = useHttps ? new SSLFactory(CLIENT, conf) : null;
+      connectionConf = getNewConnectionConf(conf, useHttps, connTimeout, sslFactory);
     }
 
     @Override
@@ -282,15 +295,23 @@ public class TimelineReaderFactory {
         return httpURLConnection;
       }
     }
+
+    @Override
+    public void close() {
+      if (sslFactory != null) {
+        sslFactory.destroy();
+      }
+    }
   }
 
   private static ConnectionConfigurator getNewConnectionConf(final Configuration conf,
                                                              final boolean useHttps,
-                                                             final int connTimeout) {
+                                                             final int connTimeout,
+                                                             final SSLFactory sslFactory) {
     ConnectionConfigurator connectionConf = null;
     if (useHttps) {
       try {
-        connectionConf = getNewSSLConnectionConf(conf, connTimeout);
+        connectionConf = getNewSSLConnectionConf(conf, connTimeout, sslFactory);
       } catch (IOException e) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Cannot load customized ssl related configuration."
@@ -313,13 +334,12 @@ public class TimelineReaderFactory {
   }
 
   private static ConnectionConfigurator getNewSSLConnectionConf(final Configuration conf,
-                                                                final int connTimeout)
+                                                                final int connTimeout,
+                                                                final SSLFactory sslFactory)
       throws IOException {
-    final SSLFactory sslFactory;
     final SSLSocketFactory sslSocketFactory;
     final HostnameVerifier hostnameVerifier;
 
-    sslFactory = new SSLFactory(CLIENT, conf);
     try {
       sslFactory.init();
       sslSocketFactory = sslFactory.createSSLSocketFactory();
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
index 70ee1d4..c323fb0 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
@@ -28,10 +28,14 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import javax.annotation.Nullable;
+
+import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.EnumSet;
 import java.util.Set;
 
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -46,6 +50,7 @@ import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DagStatusSource;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.api.client.TimelineReaderFactory.TimelineReaderStrategy;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusResponseProto;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto;
@@ -62,6 +67,7 @@ import org.apache.tez.dag.api.records.DAGProtos.TezCounterProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezCountersProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexStatusProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexStatusStateProto;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -531,4 +537,50 @@ public class TestDAGClient {
     }).when(mock).getDAGStatus(isNull(RpcController.class), any(GetDAGStatusRequestProto.class));
     return mock;
   }
+
+  @Test
+  /* testing idea is borrowed from YARN-5309 */
+  public void testTimelineClientCleanup() throws Exception {
+    TezConfiguration tezConf = new TezConfiguration();
+    tezConf.set("yarn.http.policy", "HTTPS_ONLY");
+
+    File testDir = new File(System.getProperty("java.io.tmpdir"));
+    String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestDAGClient.class);
+    KeyStoreTestUtil.setupSSLConfig(testDir.getAbsolutePath(), sslConfDir, tezConf, false);
+
+    DAGClientTimelineImpl dagClient =
+        new DAGClientTimelineImpl(mockAppId, dagIdStr, tezConf, mock(FrameworkClient.class), 10000);
+    Field field = DAGClientTimelineImpl.class.getDeclaredField("timelineReaderStrategy");
+    field.setAccessible(true);
+    TimelineReaderStrategy strategy = (TimelineReaderStrategy) field.get(dagClient);
+    strategy.getHttpClient(); // calls SSLFactory.init
+
+    ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
+
+    while (threadGroup.getParent() != null) {
+      threadGroup = threadGroup.getParent();
+    }
+
+    Thread[] threads = new Thread[threadGroup.activeCount()];
+
+    threadGroup.enumerate(threads);
+    Thread reloaderThread = null;
+    for (Thread thread : threads) {
+      if ((thread.getName() != null) && (thread.getName().contains("Truststore reloader thread"))) {
+        reloaderThread = thread;
+      }
+    }
+    Assert.assertTrue("Reloader is not alive", reloaderThread.isAlive());
+
+    dagClient.close();
+    boolean reloaderStillAlive = true;
+    for (int i = 0; i < 10; i++) {
+      reloaderStillAlive = reloaderThread.isAlive();
+      if (!reloaderStillAlive) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    Assert.assertFalse("Reloader is still alive", reloaderStillAlive);
+  }
 }