You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2019/04/15 21:29:08 UTC

[incubator-druid] branch master updated: Enhance the Http Firehose to work with URIs requiring basic authentication (#7145)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8acad27  Enhance the Http Firehose to work with URIs requiring basic authentication (#7145)
8acad27 is described below

commit 8acad27d997b48cc802cce9352f51757a01c6336
Author: Lucas Capistrant <ca...@users.noreply.github.com>
AuthorDate: Mon Apr 15 16:29:01 2019 -0500

    Enhance the Http Firehose to work with URIs requiring basic authentication (#7145)
    
    * Enhance the HttpFirehose to work with both insecure URIs and URIs requiring basic authentication
    
    * Improve security of enhanced HttpFirehoseFactory by not logging auth credentials
    
    * Fix checkstyle failure in HttpFirehoseFactory.java
    
    * Update docs and fix TeamCity build with required noinspection
    
    * Indentation cleanup and logic modification for HttpFirehose object stream
    
    * Remove default Empty string password provider in http firehose
    
    * Add JavaDoc for MixIn describing its intended use
    
    * Reverting documentation notation for json code to be inline with rest of doc
    
    * Improve instantiation of ObjectMappers that require MixIn for redacting password from task logs
    
    * Add comment to clarify fully qualified references of Objects in SQLMetadataStorageActionHandler
---
 .../metadata/PasswordProviderRedactionMixIn.java   | 33 +++++++++
 .../metadata/DefaultPasswordProviderTest.java      |  9 +++
 docs/content/ingestion/firehose.md                 | 33 +++++++++
 .../worker/executor/ExecutorLifecycle.java         |  4 +-
 .../metadata/SQLMetadataStorageActionHandler.java  |  5 +-
 .../realtime/firehose/HttpFirehoseFactory.java     | 78 +++++++++++++++++-----
 .../realtime/firehose/HttpFirehoseFactoryTest.java |  6 +-
 7 files changed, 150 insertions(+), 18 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/metadata/PasswordProviderRedactionMixIn.java b/core/src/main/java/org/apache/druid/metadata/PasswordProviderRedactionMixIn.java
new file mode 100644
index 0000000..37a5b9a
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/metadata/PasswordProviderRedactionMixIn.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * Jackson MixIn for {@link PasswordProvider} that marks getPassword() with @JsonIgnore, so a PasswordProvider is
+ * serialized to JSON without its plaintext password. Register it (ideally on a copy of the ObjectMapper) wherever
+ * that JSON does not need the password to be present, such as when it is logged to a file.
+ */
+public interface PasswordProviderRedactionMixIn
+{
+  @JsonIgnore
+  String getPassword();
+}
diff --git a/core/src/test/java/org/apache/druid/metadata/DefaultPasswordProviderTest.java b/core/src/test/java/org/apache/druid/metadata/DefaultPasswordProviderTest.java
index f1adc82..20278f0 100644
--- a/core/src/test/java/org/apache/druid/metadata/DefaultPasswordProviderTest.java
+++ b/core/src/test/java/org/apache/druid/metadata/DefaultPasswordProviderTest.java
@@ -58,4 +58,13 @@ public class DefaultPasswordProviderTest
         PasswordProvider.class);
     Assert.assertEquals(pwd, pp.getPassword());
   }
+
+  @Test
+  public void testSerializationWithMixIn() throws Exception
+  {
+    DefaultPasswordProvider pp = new DefaultPasswordProvider(pwd);
+    String valueAsString = jsonMapper.copy().addMixIn(PasswordProvider.class, PasswordProviderRedactionMixIn.class)
+        .writeValueAsString(pp);
+    Assert.assertEquals("{\"type\":\"default\"}", valueAsString);
+  }
 }
diff --git a/docs/content/ingestion/firehose.md b/docs/content/ingestion/firehose.md
index 8cfdc72..fe75a7e 100644
--- a/docs/content/ingestion/firehose.md
+++ b/docs/content/ingestion/firehose.md
@@ -74,6 +74,39 @@ A sample http firehose spec is shown below:
 }
 ```
 
+The following fields can optionally be set if the URIs specified in the spec require HTTP Basic Authentication.
+If either field is omitted, HTTP requests are sent without a Basic Authentication (`Authorization`) header.
+
+|property|description|default|
+|--------|-----------|-------|
+|httpAuthenticationUsername|Username used for Basic Authentication with the specified URIs|None|
+|httpAuthenticationPassword|PasswordProvider (or a plain-text password string) used for Basic Authentication with the specified URIs|None|
+
+Example with authentication fields using the DefaultPasswordProvider (this stores the password in plain text in the ingestion spec):
+
+```json
+{
+    "type": "http",
+    "uris": ["http://example.com/uri1", "http://example2.com/uri2"],
+    "httpAuthenticationUsername": "username",
+    "httpAuthenticationPassword": "password123"
+}
+```
+
+Any of the other Druid PasswordProviders can also be used. Here is an example using the EnvironmentVariablePasswordProvider, which reads the password from an environment variable:
+
+```json
+{
+    "type": "http",
+    "uris": ["http://example.com/uri1", "http://example2.com/uri2"],
+    "httpAuthenticationUsername": "username",
+    "httpAuthenticationPassword": {
+        "type": "environment",
+        "variable": "HTTP_FIREHOSE_PW"
+    }
+}
+```
+
 The below configurations can be optionally used for tuning the firehose performance.
 
 |property|description|default|
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java
index 0d6a4c5..3568e79 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java
@@ -37,6 +37,8 @@ import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.PasswordProvider;
+import org.apache.druid.metadata.PasswordProviderRedactionMixIn;
 
 import java.io.File;
 import java.io.IOException;
@@ -80,7 +82,7 @@ public class ExecutorLifecycle
     this.taskConfig = taskConfig;
     this.taskActionClientFactory = taskActionClientFactory;
     this.taskRunner = taskRunner;
-    this.jsonMapper = jsonMapper;
+    this.jsonMapper = jsonMapper.copy().addMixIn(PasswordProvider.class, PasswordProviderRedactionMixIn.class);
   }
 
   @LifecycleStart
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
index cc4621e..acdbbd0 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
@@ -82,7 +82,10 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
   )
   {
     this.connector = connector;
-    this.jsonMapper = jsonMapper;
+    //fully qualified references required below due to identical package names across project modules.
+    //noinspection UnnecessaryFullyQualifiedName
+    this.jsonMapper = jsonMapper.copy().addMixIn(org.apache.druid.metadata.PasswordProvider.class,
+            org.apache.druid.metadata.PasswordProviderRedactionMixIn.class);
     this.entryType = types.getEntryType();
     this.statusType = types.getStatusType();
     this.logType = types.getLogType();
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java
index 81258c3..eeec268 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java
@@ -21,8 +21,10 @@ package org.apache.druid.segment.realtime.firehose;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
 import com.google.common.net.HttpHeaders;
 import org.apache.druid.data.input.FiniteFirehoseFactory;
 import org.apache.druid.data.input.InputSplit;
@@ -30,12 +32,15 @@ import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.PasswordProvider;
 import org.apache.druid.utils.CompressionUtils;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.net.URLConnection;
+import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -46,6 +51,10 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
   private static final Logger log = new Logger(HttpFirehoseFactory.class);
   private final List<URI> uris;
   private final boolean supportContentRange;
+  @Nullable
+  private final String httpAuthenticationUsername;
+  @Nullable
+  private final PasswordProvider httpAuthenticationPasswordProvider;
 
   @JsonCreator
   public HttpFirehoseFactory(
@@ -54,7 +63,11 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
       @JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes,
       @JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes,
       @JsonProperty("fetchTimeout") Long fetchTimeout,
-      @JsonProperty("maxFetchRetry") Integer maxFetchRetry
+      @JsonProperty("maxFetchRetry") Integer maxFetchRetry,
+      @Nullable
+      @JsonProperty("httpAuthenticationUsername") String httpAuthenticationUsername,
+      @Nullable
+      @JsonProperty("httpAuthenticationPassword") PasswordProvider httpAuthenticationPasswordProvider
   ) throws IOException
   {
     super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
@@ -64,6 +77,20 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
     final URLConnection connection = uris.get(0).toURL().openConnection();
     final String acceptRanges = connection.getHeaderField(HttpHeaders.ACCEPT_RANGES);
     this.supportContentRange = acceptRanges != null && "bytes".equalsIgnoreCase(acceptRanges);
+    this.httpAuthenticationUsername = httpAuthenticationUsername;
+    this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider;
+  }
+
+  @JsonProperty
+  public String getHttpAuthenticationUsername()
+  {
+    return httpAuthenticationUsername; // nullable; no Basic auth header is sent when null or empty
+  }
+
+  @JsonProperty("httpAuthenticationPassword")
+  public PasswordProvider getHttpAuthenticationPasswordProvider()
+  {
+    return httpAuthenticationPasswordProvider; // nullable; serialized under the "httpAuthenticationPassword" key
   }
 
   @JsonProperty
@@ -81,26 +108,29 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
   @Override
   protected InputStream openObjectStream(URI object) throws IOException
   {
-    return object.toURL().openConnection().getInputStream();
+    // Start offset 0: no Range header is requested and no bytes of the InputStream are skipped.
+    return openObjectStream(object, 0);
   }
 
   @Override
   protected InputStream openObjectStream(URI object, long start) throws IOException
   {
-    if (supportContentRange) {
-      final URLConnection connection = object.toURL().openConnection();
+    URLConnection urlConnection = openURLConnection(object);
+    if (supportContentRange && start > 0) {
       // Set header for range request.
       // Since we need to set only the start offset, the header is "bytes=<range-start>-".
       // See https://tools.ietf.org/html/rfc7233#section-2.1
-      connection.addRequestProperty(HttpHeaders.RANGE, StringUtils.format("bytes=%d-", start));
-      return connection.getInputStream();
+      urlConnection.addRequestProperty(HttpHeaders.RANGE, StringUtils.format("bytes=%d-", start));
+      return urlConnection.getInputStream();
     } else {
-      log.warn(
-          "Since the input source doesn't support range requests, the object input stream is opened from the start and "
-          + "then skipped. This may make the ingestion speed slower. Consider enabling prefetch if you see this message"
-          + " a lot."
-      );
-      final InputStream in = openObjectStream(object);
+      if (!supportContentRange && start > 0) {
+        log.warn(
+            "Since the input source doesn't support range requests, the object input stream is opened from the start "
+            + "and then skipped. This may make the ingestion speed slower. Consider enabling prefetch if you see this "
+            + "message a lot."
+        );
+      }
+      final InputStream in = urlConnection.getInputStream();
-      in.skip(start);
+      com.google.common.io.ByteStreams.skipFully(in, start); // unlike skip(), never silently skips fewer bytes
       return in;
     }
@@ -129,7 +159,9 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
            getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() &&
            getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() &&
            getFetchTimeout() == that.getFetchTimeout() &&
-           getMaxFetchRetry() == that.getMaxFetchRetry();
+           getMaxFetchRetry() == that.getMaxFetchRetry() &&
+           java.util.Objects.equals(httpAuthenticationUsername, that.getHttpAuthenticationUsername()) &&
+           java.util.Objects.equals(httpAuthenticationPasswordProvider, that.getHttpAuthenticationPasswordProvider());
   }
 
   @Override
@@ -141,7 +173,9 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
         getMaxFetchCapacityBytes(),
         getPrefetchTriggerBytes(),
         getFetchTimeout(),
-        getMaxFetchRetry()
+        getMaxFetchRetry(),
+        httpAuthenticationUsername,
+        httpAuthenticationPasswordProvider
     );
   }
 
@@ -161,11 +195,25 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
           getMaxFetchCapacityBytes(),
           getPrefetchTriggerBytes(),
           getFetchTimeout(),
-          getMaxFetchRetry()
+          getMaxFetchRetry(),
+          getHttpAuthenticationUsername(),
+          httpAuthenticationPasswordProvider
       );
     }
     catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
+
+  @VisibleForTesting
+  URLConnection openURLConnection(URI object) throws IOException
+  {
+    URLConnection urlConnection = object.toURL().openConnection();
+    if (!Strings.isNullOrEmpty(httpAuthenticationUsername) && httpAuthenticationPasswordProvider != null) {
+      String userPass = httpAuthenticationUsername + ":" + httpAuthenticationPasswordProvider.getPassword();
+      String basicAuthString = "Basic " + Base64.getEncoder().encodeToString(StringUtils.toUtf8(userPass));
+      urlConnection.setRequestProperty(HttpHeaders.AUTHORIZATION, basicAuthString);
+    }
+    return urlConnection;
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java
index 0d74be6..e44ff7f 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.segment.realtime.firehose;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.metadata.DefaultPasswordProvider;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -34,13 +35,16 @@ public class HttpFirehoseFactoryTest
   public void testSerde() throws IOException
   {
     final ObjectMapper mapper = new DefaultObjectMapper();
+    final DefaultPasswordProvider pwProvider = new DefaultPasswordProvider("testPassword");
     final HttpFirehoseFactory factory = new HttpFirehoseFactory(
         ImmutableList.of(URI.create("http://foo/bar"), URI.create("http://foo/bar2")),
         2048L,
         1024L,
         512L,
         100L,
-        5
+        5,
+        "testUser",
+        pwProvider
     );
 
     final HttpFirehoseFactory outputFact = mapper.readValue(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org