You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2019/04/15 21:29:08 UTC
[incubator-druid] branch master updated: Enhance the Http Firehose
to work with URIs requiring basic authentication (#7145)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8acad27 Enhance the Http Firehose to work with URIs requiring basic authentication (#7145)
8acad27 is described below
commit 8acad27d997b48cc802cce9352f51757a01c6336
Author: Lucas Capistrant <ca...@users.noreply.github.com>
AuthorDate: Mon Apr 15 16:29:01 2019 -0500
Enhance the Http Firehose to work with URIs requiring basic authentication (#7145)
* Enhance the HttpFirehose to work with both insecure URIs and URIs requiring basic authentication
* Improve security of enhanced HttpFirehoseFactory by not logging auth credentials
* Fix checkstyle failure in HttpFirehoseFactory.java
* Update docs and fix TeamCity build with required noinspection
* Indentation cleanup and logic modification for HttpFirehose object stream
* Remove default Empty string password provider in http firehose
* Add JavaDoc for MixIn describing its intended use
* Reverting documentation notation for json code to be in line with the rest of the doc
* Improve instantiation of ObjectMappers that require MixIn for redacting password from task logs
* Add comment to clarify fully qualified references of Objects in SQLMetadataStorageActionHandler
---
.../metadata/PasswordProviderRedactionMixIn.java | 33 +++++++++
.../metadata/DefaultPasswordProviderTest.java | 9 +++
docs/content/ingestion/firehose.md | 33 +++++++++
.../worker/executor/ExecutorLifecycle.java | 4 +-
.../metadata/SQLMetadataStorageActionHandler.java | 5 +-
.../realtime/firehose/HttpFirehoseFactory.java | 78 +++++++++++++++++-----
.../realtime/firehose/HttpFirehoseFactoryTest.java | 6 +-
7 files changed, 150 insertions(+), 18 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/metadata/PasswordProviderRedactionMixIn.java b/core/src/main/java/org/apache/druid/metadata/PasswordProviderRedactionMixIn.java
new file mode 100644
index 0000000..37a5b9a
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/metadata/PasswordProviderRedactionMixIn.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * This Interface is used as a MixIn for ObjectMapper objects when there is a desire to avoid serializing a Password
+ * from a PasswordProvider to JSON in plaintext when that JSON is going to be used for purposes that don't require the
+ * password to be present, such as logging to a file.
+ */
+public interface PasswordProviderRedactionMixIn
+{
+ @JsonIgnore
+ String getPassword();
+}
diff --git a/core/src/test/java/org/apache/druid/metadata/DefaultPasswordProviderTest.java b/core/src/test/java/org/apache/druid/metadata/DefaultPasswordProviderTest.java
index f1adc82..20278f0 100644
--- a/core/src/test/java/org/apache/druid/metadata/DefaultPasswordProviderTest.java
+++ b/core/src/test/java/org/apache/druid/metadata/DefaultPasswordProviderTest.java
@@ -58,4 +58,13 @@ public class DefaultPasswordProviderTest
PasswordProvider.class);
Assert.assertEquals(pwd, pp.getPassword());
}
+
+ @Test
+ public void testSerializationWithMixIn() throws Exception
+ {
+ DefaultPasswordProvider pp = new DefaultPasswordProvider(pwd);
+ jsonMapper.addMixIn(PasswordProvider.class, PasswordProviderRedactionMixIn.class);
+ String valueAsString = jsonMapper.writeValueAsString(pp);
+ Assert.assertEquals("{\"type\":\"default\"}", valueAsString);
+ }
}
diff --git a/docs/content/ingestion/firehose.md b/docs/content/ingestion/firehose.md
index 8cfdc72..fe75a7e 100644
--- a/docs/content/ingestion/firehose.md
+++ b/docs/content/ingestion/firehose.md
@@ -74,6 +74,39 @@ A sample http firehose spec is shown below:
}
```
+The below configurations can be optionally used if the URIs specified in the spec require a Basic Authentication Header.
+Omitting these fields from your spec will result in HTTP requests with no Basic Authentication Header.
+
+|property|description|default|
+|--------|-----------|-------|
+|httpAuthenticationUsername|Username to use for authentication with specified URIs|None|
+|httpAuthenticationPassword|PasswordProvider to use with specified URIs|None|
+
+Example with authentication fields using the DefaultPasswordProvider (this requires the password to be in the ingestion spec):
+
+```json
+{
+ "type": "http",
+ "uris": ["http://example.com/uri1", "http://example2.com/uri2"],
+ "httpAuthenticationUsername": "username",
+ "httpAuthenticationPassword": "password123"
+}
+```
+
+You can also use the other existing Druid PasswordProviders. Here is an example using the EnvironmentVariablePasswordProvider:
+
+```json
+{
+ "type": "http",
+ "uris": ["http://example.com/uri1", "http://example2.com/uri2"],
+ "httpAuthenticationUsername": "username",
+ "httpAuthenticationPassword": {
+ "type": "environment",
+ "variable": "HTTP_FIREHOSE_PW"
+ }
+}
+```
+
The below configurations can be optionally used for tuning the firehose performance.
|property|description|default|
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java
index 0d6a4c5..3568e79 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java
@@ -37,6 +37,8 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.PasswordProvider;
+import org.apache.druid.metadata.PasswordProviderRedactionMixIn;
import java.io.File;
import java.io.IOException;
@@ -80,7 +82,7 @@ public class ExecutorLifecycle
this.taskConfig = taskConfig;
this.taskActionClientFactory = taskActionClientFactory;
this.taskRunner = taskRunner;
- this.jsonMapper = jsonMapper;
+ this.jsonMapper = jsonMapper.copy().addMixIn(PasswordProvider.class, PasswordProviderRedactionMixIn.class);
}
@LifecycleStart
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
index cc4621e..acdbbd0 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
@@ -82,7 +82,10 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
)
{
this.connector = connector;
- this.jsonMapper = jsonMapper;
+ //fully qualified references required below due to identical package names across project modules.
+ //noinspection UnnecessaryFullyQualifiedName
+ this.jsonMapper = jsonMapper.copy().addMixIn(org.apache.druid.metadata.PasswordProvider.class,
+ org.apache.druid.metadata.PasswordProviderRedactionMixIn.class);
this.entryType = types.getEntryType();
this.statusType = types.getStatusType();
this.logType = types.getLogType();
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java
index 81258c3..eeec268 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java
@@ -21,8 +21,10 @@ package org.apache.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
import com.google.common.net.HttpHeaders;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.InputSplit;
@@ -30,12 +32,15 @@ import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.utils.CompressionUtils;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URLConnection;
+import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -46,6 +51,10 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
private static final Logger log = new Logger(HttpFirehoseFactory.class);
private final List<URI> uris;
private final boolean supportContentRange;
+ @Nullable
+ private final String httpAuthenticationUsername;
+ @Nullable
+ private final PasswordProvider httpAuthenticationPasswordProvider;
@JsonCreator
public HttpFirehoseFactory(
@@ -54,7 +63,11 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
@JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes,
@JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes,
@JsonProperty("fetchTimeout") Long fetchTimeout,
- @JsonProperty("maxFetchRetry") Integer maxFetchRetry
+ @JsonProperty("maxFetchRetry") Integer maxFetchRetry,
+ @Nullable
+ @JsonProperty("httpAuthenticationUsername") String httpAuthenticationUsername,
+ @Nullable
+ @JsonProperty("httpAuthenticationPassword") PasswordProvider httpAuthenticationPasswordProvider
) throws IOException
{
super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
@@ -64,6 +77,20 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
final URLConnection connection = uris.get(0).toURL().openConnection();
final String acceptRanges = connection.getHeaderField(HttpHeaders.ACCEPT_RANGES);
this.supportContentRange = acceptRanges != null && "bytes".equalsIgnoreCase(acceptRanges);
+ this.httpAuthenticationUsername = httpAuthenticationUsername;
+ this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider;
+ }
+
+ @JsonProperty
+ public String getHttpAuthenticationUsername()
+ {
+ return httpAuthenticationUsername;
+ }
+
+ @JsonProperty("httpAuthenticationPassword")
+ public PasswordProvider getHttpAuthenticationPasswordProvider()
+ {
+ return httpAuthenticationPasswordProvider;
}
@JsonProperty
@@ -81,26 +108,29 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
@Override
protected InputStream openObjectStream(URI object) throws IOException
{
- return object.toURL().openConnection().getInputStream();
+ // A start value of 0 ensures no bytes of the InputStream are skipped
+ return openObjectStream(object, 0);
}
@Override
protected InputStream openObjectStream(URI object, long start) throws IOException
{
- if (supportContentRange) {
- final URLConnection connection = object.toURL().openConnection();
+ URLConnection urlConnection = openURLConnection(object);
+ if (supportContentRange && start > 0) {
// Set header for range request.
// Since we need to set only the start offset, the header is "bytes=<range-start>-".
// See https://tools.ietf.org/html/rfc7233#section-2.1
- connection.addRequestProperty(HttpHeaders.RANGE, StringUtils.format("bytes=%d-", start));
- return connection.getInputStream();
+ urlConnection.addRequestProperty(HttpHeaders.RANGE, StringUtils.format("bytes=%d-", start));
+ return urlConnection.getInputStream();
} else {
- log.warn(
- "Since the input source doesn't support range requests, the object input stream is opened from the start and "
- + "then skipped. This may make the ingestion speed slower. Consider enabling prefetch if you see this message"
- + " a lot."
- );
- final InputStream in = openObjectStream(object);
+ if (!supportContentRange && start > 0) {
+ log.warn(
+ "Since the input source doesn't support range requests, the object input stream is opened from the start and "
+ + "then skipped. This may make the ingestion speed slower. Consider enabling prefetch if you see this message"
+ + " a lot."
+ );
+ }
+ final InputStream in = urlConnection.getInputStream();
in.skip(start);
return in;
}
@@ -129,7 +159,9 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() &&
getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() &&
getFetchTimeout() == that.getFetchTimeout() &&
- getMaxFetchRetry() == that.getMaxFetchRetry();
+ getMaxFetchRetry() == that.getMaxFetchRetry() &&
+ httpAuthenticationUsername.equals(that.getHttpAuthenticationUsername()) &&
+ httpAuthenticationPasswordProvider.equals(that.getHttpAuthenticationPasswordProvider());
}
@Override
@@ -141,7 +173,9 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
getMaxFetchCapacityBytes(),
getPrefetchTriggerBytes(),
getFetchTimeout(),
- getMaxFetchRetry()
+ getMaxFetchRetry(),
+ httpAuthenticationUsername,
+ httpAuthenticationPasswordProvider
);
}
@@ -161,11 +195,25 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
getMaxFetchCapacityBytes(),
getPrefetchTriggerBytes(),
getFetchTimeout(),
- getMaxFetchRetry()
+ getMaxFetchRetry(),
+ getHttpAuthenticationUsername(),
+ httpAuthenticationPasswordProvider
);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
+
+ @VisibleForTesting
+ URLConnection openURLConnection(URI object) throws IOException
+ {
+ URLConnection urlConnection = object.toURL().openConnection();
+ if (!Strings.isNullOrEmpty(httpAuthenticationUsername) && httpAuthenticationPasswordProvider != null) {
+ String userPass = httpAuthenticationUsername + ":" + httpAuthenticationPasswordProvider.getPassword();
+ String basicAuthString = "Basic " + Base64.getEncoder().encodeToString(StringUtils.toUtf8(userPass));
+ urlConnection.setRequestProperty("Authorization", basicAuthString);
+ }
+ return urlConnection;
+ }
}
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java
index 0d74be6..e44ff7f 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.segment.realtime.firehose;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.metadata.DefaultPasswordProvider;
import org.junit.Assert;
import org.junit.Test;
@@ -34,13 +35,16 @@ public class HttpFirehoseFactoryTest
public void testSerde() throws IOException
{
final ObjectMapper mapper = new DefaultObjectMapper();
+ final DefaultPasswordProvider pwProvider = new DefaultPasswordProvider("testPassword");
final HttpFirehoseFactory factory = new HttpFirehoseFactory(
ImmutableList.of(URI.create("http://foo/bar"), URI.create("http://foo/bar2")),
2048L,
1024L,
512L,
100L,
- 5
+ 5,
+ "testUser",
+ pwProvider
);
final HttpFirehoseFactory outputFact = mapper.readValue(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org