You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by hi...@apache.org on 2020/04/05 22:02:02 UTC
[druid] branch 0.18.0 updated: Fix double count ssl connection
metrics (#9594) (#9615)
This is an automated email from the ASF dual-hosted git repository.
himanshug pushed a commit to branch 0.18.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/0.18.0 by this push:
new 404a558 Fix double count ssl connection metrics (#9594) (#9615)
404a558 is described below
commit 404a5586e44b18784d48204ff3372e561a36589c
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Sun Apr 5 15:01:43 2020 -0700
Fix double count ssl connection metrics (#9594) (#9615)
* fix double counted jetty/numOpenConnections metric for ssl connections
* tests
* more better
* style
---
examples/bin/dsql-main | 2 +-
.../docker/tls/generate-good-client-cert.sh | 2 +-
.../tls/generate-server-certs-and-keystores.sh | 2 +-
.../initialization/jetty/JettyServerModule.java | 15 +-
.../druid/server/initialization/BaseJettyTest.java | 70 +++++++
.../druid/server/initialization/JettyTest.java | 211 ++++++++++++++++++++-
server/src/test/resources/server.jks | Bin 0 -> 1911 bytes
server/src/test/resources/truststore.jks | Bin 0 -> 1641 bytes
8 files changed, 293 insertions(+), 9 deletions(-)
diff --git a/examples/bin/dsql-main b/examples/bin/dsql-main
index 8dfe882..cf68581 100755
--- a/examples/bin/dsql-main
+++ b/examples/bin/dsql-main
@@ -400,7 +400,7 @@ def main():
parser_fmt.add_argument('--format', type=str, default='table', choices=('csv', 'tsv', 'json', 'table'), help='Result format')
parser_fmt.add_argument('--header', action='store_true', help='Include header row for formats "csv" and "tsv"')
parser_fmt.add_argument('--tsv-delimiter', type=str, default='\t', help='Delimiter for format "tsv"')
- parser_oth.add_argument('--context-option', '-c', type=str, action='append', help='Set context option for this connection, see https://docs.imply.io/on-prem/query-data/sql for options')
+ parser_oth.add_argument('--context-option', '-c', type=str, action='append', help='Set context option for this connection, see https://druid.apache.org/docs/latest/querying/sql.html#connection-context for options')
parser_oth.add_argument('--execute', '-e', type=str, help='Execute single SQL query')
args = parser.parse_args()
diff --git a/integration-tests/docker/tls/generate-good-client-cert.sh b/integration-tests/docker/tls/generate-good-client-cert.sh
index 895e6c3..0f16c14 100755
--- a/integration-tests/docker/tls/generate-good-client-cert.sh
+++ b/integration-tests/docker/tls/generate-good-client-cert.sh
@@ -58,5 +58,5 @@ openssl x509 -req -days 3650 -in client.csr -CA root.pem -CAkey root.key -set_se
openssl pkcs12 -export -in client.pem -inkey client.key -out client.p12 -name druid -CAfile root.pem -caname druid-it-root -password pass:druid123
keytool -importkeystore -srckeystore client.p12 -srcstoretype PKCS12 -destkeystore client.jks -deststoretype JKS -srcstorepass druid123 -deststorepass druid123
-# Create a Java truststore with the imply test cluster root CA
+# Create a Java truststore with the druid test cluster root CA
keytool -import -alias druid-it-root -keystore truststore.jks -file root.pem -storepass druid123 -noprompt
diff --git a/integration-tests/docker/tls/generate-server-certs-and-keystores.sh b/integration-tests/docker/tls/generate-server-certs-and-keystores.sh
index 8f38be3..e26cdac 100755
--- a/integration-tests/docker/tls/generate-server-certs-and-keystores.sh
+++ b/integration-tests/docker/tls/generate-server-certs-and-keystores.sh
@@ -63,7 +63,7 @@ openssl x509 -req -days 3650 -in server.csr -CA root.pem -CAkey root.key -set_se
openssl pkcs12 -export -in server.pem -inkey server.key -out server.p12 -name druid -CAfile root.pem -caname druid-it-root -password pass:druid123
keytool -importkeystore -srckeystore server.p12 -srcstoretype PKCS12 -destkeystore server.jks -deststoretype JKS -srcstorepass druid123 -deststorepass druid123
-# Create a Java truststore with the imply test cluster root CA
+# Create a Java truststore with the druid test cluster root CA
keytool -import -alias druid-it-root -keystore truststore.jks -file root.pem -storepass druid123 -noprompt
# Revoke one of the client certs
diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java
index bbb80e4..fc6f93e 100644
--- a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java
+++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java
@@ -22,6 +22,7 @@ package org.apache.druid.server.initialization.jetty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.fasterxml.jackson.jaxrs.smile.JacksonSmileProvider;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import com.google.inject.Binder;
@@ -345,7 +346,13 @@ public class JettyServerModule extends JerseyServletModule
List<ConnectionFactory> monitoredConnFactories = new ArrayList<>();
for (ConnectionFactory cf : connector.getConnectionFactories()) {
- monitoredConnFactories.add(new JettyMonitoringConnectionFactory(cf, ACTIVE_CONNECTIONS));
+ // we only want to monitor the first connection factory, since it will pass the connection to subsequent
+ // connection factories (in this case HTTP/1.1 after the connection is unencrypted for SSL)
+ if (cf.getProtocol().equals(connector.getDefaultProtocol())) {
+ monitoredConnFactories.add(new JettyMonitoringConnectionFactory(cf, ACTIVE_CONNECTIONS));
+ } else {
+ monitoredConnFactories.add(cf);
+ }
}
connector.setConnectionFactories(monitoredConnFactories);
}
@@ -531,4 +538,10 @@ public class JettyServerModule extends JerseyServletModule
return newTrustManagers;
}
}
+
+ @VisibleForTesting
+ public int getActiveConnections()
+ {
+ return ACTIVE_CONNECTIONS.get();
+ }
}
diff --git a/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java b/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java
index defe2de..e38865a 100644
--- a/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java
+++ b/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.server.initialization;
+import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.servlet.GuiceFilter;
@@ -60,6 +61,7 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater;
@@ -72,6 +74,7 @@ public abstract class BaseJettyTest
protected HttpClient client;
protected Server server;
protected int port = -1;
+ protected int tlsPort = -1;
public static void setProperties()
{
@@ -87,6 +90,8 @@ public abstract class BaseJettyTest
Injector injector = setupInjector();
final DruidNode node = injector.getInstance(Key.get(DruidNode.class, Self.class));
port = node.getPlaintextPort();
+ tlsPort = node.getTlsPort();
+
lifecycle = injector.getInstance(Lifecycle.class);
lifecycle.start();
ClientHolder holder = injector.getInstance(ClientHolder.class);
@@ -175,6 +180,71 @@ public abstract class BaseJettyTest
}
}
+ @Path("/latched")
+ public static class LatchedResource
+ {
+ private final LatchedRequestStateHolder state;
+
+ @Inject
+ public LatchedResource(LatchedRequestStateHolder state)
+ {
+ this.state = state;
+ }
+
+ @GET
+ @Path("/hello")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response hello()
+ {
+ state.serverStartRequest();
+ try {
+ state.serverWaitForClientReadyToFinishRequest();
+ }
+ catch (InterruptedException ignored) {
+ }
+ return Response.ok(DEFAULT_RESPONSE_CONTENT).build();
+ }
+ }
+
+ public static class LatchedRequestStateHolder
+ {
+ private static final int TIMEOUT_MILLIS = 10_000;
+
+ private CountDownLatch requestStartLatch;
+ private CountDownLatch requestEndLatch;
+
+ public LatchedRequestStateHolder()
+ {
+ reset();
+ }
+
+ public void reset()
+ {
+ requestStartLatch = new CountDownLatch(1);
+ requestEndLatch = new CountDownLatch(1);
+ }
+
+ public void clientWaitForServerToStartRequest() throws InterruptedException
+ {
+ requestStartLatch.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ }
+
+ public void serverStartRequest()
+ {
+ requestStartLatch.countDown();
+ }
+
+ public void serverWaitForClientReadyToFinishRequest() throws InterruptedException
+ {
+ requestEndLatch.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ }
+
+ public void clientReadyToFinishRequest()
+ {
+ requestEndLatch.countDown();
+ }
+ }
+
@Path("/default")
public static class DefaultResource
{
diff --git a/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java b/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java
index 5760805..64ccd14 100644
--- a/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java
+++ b/server/src/test/java/org/apache/druid/server/initialization/JettyTest.java
@@ -34,26 +34,35 @@ import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.initialization.Initialization;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.HttpClientConfig;
+import org.apache.druid.java.util.http.client.HttpClientInit;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
+import org.apache.druid.server.initialization.jetty.JettyServerModule;
import org.apache.druid.server.initialization.jetty.ServletFilterHolder;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.eclipse.jetty.server.Server;
import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import javax.servlet.DispatcherType;
import javax.servlet.Filter;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;
import java.io.ByteArrayOutputStream;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
@@ -61,6 +70,8 @@ import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.EnumSet;
import java.util.Locale;
import java.util.Map;
@@ -74,10 +85,109 @@ import java.util.zip.GZIPOutputStream;
public class JettyTest extends BaseJettyTest
{
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ private HttpClientConfig sslConfig;
+
+ private Injector injector;
+
+ private LatchedRequestStateHolder latchedRequestState;
+
@Override
protected Injector setupInjector()
{
- return Initialization.makeInjectorWithModules(
+ TLSServerConfig tlsConfig;
+ try {
+ File keyStore = new File(JettyTest.class.getClassLoader().getResource("server.jks").getFile());
+ Path tmpKeyStore = Files.copy(keyStore.toPath(), new File(folder.newFolder(), "server.jks").toPath());
+ File trustStore = new File(JettyTest.class.getClassLoader().getResource("truststore.jks").getFile());
+ Path tmpTrustStore = Files.copy(trustStore.toPath(), new File(folder.newFolder(), "truststore.jks").toPath());
+ PasswordProvider pp = () -> "druid123";
+ tlsConfig = new TLSServerConfig()
+ {
+ @Override
+ public String getKeyStorePath()
+ {
+ return tmpKeyStore.toString();
+ }
+
+ @Override
+ public String getKeyStoreType()
+ {
+ return "jks";
+ }
+
+ @Override
+ public PasswordProvider getKeyStorePasswordProvider()
+ {
+ return pp;
+ }
+
+ @Override
+ public PasswordProvider getKeyManagerPasswordProvider()
+ {
+ return pp;
+ }
+
+ @Override
+ public String getTrustStorePath()
+ {
+ return tmpTrustStore.toString();
+ }
+
+ @Override
+ public String getTrustStoreAlgorithm()
+ {
+ return "PKIX";
+ }
+
+ @Override
+ public PasswordProvider getTrustStorePasswordProvider()
+ {
+ return pp;
+ }
+
+ @Override
+ public String getCertAlias()
+ {
+ return "druid";
+ }
+
+ @Override
+ public boolean isRequireClientCertificate()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isRequestClientCertificate()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isValidateHostnames()
+ {
+ return false;
+ }
+ };
+
+ sslConfig =
+ HttpClientConfig.builder()
+ .withSslContext(
+ HttpClientInit.sslContextWithTrustedKeyStore(tmpTrustStore.toString(), pp.getPassword())
+ )
+ .withWorkerCount(1)
+ .withReadTimeout(Duration.ZERO)
+ .build();
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ latchedRequestState = new LatchedRequestStateHolder();
+ injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
ImmutableList.<Module>of(
new Module()
@@ -88,9 +198,11 @@ public class JettyTest extends BaseJettyTest
JsonConfigProvider.bindInstance(
binder,
Key.get(DruidNode.class, Self.class),
- new DruidNode("test", "localhost", false, null, null, true, false)
+ new DruidNode("test", "localhost", false, 9988, 9999, true, true)
);
+ binder.bind(TLSServerConfig.class).toInstance(tlsConfig);
binder.bind(JettyServerInitializer.class).to(JettyServerInit.class).in(LazySingleton.class);
+ binder.bind(LatchedRequestStateHolder.class).toInstance(latchedRequestState);
Multibinder<ServletFilterHolder> multibinder = Multibinder.newSetBinder(
binder,
@@ -132,7 +244,9 @@ public class JettyTest extends BaseJettyTest
}
);
+
Jerseys.addResource(binder, SlowResource.class);
+ Jerseys.addResource(binder, LatchedResource.class);
Jerseys.addResource(binder, ExceptionResource.class);
Jerseys.addResource(binder, DefaultResource.class);
Jerseys.addResource(binder, DirectlyReturnResource.class);
@@ -143,6 +257,7 @@ public class JettyTest extends BaseJettyTest
)
);
+ return injector;
}
@Test
@@ -209,13 +324,19 @@ public class JettyTest extends BaseJettyTest
final HttpURLConnection get = (HttpURLConnection) url.openConnection();
get.setRequestProperty("Accept-Encoding", "gzip");
Assert.assertEquals("gzip", get.getContentEncoding());
- Assert.assertEquals(DEFAULT_RESPONSE_CONTENT, IOUtils.toString(new GZIPInputStream(get.getInputStream()), StandardCharsets.UTF_8));
+ Assert.assertEquals(
+ DEFAULT_RESPONSE_CONTENT,
+ IOUtils.toString(new GZIPInputStream(get.getInputStream()), StandardCharsets.UTF_8)
+ );
final HttpURLConnection post = (HttpURLConnection) url.openConnection();
post.setRequestProperty("Accept-Encoding", "gzip");
post.setRequestMethod("POST");
Assert.assertEquals("gzip", post.getContentEncoding());
- Assert.assertEquals(DEFAULT_RESPONSE_CONTENT, IOUtils.toString(new GZIPInputStream(post.getInputStream()), StandardCharsets.UTF_8));
+ Assert.assertEquals(
+ DEFAULT_RESPONSE_CONTENT,
+ IOUtils.toString(new GZIPInputStream(post.getInputStream()), StandardCharsets.UTF_8)
+ );
final HttpURLConnection getNoGzip = (HttpURLConnection) url.openConnection();
Assert.assertNotEquals("gzip", getNoGzip.getContentEncoding());
@@ -224,7 +345,10 @@ public class JettyTest extends BaseJettyTest
final HttpURLConnection postNoGzip = (HttpURLConnection) url.openConnection();
postNoGzip.setRequestMethod("POST");
Assert.assertNotEquals("gzip", postNoGzip.getContentEncoding());
- Assert.assertEquals(DEFAULT_RESPONSE_CONTENT, IOUtils.toString(postNoGzip.getInputStream(), StandardCharsets.UTF_8));
+ Assert.assertEquals(
+ DEFAULT_RESPONSE_CONTENT,
+ IOUtils.toString(postNoGzip.getInputStream(), StandardCharsets.UTF_8)
+ );
}
// Tests that threads are not stuck when partial chunk is not finalized
@@ -311,4 +435,81 @@ public class JettyTest extends BaseJettyTest
new InputStreamResponseHandler()
).get()), Charset.defaultCharset()));
}
+
+ @Test
+ public void testNumConnectionsMetricHttp() throws Exception
+ {
+ String text = "hello";
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(out)) {
+ gzipOutputStream.write(text.getBytes(Charset.defaultCharset()));
+ }
+ Request request = new Request(HttpMethod.GET, new URL("http://localhost:" + port + "/latched/hello"));
+ request.setHeader("Content-Encoding", "gzip");
+ request.setContent(MediaType.TEXT_PLAIN, out.toByteArray());
+
+ JettyServerModule jsm = injector.getInstance(JettyServerModule.class);
+ latchedRequestState.reset();
+
+ Assert.assertEquals(0, jsm.getActiveConnections());
+ ListenableFuture<InputStream> go = client.go(
+ request,
+ new InputStreamResponseHandler()
+ );
+ latchedRequestState.clientWaitForServerToStartRequest();
+ Assert.assertEquals(1, jsm.getActiveConnections());
+ latchedRequestState.clientReadyToFinishRequest();
+ go.get();
+ waitForJettyServerModuleActiveConnectionsZero(jsm);
+ Assert.assertEquals(0, jsm.getActiveConnections());
+ }
+
+ @Test
+ public void testNumConnectionsMetricHttps() throws Exception
+ {
+ String text = "hello";
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(out)) {
+ gzipOutputStream.write(text.getBytes(Charset.defaultCharset()));
+ }
+ Request request = new Request(HttpMethod.GET, new URL("https://localhost:" + tlsPort + "/latched/hello"));
+ request.setHeader("Content-Encoding", "gzip");
+ request.setContent(MediaType.TEXT_PLAIN, out.toByteArray());
+ HttpClient client;
+ try {
+ client = HttpClientInit.createClient(
+ sslConfig,
+ lifecycle
+ );
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ JettyServerModule jsm = injector.getInstance(JettyServerModule.class);
+ latchedRequestState.reset();
+
+ Assert.assertEquals(0, jsm.getActiveConnections());
+ ListenableFuture<InputStream> go = client.go(
+ request,
+ new InputStreamResponseHandler()
+ );
+ latchedRequestState.clientWaitForServerToStartRequest();
+ Assert.assertEquals(1, jsm.getActiveConnections());
+ latchedRequestState.clientReadyToFinishRequest();
+ go.get();
+ waitForJettyServerModuleActiveConnectionsZero(jsm);
+ Assert.assertEquals(0, jsm.getActiveConnections());
+ }
+
+ private void waitForJettyServerModuleActiveConnectionsZero(JettyServerModule jsm) throws InterruptedException
+ {
+ // it can take a bit to close the connection, so maybe sleep for a while and hope it closes
+ final int sleepTimeMills = 10;
+ final int totalSleeps = 5_000 / sleepTimeMills;
+ int count = 0;
+ while (jsm.getActiveConnections() > 0 && count++ < totalSleeps) {
+ Thread.sleep(sleepTimeMills);
+ }
+ }
}
diff --git a/server/src/test/resources/server.jks b/server/src/test/resources/server.jks
new file mode 100644
index 0000000..664e375
Binary files /dev/null and b/server/src/test/resources/server.jks differ
diff --git a/server/src/test/resources/truststore.jks b/server/src/test/resources/truststore.jks
new file mode 100644
index 0000000..3cd268c
Binary files /dev/null and b/server/src/test/resources/truststore.jks differ
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org