You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by hi...@apache.org on 2020/04/05 22:02:02 UTC

[druid] branch 0.18.0 updated: Fix double count ssl connection metrics (#9594) (#9615)

This is an automated email from the ASF dual-hosted git repository.

himanshug pushed a commit to branch 0.18.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/0.18.0 by this push:
     new 404a558  Fix double count ssl connection metrics (#9594) (#9615)
404a558 is described below

commit 404a5586e44b18784d48204ff3372e561a36589c
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Sun Apr 5 15:01:43 2020 -0700

    Fix double count ssl connection metrics (#9594) (#9615)
    
    * fix double counted jetty/numOpenConnections metric for ssl connections
    
    * tests
    
    * more better
    
    * style
---
 examples/bin/dsql-main                             |   2 +-
 .../docker/tls/generate-good-client-cert.sh        |   2 +-
 .../tls/generate-server-certs-and-keystores.sh     |   2 +-
 .../initialization/jetty/JettyServerModule.java    |  15 +-
 .../druid/server/initialization/BaseJettyTest.java |  70 +++++++
 .../druid/server/initialization/JettyTest.java     | 211 ++++++++++++++++++++-
 server/src/test/resources/server.jks               | Bin 0 -> 1911 bytes
 server/src/test/resources/truststore.jks           | Bin 0 -> 1641 bytes
 8 files changed, 293 insertions(+), 9 deletions(-)

diff --git a/examples/bin/dsql-main b/examples/bin/dsql-main
index 8dfe882..cf68581 100755
--- a/examples/bin/dsql-main
+++ b/examples/bin/dsql-main
@@ -400,7 +400,7 @@ def main():
   parser_fmt.add_argument('--format', type=str, default='table', choices=('csv', 'tsv', 'json', 'table'), help='Result format')
   parser_fmt.add_argument('--header', action='store_true', help='Include header row for formats "csv" and "tsv"')
   parser_fmt.add_argument('--tsv-delimiter', type=str, default='\t', help='Delimiter for format "tsv"')
-  parser_oth.add_argument('--context-option', '-c', type=str, action='append', help='Set context option for this connection, see https://docs.imply.io/on-prem/query-data/sql for options')
+  parser_oth.add_argument('--context-option', '-c', type=str, action='append', help='Set context option for this connection, see https://druid.apache.org/docs/latest/querying/sql.html#connection-context for options')
   parser_oth.add_argument('--execute', '-e', type=str, help='Execute single SQL query')
   args = parser.parse_args()
 
diff --git a/integration-tests/docker/tls/generate-good-client-cert.sh b/integration-tests/docker/tls/generate-good-client-cert.sh
index 895e6c3..0f16c14 100755
--- a/integration-tests/docker/tls/generate-good-client-cert.sh
+++ b/integration-tests/docker/tls/generate-good-client-cert.sh
@@ -58,5 +58,5 @@ openssl x509 -req -days 3650 -in client.csr -CA root.pem -CAkey root.key -set_se
 openssl pkcs12 -export -in client.pem -inkey client.key -out client.p12 -name druid -CAfile root.pem -caname druid-it-root -password pass:druid123
 keytool -importkeystore -srckeystore client.p12 -srcstoretype PKCS12 -destkeystore client.jks -deststoretype JKS -srcstorepass druid123 -deststorepass druid123
 
-# Create a Java truststore with the imply test cluster root CA
+# Create a Java truststore with the druid test cluster root CA
 keytool -import -alias druid-it-root -keystore truststore.jks -file root.pem -storepass druid123 -noprompt
diff --git a/integration-tests/docker/tls/generate-server-certs-and-keystores.sh b/integration-tests/docker/tls/generate-server-certs-and-keystores.sh
index 8f38be3..e26cdac 100755
--- a/integration-tests/docker/tls/generate-server-certs-and-keystores.sh
+++ b/integration-tests/docker/tls/generate-server-certs-and-keystores.sh
@@ -63,7 +63,7 @@ openssl x509 -req -days 3650 -in server.csr -CA root.pem -CAkey root.key -set_se
 openssl pkcs12 -export -in server.pem -inkey server.key -out server.p12 -name druid -CAfile root.pem -caname druid-it-root -password pass:druid123
 keytool -importkeystore -srckeystore server.p12 -srcstoretype PKCS12 -destkeystore server.jks -deststoretype JKS -srcstorepass druid123 -deststorepass druid123
 
-# Create a Java truststore with the imply test cluster root CA
+# Create a Java truststore with the druid test cluster root CA
 keytool -import -alias druid-it-root -keystore truststore.jks -file root.pem -storepass druid123 -noprompt
 
 # Revoke one of the client certs
diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java
index bbb80e4..fc6f93e 100644
--- a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java
+++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java
@@ -22,6 +22,7 @@ package org.apache.druid.server.initialization.jetty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 import com.fasterxml.jackson.jaxrs.smile.JacksonSmileProvider;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Ints;
 import com.google.inject.Binder;
@@ -345,7 +346,13 @@ public class JettyServerModule extends JerseyServletModule
 
       List<ConnectionFactory> monitoredConnFactories = new ArrayList<>();
       for (ConnectionFactory cf : connector.getConnectionFactories()) {
-        monitoredConnFactories.add(new JettyMonitoringConnectionFactory(cf, ACTIVE_CONNECTIONS));
+        // we only want to monitor the first connection factory, since it will pass the connection to subsequent
+        // connection factories (in this case HTTP/1.1 after the connection is unencrypted for SSL)
+        if (cf.getProtocol().equals(connector.getDefaultProtocol())) {
+          monitoredConnFactories.add(new JettyMonitoringConnectionFactory(cf, ACTIVE_CONNECTIONS));
+        } else {
+          monitoredConnFactories.add(cf);
+        }
       }
       connector.setConnectionFactories(monitoredConnFactories);
     }
@@ -531,4 +538,10 @@ public class JettyServerModule extends JerseyServletModule
       return newTrustManagers;
     }
   }
+
+  @VisibleForTesting
+  public int getActiveConnections()
+  {
+    return ACTIVE_CONNECTIONS.get();
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java b/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java
index defe2de..e38865a 100644
--- a/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java
+++ b/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.server.initialization;
 
+import com.google.inject.Inject;
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.servlet.GuiceFilter;
@@ -60,6 +61,7 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.zip.Deflater;
@@ -72,6 +74,7 @@ public abstract class BaseJettyTest
   protected HttpClient client;
   protected Server server;
   protected int port = -1;
+  protected int tlsPort = -1;
 
   public static void setProperties()
   {
@@ -87,6 +90,8 @@ public abstract class BaseJettyTest
     Injector injector = setupInjector();
     final DruidNode node = injector.getInstance(Key.get(DruidNode.class, Self.class));
     port = node.getPlaintextPort();
+    tlsPort = node.getTlsPort();
+
     lifecycle = injector.getInstance(Lifecycle.class);
     lifecycle.start();
     ClientHolder holder = injector.getInstance(ClientHolder.class);
@@ -175,6 +180,71 @@ public abstract class BaseJettyTest
     }
   }
 
+  @Path("/latched")
+  public static class LatchedResource
+  {
+    private final LatchedRequestStateHolder state;
+
+    @Inject
+    public LatchedResource(LatchedRequestStateHolder state)
+    {
+      this.state = state;
+    }
+
+    @GET
+    @Path("/hello")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response hello()
+    {
+      state.serverStartRequest();
+      try {
+        state.serverWaitForClientReadyToFinishRequest();
+      }
+      catch (InterruptedException ignored) {
+      }
+      return Response.ok(DEFAULT_RESPONSE_CONTENT).build();
+    }
+  }
+
+  public static class LatchedRequestStateHolder
+  {
+    private static final int TIMEOUT_MILLIS = 10_000;
+
+    private CountDownLatch requestStartLatch;
+    private CountDownLatch requestEndLatch;
+
+    public LatchedRequestStateHolder()
+    {
+      reset();
+    }
+
+    public void reset()
+    {
+      requestStartLatch = new CountDownLatch(1);
+      requestEndLatch = new CountDownLatch(1);
+    }
+
+    public void clientWaitForServerToStartRequest() throws InterruptedException
+    {
+      requestStartLatch.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+    }
+
+    public void serverStartRequest()
+    {
+      requestStartLatch.countDown();
+    }
+
+    public void serverWaitForClientReadyToFinishRequest() throws InterruptedException
+    {
+      requestEndLatch.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+    }
+
+    public void clientReadyToFinishRequest()
+    {
+      requestEndLatch.countDown();
+    }
+  }
+
   @Path("/default")
   public static class DefaultResource
   {
diff --git a/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java b/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java
index 5760805..64ccd14 100644
--- a/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java
+++ b/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java
@@ -34,26 +34,35 @@ import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.initialization.Initialization;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.HttpClientConfig;
+import org.apache.druid.java.util.http.client.HttpClientInit;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
 import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
 import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.metadata.PasswordProvider;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
+import org.apache.druid.server.initialization.jetty.JettyServerModule;
 import org.apache.druid.server.initialization.jetty.ServletFilterHolder;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.eclipse.jetty.server.Server;
 import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.Duration;
 import org.junit.Assert;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import javax.servlet.DispatcherType;
 import javax.servlet.Filter;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.MediaType;
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
@@ -61,6 +70,8 @@ import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.EnumSet;
 import java.util.Locale;
 import java.util.Map;
@@ -74,10 +85,109 @@ import java.util.zip.GZIPOutputStream;
 
 public class JettyTest extends BaseJettyTest
 {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private HttpClientConfig sslConfig;
+
+  private Injector injector;
+
+  private LatchedRequestStateHolder latchedRequestState;
+
   @Override
   protected Injector setupInjector()
   {
-    return Initialization.makeInjectorWithModules(
+    TLSServerConfig tlsConfig;
+    try {
+      File keyStore = new File(JettyTest.class.getClassLoader().getResource("server.jks").getFile());
+      Path tmpKeyStore = Files.copy(keyStore.toPath(), new File(folder.newFolder(), "server.jks").toPath());
+      File trustStore = new File(JettyTest.class.getClassLoader().getResource("truststore.jks").getFile());
+      Path tmpTrustStore = Files.copy(trustStore.toPath(), new File(folder.newFolder(), "truststore.jks").toPath());
+      PasswordProvider pp = () -> "druid123";
+      tlsConfig = new TLSServerConfig()
+      {
+        @Override
+        public String getKeyStorePath()
+        {
+          return tmpKeyStore.toString();
+        }
+
+        @Override
+        public String getKeyStoreType()
+        {
+          return "jks";
+        }
+
+        @Override
+        public PasswordProvider getKeyStorePasswordProvider()
+        {
+          return pp;
+        }
+
+        @Override
+        public PasswordProvider getKeyManagerPasswordProvider()
+        {
+          return pp;
+        }
+
+        @Override
+        public String getTrustStorePath()
+        {
+          return tmpTrustStore.toString();
+        }
+
+        @Override
+        public String getTrustStoreAlgorithm()
+        {
+          return "PKIX";
+        }
+
+        @Override
+        public PasswordProvider getTrustStorePasswordProvider()
+        {
+          return pp;
+        }
+
+        @Override
+        public String getCertAlias()
+        {
+          return "druid";
+        }
+
+        @Override
+        public boolean isRequireClientCertificate()
+        {
+          return false;
+        }
+
+        @Override
+        public boolean isRequestClientCertificate()
+        {
+          return false;
+        }
+
+        @Override
+        public boolean isValidateHostnames()
+        {
+          return false;
+        }
+      };
+
+      sslConfig =
+          HttpClientConfig.builder()
+                          .withSslContext(
+                              HttpClientInit.sslContextWithTrustedKeyStore(tmpTrustStore.toString(), pp.getPassword())
+                          )
+                          .withWorkerCount(1)
+                          .withReadTimeout(Duration.ZERO)
+                          .build();
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    latchedRequestState = new LatchedRequestStateHolder();
+    injector = Initialization.makeInjectorWithModules(
         GuiceInjectors.makeStartupInjector(),
         ImmutableList.<Module>of(
             new Module()
@@ -88,9 +198,11 @@ public class JettyTest extends BaseJettyTest
                 JsonConfigProvider.bindInstance(
                     binder,
                     Key.get(DruidNode.class, Self.class),
-                    new DruidNode("test", "localhost", false, null, null, true, false)
+                    new DruidNode("test", "localhost", false, 9988, 9999, true, true)
                 );
+                binder.bind(TLSServerConfig.class).toInstance(tlsConfig);
                 binder.bind(JettyServerInitializer.class).to(JettyServerInit.class).in(LazySingleton.class);
+                binder.bind(LatchedRequestStateHolder.class).toInstance(latchedRequestState);
 
                 Multibinder<ServletFilterHolder> multibinder = Multibinder.newSetBinder(
                     binder,
@@ -132,7 +244,9 @@ public class JettyTest extends BaseJettyTest
                     }
                 );
 
+
                 Jerseys.addResource(binder, SlowResource.class);
+                Jerseys.addResource(binder, LatchedResource.class);
                 Jerseys.addResource(binder, ExceptionResource.class);
                 Jerseys.addResource(binder, DefaultResource.class);
                 Jerseys.addResource(binder, DirectlyReturnResource.class);
@@ -143,6 +257,7 @@ public class JettyTest extends BaseJettyTest
         )
     );
 
+    return injector;
   }
 
   @Test
@@ -209,13 +324,19 @@ public class JettyTest extends BaseJettyTest
     final HttpURLConnection get = (HttpURLConnection) url.openConnection();
     get.setRequestProperty("Accept-Encoding", "gzip");
     Assert.assertEquals("gzip", get.getContentEncoding());
-    Assert.assertEquals(DEFAULT_RESPONSE_CONTENT, IOUtils.toString(new GZIPInputStream(get.getInputStream()), StandardCharsets.UTF_8));
+    Assert.assertEquals(
+        DEFAULT_RESPONSE_CONTENT,
+        IOUtils.toString(new GZIPInputStream(get.getInputStream()), StandardCharsets.UTF_8)
+    );
 
     final HttpURLConnection post = (HttpURLConnection) url.openConnection();
     post.setRequestProperty("Accept-Encoding", "gzip");
     post.setRequestMethod("POST");
     Assert.assertEquals("gzip", post.getContentEncoding());
-    Assert.assertEquals(DEFAULT_RESPONSE_CONTENT, IOUtils.toString(new GZIPInputStream(post.getInputStream()), StandardCharsets.UTF_8));
+    Assert.assertEquals(
+        DEFAULT_RESPONSE_CONTENT,
+        IOUtils.toString(new GZIPInputStream(post.getInputStream()), StandardCharsets.UTF_8)
+    );
 
     final HttpURLConnection getNoGzip = (HttpURLConnection) url.openConnection();
     Assert.assertNotEquals("gzip", getNoGzip.getContentEncoding());
@@ -224,7 +345,10 @@ public class JettyTest extends BaseJettyTest
     final HttpURLConnection postNoGzip = (HttpURLConnection) url.openConnection();
     postNoGzip.setRequestMethod("POST");
     Assert.assertNotEquals("gzip", postNoGzip.getContentEncoding());
-    Assert.assertEquals(DEFAULT_RESPONSE_CONTENT, IOUtils.toString(postNoGzip.getInputStream(), StandardCharsets.UTF_8));
+    Assert.assertEquals(
+        DEFAULT_RESPONSE_CONTENT,
+        IOUtils.toString(postNoGzip.getInputStream(), StandardCharsets.UTF_8)
+    );
   }
 
   // Tests that threads are not stuck when partial chunk is not finalized
@@ -311,4 +435,81 @@ public class JettyTest extends BaseJettyTest
         new InputStreamResponseHandler()
     ).get()), Charset.defaultCharset()));
   }
+
+  @Test
+  public void testNumConnectionsMetricHttp() throws Exception
+  {
+    String text = "hello";
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(out)) {
+      gzipOutputStream.write(text.getBytes(Charset.defaultCharset()));
+    }
+    Request request = new Request(HttpMethod.GET, new URL("http://localhost:" + port + "/latched/hello"));
+    request.setHeader("Content-Encoding", "gzip");
+    request.setContent(MediaType.TEXT_PLAIN, out.toByteArray());
+
+    JettyServerModule jsm = injector.getInstance(JettyServerModule.class);
+    latchedRequestState.reset();
+
+    Assert.assertEquals(0, jsm.getActiveConnections());
+    ListenableFuture<InputStream> go = client.go(
+        request,
+        new InputStreamResponseHandler()
+    );
+    latchedRequestState.clientWaitForServerToStartRequest();
+    Assert.assertEquals(1, jsm.getActiveConnections());
+    latchedRequestState.clientReadyToFinishRequest();
+    go.get();
+    waitForJettyServerModuleActiveConnectionsZero(jsm);
+    Assert.assertEquals(0, jsm.getActiveConnections());
+  }
+
+  @Test
+  public void testNumConnectionsMetricHttps() throws Exception
+  {
+    String text = "hello";
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(out)) {
+      gzipOutputStream.write(text.getBytes(Charset.defaultCharset()));
+    }
+    Request request = new Request(HttpMethod.GET, new URL("https://localhost:" + tlsPort + "/latched/hello"));
+    request.setHeader("Content-Encoding", "gzip");
+    request.setContent(MediaType.TEXT_PLAIN, out.toByteArray());
+    HttpClient client;
+    try {
+      client = HttpClientInit.createClient(
+          sslConfig,
+          lifecycle
+      );
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    JettyServerModule jsm = injector.getInstance(JettyServerModule.class);
+    latchedRequestState.reset();
+
+    Assert.assertEquals(0, jsm.getActiveConnections());
+    ListenableFuture<InputStream> go = client.go(
+        request,
+        new InputStreamResponseHandler()
+    );
+    latchedRequestState.clientWaitForServerToStartRequest();
+    Assert.assertEquals(1, jsm.getActiveConnections());
+    latchedRequestState.clientReadyToFinishRequest();
+    go.get();
+    waitForJettyServerModuleActiveConnectionsZero(jsm);
+    Assert.assertEquals(0, jsm.getActiveConnections());
+  }
+
+  private void waitForJettyServerModuleActiveConnectionsZero(JettyServerModule jsm) throws InterruptedException
+  {
+    // it can take a bit to close the connection, so maybe sleep for a while and hope it closes
+    final int sleepTimeMills = 10;
+    final int totalSleeps = 5_000 / sleepTimeMills;
+    int count = 0;
+    while (jsm.getActiveConnections() > 0 && count++ < totalSleeps) {
+      Thread.sleep(sleepTimeMills);
+    }
+  }
 }
diff --git a/server/src/test/resources/server.jks b/server/src/test/resources/server.jks
new file mode 100644
index 0000000..664e375
Binary files /dev/null and b/server/src/test/resources/server.jks differ
diff --git a/server/src/test/resources/truststore.jks b/server/src/test/resources/truststore.jks
new file mode 100644
index 0000000..3cd268c
Binary files /dev/null and b/server/src/test/resources/truststore.jks differ


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org