Posted to commits@druid.apache.org by ji...@apache.org on 2021/03/06 19:43:24 UTC

[druid] branch master updated: Add configurations for allowed protocols for HTTP and HDFS inputSources/firehoses (#10830)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9946306  Add configurations for allowed protocols for HTTP and HDFS inputSources/firehoses (#10830)
9946306 is described below

commit 9946306d4b2c16a7fc8bac97c9f4815ed4b46570
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Sat Mar 6 11:43:00 2021 -0800

    Add configurations for allowed protocols for HTTP and HDFS inputSources/firehoses (#10830)
    
    * Allow only HTTP and HTTPS protocols for the HTTP inputSource
    
    * rename
    
    * Update core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
    
    Co-authored-by: Abhishek Agarwal <14...@users.noreply.github.com>
    
    * fix http firehose and update doc
    
    * HDFS inputSource
    
    * add configs for allowed protocols
    
    * fix checkstyle and doc
    
    * more checkstyle
    
    * remove stale doc
    
    * remove more doc
    
    * Apply doc suggestions from code review
    
    Co-authored-by: Charles Smith <38...@users.noreply.github.com>
    
    * update hdfs address in docs
    
    * fix test
    
    Co-authored-by: Abhishek Agarwal <14...@users.noreply.github.com>
    Co-authored-by: Charles Smith <38...@users.noreply.github.com>
---
 .../druid/data/input/impl/HttpInputSource.java     |  32 ++++-
 .../data/input/impl/HttpInputSourceConfig.java     |  83 ++++++++++++
 .../data/input/impl/HttpInputSourceConfigTest.java |  56 ++++++++
 .../druid/data/input/impl/HttpInputSourceTest.java |  60 ++++++++-
 docs/configuration/index.md                        |  23 ++++
 docs/ingestion/native-batch.md                     |  28 ++--
 .../druid/firehose/hdfs/HdfsFirehoseFactory.java   |  24 ++--
 .../druid/inputsource/hdfs/HdfsInputSource.java    |  51 +++++--
 .../inputsource/hdfs/HdfsInputSourceConfig.java    |  52 ++++++++
 .../druid/storage/hdfs/HdfsStorageDruidModule.java |   3 +
 .../firehose/hdfs/HdfsFirehoseFactoryTest.java     | 147 ++++++++++++++++++++-
 .../hdfs/HdfsInputSourceConfigTest.java            |  39 +++---
 .../inputsource/hdfs/HdfsInputSourceTest.java      |  91 ++++++++++++-
 .../storage/hdfs/HdfsStorageDruidModuleTest.java   |  80 +++++++++++
 .../druid/metadata/input/InputSourceModule.java    |   3 +
 .../realtime/firehose/HttpFirehoseFactory.java     |  41 +++---
 .../metadata/input/InputSourceModuleTest.java      |  40 ++++++
 .../realtime/firehose/HttpFirehoseFactoryTest.java |  90 ++++++++++++-
 18 files changed, 850 insertions(+), 93 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
index 21480fd..002d3ec 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.data.input.impl;
 
+import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
@@ -28,6 +29,8 @@ import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.metadata.PasswordProvider;
 
 import javax.annotation.Nullable;
@@ -45,18 +48,31 @@ public class HttpInputSource extends AbstractInputSource implements SplittableIn
   private final String httpAuthenticationUsername;
   @Nullable
   private final PasswordProvider httpAuthenticationPasswordProvider;
+  private final HttpInputSourceConfig config;
 
   @JsonCreator
   public HttpInputSource(
       @JsonProperty("uris") List<URI> uris,
       @JsonProperty("httpAuthenticationUsername") @Nullable String httpAuthenticationUsername,
-      @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider
+      @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider,
+      @JacksonInject HttpInputSourceConfig config
   )
   {
     Preconditions.checkArgument(uris != null && !uris.isEmpty(), "Empty URIs");
+    throwIfInvalidProtocols(config, uris);
     this.uris = uris;
     this.httpAuthenticationUsername = httpAuthenticationUsername;
     this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider;
+    this.config = config;
+  }
+
+  public static void throwIfInvalidProtocols(HttpInputSourceConfig config, List<URI> uris)
+  {
+    for (URI uri : uris) {
+      if (!config.getAllowedProtocols().contains(StringUtils.toLowerCase(uri.getScheme()))) {
+        throw new IAE("Only %s protocols are allowed", config.getAllowedProtocols());
+      }
+    }
   }
 
   @JsonProperty
@@ -97,7 +113,8 @@ public class HttpInputSource extends AbstractInputSource implements SplittableIn
     return new HttpInputSource(
         Collections.singletonList(split.get()),
         httpAuthenticationUsername,
-        httpAuthenticationPasswordProvider
+        httpAuthenticationPasswordProvider,
+        config
     );
   }
 
@@ -129,16 +146,17 @@ public class HttpInputSource extends AbstractInputSource implements SplittableIn
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    HttpInputSource source = (HttpInputSource) o;
-    return Objects.equals(uris, source.uris) &&
-           Objects.equals(httpAuthenticationUsername, source.httpAuthenticationUsername) &&
-           Objects.equals(httpAuthenticationPasswordProvider, source.httpAuthenticationPasswordProvider);
+    HttpInputSource that = (HttpInputSource) o;
+    return Objects.equals(uris, that.uris) &&
+           Objects.equals(httpAuthenticationUsername, that.httpAuthenticationUsername) &&
+           Objects.equals(httpAuthenticationPasswordProvider, that.httpAuthenticationPasswordProvider) &&
+           Objects.equals(config, that.config);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(uris, httpAuthenticationUsername, httpAuthenticationPasswordProvider);
+    return Objects.hash(uris, httpAuthenticationUsername, httpAuthenticationPasswordProvider, config);
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java
new file mode 100644
index 0000000..40a2521
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.StringUtils;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class HttpInputSourceConfig
+{
+  @VisibleForTesting
+  public static final Set<String> DEFAULT_ALLOWED_PROTOCOLS = ImmutableSet.of("http", "https");
+
+  @JsonProperty
+  private final Set<String> allowedProtocols;
+
+  @JsonCreator
+  public HttpInputSourceConfig(
+      @JsonProperty("allowedProtocols") @Nullable Set<String> allowedProtocols
+  )
+  {
+    this.allowedProtocols = allowedProtocols == null || allowedProtocols.isEmpty()
+                            ? DEFAULT_ALLOWED_PROTOCOLS
+                            : allowedProtocols.stream().map(StringUtils::toLowerCase).collect(Collectors.toSet());
+  }
+
+  public Set<String> getAllowedProtocols()
+  {
+    return allowedProtocols;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    HttpInputSourceConfig that = (HttpInputSourceConfig) o;
+    return Objects.equals(allowedProtocols, that.allowedProtocols);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(allowedProtocols);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "HttpInputSourceConfig{" +
+           "allowedProtocols=" + allowedProtocols +
+           '}';
+  }
+}
+
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java
new file mode 100644
index 0000000..a3f24b9
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.google.common.collect.ImmutableSet;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HttpInputSourceConfigTest
+{
+
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(HttpInputSourceConfig.class).usingGetClass().verify();
+  }
+
+  @Test
+  public void testNullAllowedProtocolsUseDefault()
+  {
+    HttpInputSourceConfig config = new HttpInputSourceConfig(null);
+    Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols());
+  }
+
+  @Test
+  public void testEmptyAllowedProtocolsUseDefault()
+  {
+    HttpInputSourceConfig config = new HttpInputSourceConfig(ImmutableSet.of());
+    Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols());
+  }
+
+  @Test
+  public void testCustomAllowedProtocols()
+  {
+    HttpInputSourceConfig config = new HttpInputSourceConfig(ImmutableSet.of("druid"));
+    Assert.assertEquals(ImmutableSet.of("druid"), config.getAllowedProtocols());
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
index 61be2cc..9c17b57 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
@@ -19,29 +19,87 @@
 
 package org.apache.druid.data.input.impl;
 
+import com.fasterxml.jackson.databind.InjectableValues.Std;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.metadata.DefaultPasswordProvider;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
 import java.net.URI;
 
 public class HttpInputSourceTest
 {
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Test
   public void testSerde() throws IOException
   {
+    HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(null);
     final ObjectMapper mapper = new ObjectMapper();
+    mapper.setInjectableValues(new Std().addValue(HttpInputSourceConfig.class, httpInputSourceConfig));
     final HttpInputSource source = new HttpInputSource(
         ImmutableList.of(URI.create("http://test.com/http-test")),
         "myName",
-        new DefaultPasswordProvider("myPassword")
+        new DefaultPasswordProvider("myPassword"),
+        httpInputSourceConfig
     );
     final byte[] json = mapper.writeValueAsBytes(source);
     final HttpInputSource fromJson = (HttpInputSource) mapper.readValue(json, InputSource.class);
     Assert.assertEquals(source, fromJson);
   }
+
+  @Test
+  public void testConstructorAllowsOnlyDefaultProtocols()
+  {
+    new HttpInputSource(
+        ImmutableList.of(URI.create("http:///")),
+        "myName",
+        new DefaultPasswordProvider("myPassword"),
+        new HttpInputSourceConfig(null)
+    );
+
+    new HttpInputSource(
+        ImmutableList.of(URI.create("https:///")),
+        "myName",
+        new DefaultPasswordProvider("myPassword"),
+        new HttpInputSourceConfig(null)
+    );
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Only [http, https] protocols are allowed");
+    new HttpInputSource(
+        ImmutableList.of(URI.create("my-protocol:///")),
+        "myName",
+        new DefaultPasswordProvider("myPassword"),
+        new HttpInputSourceConfig(null)
+    );
+  }
+
+  @Test
+  public void testConstructorAllowsOnlyCustomProtocols()
+  {
+    final HttpInputSourceConfig customConfig = new HttpInputSourceConfig(ImmutableSet.of("druid"));
+    new HttpInputSource(
+        ImmutableList.of(URI.create("druid:///")),
+        "myName",
+        new DefaultPasswordProvider("myPassword"),
+        customConfig
+    );
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Only [druid] protocols are allowed");
+    new HttpInputSource(
+        ImmutableList.of(URI.create("https:///")),
+        "myName",
+        new DefaultPasswordProvider("myPassword"),
+        customConfig
+    );
+  }
 }
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index b23432c..e6650b1 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -515,6 +515,28 @@ This deep storage is used to interface with Cassandra.  Note that the `druid-cas
 |`druid.storage.keyspace`|Cassandra key space.|none|
 
 
+### Ingestion Security Configuration
+
+#### HDFS input source
+
+You can set the following property to specify permissible protocols for
+the [HDFS input source](../ingestion/native-batch.md#hdfs-input-source) and the [HDFS firehose](../ingestion/native-batch.md#hdfsfirehose).
+
+|Property|Possible Values|Description|Default|
+|--------|---------------|-----------|-------|
+|`druid.ingestion.hdfs.allowedProtocols`|List of protocols|Allowed protocols for the HDFS input source and HDFS firehose.|["hdfs"]|
+
+
+#### HTTP input source
+
+You can set the following property to specify permissible protocols for
+the [HTTP input source](../ingestion/native-batch.md#http-input-source) and the [HTTP firehose](../ingestion/native-batch.md#httpfirehose).
+
+|Property|Possible Values|Description|Default|
+|--------|---------------|-----------|-------|
+|`druid.ingestion.http.allowedProtocols`|List of protocols|Allowed protocols for the HTTP input source and HTTP firehose.|["http", "https"]|
+
+
 ### Task Logging
 
 If you are running the indexing service in remote mode, the task logs must be stored in S3, Azure Blob Store, Google Cloud Storage or HDFS.
@@ -1355,6 +1377,7 @@ The amount of direct memory needed by Druid is at least
 ensure at least this amount of direct memory is available by providing `-XX:MaxDirectMemorySize=<VALUE>` at the command
 line.
 
+
 #### Query Configurations
 
 See [general query configuration](#general-query-configuration).
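For reference, the two properties documented above bind to the HttpInputSourceConfig and HdfsInputSourceConfig classes added by this commit. Below is a minimal sketch (not part of the commit) of the resulting defaults and of a programmatic override, assuming druid-core and the druid-hdfs-storage extension are on the classpath; in a cluster these values come from druid.ingestion.http.allowedProtocols and druid.ingestion.hdfs.allowedProtocols instead.

    // Sketch only: exercises the new config classes directly.
    import com.google.common.collect.ImmutableSet;
    import org.apache.druid.data.input.impl.HttpInputSourceConfig;
    import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;

    public class AllowedProtocolsDefaultsSketch
    {
      public static void main(String[] args)
      {
        // Null (unset) falls back to the defaults shown in the tables above.
        System.out.println(new HttpInputSourceConfig(null).getAllowedProtocols()); // [http, https]
        System.out.println(new HdfsInputSourceConfig(null).getAllowedProtocols()); // [hdfs]

        // Protocols are lower-cased on the way in, so "HTTPS" and "https" are equivalent.
        System.out.println(new HttpInputSourceConfig(ImmutableSet.of("HTTPS")).getAllowedProtocols()); // [https]
      }
    }
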
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index cc06475..5032ed1 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -1064,7 +1064,7 @@ Sample specs:
       "type": "index_parallel",
       "inputSource": {
         "type": "hdfs",
-        "paths": "hdfs://foo/bar/", "hdfs://bar/foo"
+        "paths": "hdfs://namenode_host/foo/bar/", "hdfs://namenode_host/bar/foo"
       },
       "inputFormat": {
         "type": "json"
@@ -1080,7 +1080,7 @@ Sample specs:
       "type": "index_parallel",
       "inputSource": {
         "type": "hdfs",
-        "paths": ["hdfs://foo/bar", "hdfs://bar/foo"]
+        "paths": ["hdfs://namenode_host/foo/bar", "hdfs://namenode_host/bar/foo"]
       },
       "inputFormat": {
         "type": "json"
@@ -1096,7 +1096,7 @@ Sample specs:
       "type": "index_parallel",
       "inputSource": {
         "type": "hdfs",
-        "paths": "hdfs://foo/bar/file.json", "hdfs://bar/foo/file2.json"
+        "paths": "hdfs://namenode_host/foo/bar/file.json", "hdfs://namenode_host/bar/foo/file2.json"
       },
       "inputFormat": {
         "type": "json"
@@ -1112,7 +1112,7 @@ Sample specs:
       "type": "index_parallel",
       "inputSource": {
         "type": "hdfs",
-        "paths": ["hdfs://foo/bar/file.json", "hdfs://bar/foo/file2.json"]
+        "paths": ["hdfs://namenode_host/foo/bar/file.json", "hdfs://namenode_host/bar/foo/file2.json"]
       },
       "inputFormat": {
         "type": "json"
@@ -1127,9 +1127,10 @@ Sample specs:
 |type|This should be `hdfs`.|None|yes|
 |paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths. Empty files located under one of the given paths will be skipped.|None|yes|
 
-You can also ingest from cloud storage using the HDFS input source.
-However, if you want to read from AWS S3 or Google Cloud Storage, consider using
-the [S3 input source](#s3-input-source) or the [Google Cloud Storage input source](#google-cloud-storage-input-source) instead.
+You can also ingest from other storage systems using the HDFS input source, as long as the HDFS client supports them.
+However, to ingest from cloud storage, consider using the input source specific to that service instead.
+If you want to use a protocol other than `hdfs` with the HDFS input source, include that protocol
+in `druid.ingestion.hdfs.allowedProtocols`. See [HDFS input source security configuration](../configuration/index.md#hdfs-input-source) for more details.
 
 ### HTTP Input Source
 
@@ -1209,10 +1210,13 @@ You can also use the other existing Druid PasswordProviders. Here is an example
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|This should be `http`|None|yes|
-|uris|URIs of the input files.|None|yes|
+|uris|URIs of the input files. See below for the allowed protocols.|None|yes|
 |httpAuthenticationUsername|Username to use for authentication with specified URIs. Can be optionally used if the URIs specified in the spec require a Basic Authentication Header.|None|no|
 |httpAuthenticationPassword|PasswordProvider to use with specified URIs. Can be optionally used if the URIs specified in the spec require a Basic Authentication Header.|None|no|
 
+The HTTP input source accepts only URIs with protocols listed in the `druid.ingestion.http.allowedProtocols` property.
+The `http` and `https` protocols are allowed by default. See [HTTP input source security configuration](../configuration/index.md#http-input-source) for more details.
+
 ### Inline Input Source
 
 The Inline input source can be used to read the data inlined in its own spec.
@@ -1559,6 +1563,11 @@ Note that prefetching or caching isn't that useful in the Parallel task.
 |fetchTimeout|Timeout for fetching each file.|60000|
 |maxFetchRetry|Maximum number of retries for fetching each file.|3|
 
+You can also ingest from other storage systems using the HDFS firehose, as long as the HDFS client supports them.
+However, to ingest from cloud storage, consider using the input source specific to that service instead.
+If you want to use a protocol other than `hdfs` with the HDFS firehose, include that protocol
+in `druid.ingestion.hdfs.allowedProtocols`. See [HDFS firehose security configuration](../configuration/index.md#hdfs-input-source) for more details.
+
 ### LocalFirehose
 
 This Firehose can be used to read the data from files on local disk, and is mainly intended for proof-of-concept testing, and works with `string` typed parsers.
@@ -1596,6 +1605,9 @@ A sample HTTP Firehose spec is shown below:
 }
 ```
 
+The HTTP firehose accepts only URIs with protocols listed in the `druid.ingestion.http.allowedProtocols` property.
+The `http` and `https` protocols are allowed by default. See [HTTP firehose security configuration](../configuration/index.md#http-input-source) for more details.
+
 The below configurations can be optionally used if the URIs specified in the spec require a Basic Authentication Header.
 Omitting these fields from your spec will result in HTTP requests with no Basic Authentication Header.
 
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java
index ee5bf03..f7fac9f 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java
@@ -29,6 +29,7 @@ import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
 import org.apache.druid.guice.Hdfs;
 import org.apache.druid.inputsource.hdfs.HdfsInputSource;
+import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
 import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller;
 import org.apache.druid.utils.CompressionUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -44,21 +45,25 @@ public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<Pa
 {
   private final List<String> inputPaths;
   private final Configuration conf;
+  private final HdfsInputSourceConfig inputSourceConfig;
 
   @JsonCreator
   public HdfsFirehoseFactory(
-      @JacksonInject @Hdfs Configuration conf,
       @JsonProperty("paths") Object inputPaths,
       @JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
       @JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes,
       @JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes,
       @JsonProperty("fetchTimeout") Long fetchTimeout,
-      @JsonProperty("maxFetchRetry") Integer maxFetchRetry
+      @JsonProperty("maxFetchRetry") Integer maxFetchRetry,
+      @JacksonInject @Hdfs Configuration conf,
+      @JacksonInject HdfsInputSourceConfig inputSourceConfig
   )
   {
     super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
-    this.inputPaths = HdfsInputSource.coerceInputPathsToList(inputPaths, "inputPaths");
+    this.inputPaths = HdfsInputSource.coerceInputPathsToList(inputPaths, "paths");
     this.conf = conf;
+    this.inputSourceConfig = inputSourceConfig;
+    this.inputPaths.forEach(p -> HdfsInputSource.verifyProtocol(conf, inputSourceConfig, p));
   }
 
   @JsonProperty("paths")
@@ -109,21 +114,14 @@ public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<Pa
   public FiniteFirehoseFactory<StringInputRowParser, Path> withSplit(InputSplit<Path> split)
   {
     return new HdfsFirehoseFactory(
-        conf,
         split.get().toString(),
         getMaxCacheCapacityBytes(),
         getMaxFetchCapacityBytes(),
         getPrefetchTriggerBytes(),
         getFetchTimeout(),
-        getMaxFetchRetry()
+        getMaxFetchRetry(),
+        conf,
+        inputSourceConfig
     );
   }
-
-  @Override
-  public String toString()
-  {
-    return "HdfsFirehoseFactory{" +
-           "inputPaths=" + inputPaths +
-           '}';
-  }
 }
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
index b8c798a..7faebcc 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
@@ -37,6 +37,7 @@ import org.apache.druid.data.input.impl.InputEntityIteratingReader;
 import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.guice.Hdfs;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.utils.Streams;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -64,6 +65,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
 
   private final List<String> inputPaths;
   private final Configuration configuration;
+  private final HdfsInputSourceConfig inputSourceConfig;
 
   // Although the javadocs for SplittableInputSource say to avoid caching splits to reduce memory, HdfsInputSource
   // *does* cache the splits for the following reasons:
@@ -73,32 +75,49 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
   //
   // 2) The index_hadoop task allocates splits eagerly, so the memory usage should not be a problem for anyone
   //    migrating from Hadoop.
-  private List<Path> cachedPaths;
+  @Nullable
+  private List<Path> cachedPaths = null;
 
   @JsonCreator
   public HdfsInputSource(
       @JsonProperty(PROP_PATHS) Object inputPaths,
-      @JacksonInject @Hdfs Configuration configuration
+      @JacksonInject @Hdfs Configuration configuration,
+      @JacksonInject HdfsInputSourceConfig inputSourceConfig
   )
   {
     this.inputPaths = coerceInputPathsToList(inputPaths, PROP_PATHS);
     this.configuration = configuration;
-    this.cachedPaths = null;
+    this.inputSourceConfig = inputSourceConfig;
+    this.inputPaths.forEach(p -> verifyProtocol(configuration, inputSourceConfig, p));
   }
 
   public static List<String> coerceInputPathsToList(Object inputPaths, String propertyName)
   {
-    final List<String> paths;
-
     if (inputPaths instanceof String) {
-      paths = Collections.singletonList((String) inputPaths);
+      return Collections.singletonList((String) inputPaths);
     } else if (inputPaths instanceof List && ((List<?>) inputPaths).stream().allMatch(x -> x instanceof String)) {
-      paths = ((List<?>) inputPaths).stream().map(x -> (String) x).collect(Collectors.toList());
+      return ((List<?>) inputPaths).stream().map(x -> (String) x).collect(Collectors.toList());
     } else {
       throw new IAE("'%s' must be a string or an array of strings", propertyName);
     }
+  }
+
+  public static void verifyProtocol(Configuration conf, HdfsInputSourceConfig config, String pathString)
+  {
+    Path path = new Path(pathString);
+    try {
+      throwIfInvalidProtocol(config, path.getFileSystem(conf).getScheme());
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
 
-    return paths;
+  private static void throwIfInvalidProtocol(HdfsInputSourceConfig config, String scheme)
+  {
+    if (!config.getAllowedProtocols().contains(StringUtils.toLowerCase(scheme))) {
+      throw new IAE("Only %s protocols are allowed", config.getAllowedProtocols());
+    }
   }
 
   public static Collection<Path> getPaths(List<String> inputPaths, Configuration configuration) throws IOException
@@ -202,7 +221,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
   public SplittableInputSource<List<Path>> withSplit(InputSplit<List<Path>> split)
   {
     List<String> paths = split.get().stream().map(path -> path.toString()).collect(Collectors.toList());
-    return new HdfsInputSource(paths, configuration);
+    return new HdfsInputSource(paths, configuration, inputSourceConfig);
   }
 
   @Override
@@ -218,6 +237,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
     }
   }
 
+  @VisibleForTesting
   static Builder builder()
   {
     return new Builder();
@@ -227,6 +247,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
   {
     private Object paths;
     private Configuration configuration;
+    private HdfsInputSourceConfig inputSourceConfig;
 
     private Builder()
     {
@@ -244,9 +265,19 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
       return this;
     }
 
+    Builder inputSourceConfig(HdfsInputSourceConfig inputSourceConfig)
+    {
+      this.inputSourceConfig = inputSourceConfig;
+      return this;
+    }
+
     HdfsInputSource build()
     {
-      return new HdfsInputSource(paths, configuration);
+      return new HdfsInputSource(
+          Preconditions.checkNotNull(paths, "paths"),
+          Preconditions.checkNotNull(configuration, "configuration"),
+          Preconditions.checkNotNull(inputSourceConfig, "inputSourceConfig")
+      );
     }
   }
 }
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfig.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfig.java
new file mode 100644
index 0000000..c7f43f8
--- /dev/null
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfig.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.inputsource.hdfs;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.StringUtils;
+
+import javax.annotation.Nullable;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class HdfsInputSourceConfig
+{
+  static final Set<String> DEFAULT_ALLOWED_PROTOCOLS = ImmutableSet.of("hdfs");
+
+  @JsonProperty
+  private final Set<String> allowedProtocols;
+
+  @JsonCreator
+  public HdfsInputSourceConfig(
+      @JsonProperty("allowedProtocols") @Nullable Set<String> allowedProtocols
+  )
+  {
+    this.allowedProtocols = allowedProtocols == null || allowedProtocols.isEmpty()
+                            ? DEFAULT_ALLOWED_PROTOCOLS
+                            : allowedProtocols.stream().map(StringUtils::toLowerCase).collect(Collectors.toSet());
+  }
+
+  public Set<String> getAllowedProtocols()
+  {
+    return allowedProtocols;
+  }
+}
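The HDFS-side check works the same way, but it first resolves the scheme through the Hadoop FileSystem, so schemeless paths inherit fs.default.name. A minimal sketch (not part of the commit), mirroring the configuration used in the tests below and assuming the hdfs-storage extension and its Hadoop client dependencies are on the classpath; localhost:7020 is a placeholder namenode address and no namenode is contacted during validation:

    import org.apache.druid.inputsource.hdfs.HdfsInputSource;
    import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
    import org.apache.hadoop.conf.Configuration;

    public class HdfsProtocolCheckSketch
    {
      public static void main(String[] args)
      {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:7020");
        HdfsInputSourceConfig config = new HdfsInputSourceConfig(null); // allows hdfs only

        // Accepted: explicit hdfs scheme, and a schemeless path resolved via fs.default.name.
        HdfsInputSource.verifyProtocol(conf, config, "hdfs://localhost:7020/foo/bar");
        HdfsInputSource.verifyProtocol(conf, config, "/foo/bar");

        // Rejected: the local "file" scheme is not in the allowed set.
        try {
          HdfsInputSource.verifyProtocol(conf, config, "file:/tmp/foo");
        }
        catch (IllegalArgumentException e) {
          System.out.println(e.getMessage()); // Only [hdfs] protocols are allowed
        }
      }
    }
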
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
index e89bb0d..3ca8e23 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
@@ -35,6 +35,7 @@ import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.inputsource.hdfs.HdfsInputSource;
+import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
 import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs;
 import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
 import org.apache.hadoop.conf.Configuration;
@@ -118,5 +119,7 @@ public class HdfsStorageDruidModule implements DruidModule
     JsonConfigProvider.bind(binder, "druid.hadoop.security.kerberos", HdfsKerberosConfig.class);
     binder.bind(HdfsStorageAuthentication.class).in(ManageLifecycle.class);
     LifecycleModule.register(binder, HdfsStorageAuthentication.class);
+
+    JsonConfigProvider.bind(binder, "druid.ingestion.hdfs", HdfsInputSourceConfig.class);
   }
 }
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactoryTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactoryTest.java
index 88daed7..e96a773 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactoryTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactoryTest.java
@@ -19,30 +19,48 @@
 
 package org.apache.druid.firehose.hdfs;
 
-import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.InjectableValues.Std;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
 import org.apache.druid.storage.hdfs.HdfsStorageDruidModule;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
 import java.util.Collections;
 
 public class HdfsFirehoseFactoryTest
 {
+  private static final HdfsInputSourceConfig DEFAULT_INPUT_SOURCE_CONFIG = new HdfsInputSourceConfig(null);
+  private static final Configuration DEFAULT_CONFIGURATION = new Configuration();
+
+  @BeforeClass
+  public static void setup()
+  {
+    DEFAULT_CONFIGURATION.set("fs.default.name", "hdfs://localhost:7020");
+  }
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Test
   public void testArrayPaths() throws IOException
   {
     final HdfsFirehoseFactory firehoseFactory = new HdfsFirehoseFactory(
-        null,
         Collections.singletonList("/foo/bar"),
         null,
         null,
         null,
         null,
-        null
+        null,
+        DEFAULT_CONFIGURATION,
+        DEFAULT_INPUT_SOURCE_CONFIG
     );
 
     final ObjectMapper mapper = createMapper();
@@ -59,7 +77,16 @@ public class HdfsFirehoseFactoryTest
   @Test
   public void testStringPaths() throws IOException
   {
-    final HdfsFirehoseFactory firehoseFactory = new HdfsFirehoseFactory(null, "/foo/bar", null, null, null, null, null);
+    final HdfsFirehoseFactory firehoseFactory = new HdfsFirehoseFactory(
+        "/foo/bar",
+        null,
+        null,
+        null,
+        null,
+        null,
+        DEFAULT_CONFIGURATION,
+        DEFAULT_INPUT_SOURCE_CONFIG
+    );
     final ObjectMapper mapper = createMapper();
 
     final HdfsFirehoseFactory firehoseFactory2 = (HdfsFirehoseFactory)
@@ -71,11 +98,121 @@ public class HdfsFirehoseFactoryTest
     );
   }
 
+  @Test
+  public void testConstructorAllowsOnlyDefaultProtocol()
+  {
+    new HdfsFirehoseFactory(
+        "hdfs://localhost:7020/foo/bar",
+        null,
+        null,
+        null,
+        null,
+        null,
+        DEFAULT_CONFIGURATION,
+        DEFAULT_INPUT_SOURCE_CONFIG
+    );
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Only [hdfs] protocols are allowed");
+    new HdfsFirehoseFactory(
+        "file:/foo/bar",
+        null,
+        null,
+        null,
+        null,
+        null,
+        DEFAULT_CONFIGURATION,
+        DEFAULT_INPUT_SOURCE_CONFIG
+    );
+  }
+
+  @Test
+  public void testConstructorAllowsOnlyCustomProtocol()
+  {
+    final Configuration conf = new Configuration();
+    conf.set("fs.ftp.impl", "org.apache.hadoop.fs.ftp.FTPFileSystem");
+    new HdfsFirehoseFactory(
+        "ftp://localhost:21/foo/bar",
+        null,
+        null,
+        null,
+        null,
+        null,
+        DEFAULT_CONFIGURATION,
+        new HdfsInputSourceConfig(ImmutableSet.of("ftp"))
+    );
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Only [druid] protocols are allowed");
+    new HdfsFirehoseFactory(
+        "hdfs://localhost:7020/foo/bar",
+        null,
+        null,
+        null,
+        null,
+        null,
+        DEFAULT_CONFIGURATION,
+        new HdfsInputSourceConfig(ImmutableSet.of("druid"))
+    );
+  }
+
+  @Test
+  public void testConstructorWithDefaultHdfs()
+  {
+    new HdfsFirehoseFactory(
+        "/foo/bar*",
+        null,
+        null,
+        null,
+        null,
+        null,
+        DEFAULT_CONFIGURATION,
+        DEFAULT_INPUT_SOURCE_CONFIG
+    );
+
+    new HdfsFirehoseFactory(
+        "foo/bar*",
+        null,
+        null,
+        null,
+        null,
+        null,
+        DEFAULT_CONFIGURATION,
+        DEFAULT_INPUT_SOURCE_CONFIG
+    );
+
+    new HdfsFirehoseFactory(
+        "hdfs:///foo/bar*",
+        null,
+        null,
+        null,
+        null,
+        null,
+        DEFAULT_CONFIGURATION,
+        DEFAULT_INPUT_SOURCE_CONFIG
+    );
+
+    new HdfsFirehoseFactory(
+        "hdfs://localhost:10020/foo/bar*", // different hdfs
+        null,
+        null,
+        null,
+        null,
+        null,
+        DEFAULT_CONFIGURATION,
+        DEFAULT_INPUT_SOURCE_CONFIG
+    );
+  }
+
   private static ObjectMapper createMapper()
   {
     final ObjectMapper mapper = new ObjectMapper();
     new HdfsStorageDruidModule().getJacksonModules().forEach(mapper::registerModule);
-    mapper.setInjectableValues(new InjectableValues.Std().addValue(Configuration.class, new Configuration()));
+    mapper.setInjectableValues(
+        new Std()
+            .addValue(Configuration.class, DEFAULT_CONFIGURATION)
+            .addValue(HdfsInputSourceConfig.class, DEFAULT_INPUT_SOURCE_CONFIG)
+    );
     return mapper;
   }
 }
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfigTest.java
similarity index 51%
copy from core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
copy to extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfigTest.java
index 61be2cc..2e73688 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfigTest.java
@@ -17,31 +17,32 @@
  * under the License.
  */
 
-package org.apache.druid.data.input.impl;
+package org.apache.druid.inputsource.hdfs;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.data.input.InputSource;
-import org.apache.druid.metadata.DefaultPasswordProvider;
+import com.google.common.collect.ImmutableSet;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.net.URI;
-
-public class HttpInputSourceTest
+public class HdfsInputSourceConfigTest
 {
   @Test
-  public void testSerde() throws IOException
+  public void testNullAllowedProtocolsUseDefault()
+  {
+    HdfsInputSourceConfig config = new HdfsInputSourceConfig(null);
+    Assert.assertEquals(HdfsInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols());
+  }
+
+  @Test
+  public void testEmptyAllowedProtocolsUseDefault()
+  {
+    HdfsInputSourceConfig config = new HdfsInputSourceConfig(ImmutableSet.of());
+    Assert.assertEquals(HdfsInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols());
+  }
+
+  @Test
+  public void testCustomAllowedProtocols()
   {
-    final ObjectMapper mapper = new ObjectMapper();
-    final HttpInputSource source = new HttpInputSource(
-        ImmutableList.of(URI.create("http://test.com/http-test")),
-        "myName",
-        new DefaultPasswordProvider("myPassword")
-    );
-    final byte[] json = mapper.writeValueAsBytes(source);
-    final HttpInputSource fromJson = (HttpInputSource) mapper.readValue(json, InputSource.class);
-    Assert.assertEquals(source, fromJson);
+    HdfsInputSourceConfig config = new HdfsInputSourceConfig(ImmutableSet.of("druid"));
+    Assert.assertEquals(ImmutableSet.of("druid"), config.getAllowedProtocols());
   }
 }
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
index 170791e..a61a0c6 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
@@ -20,8 +20,9 @@
 package org.apache.druid.inputsource.hdfs;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.InjectableValues.Std;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
@@ -68,8 +69,9 @@ import java.util.stream.IntStream;
 @RunWith(Enclosed.class)
 public class HdfsInputSourceTest extends InitializedNullHandlingTest
 {
-  private static final String PATH = "/foo/bar";
+  private static final String PATH = "hdfs://localhost:7020/foo/bar";
   private static final Configuration CONFIGURATION = new Configuration();
+  private static final HdfsInputSourceConfig DEFAULT_INPUT_SOURCE_CONFIG = new HdfsInputSourceConfig(null);
   private static final String COLUMN = "value";
   private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
       new TimestampSpec(null, null, null),
@@ -84,6 +86,80 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
       0
   );
 
+  public static class ConstructorTest
+  {
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void testConstructorAllowsOnlyDefaultProtocol()
+    {
+      HdfsInputSource.builder()
+                     .paths(PATH + "*")
+                     .configuration(CONFIGURATION)
+                     .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+                     .build();
+
+      expectedException.expect(IllegalArgumentException.class);
+      expectedException.expectMessage("Only [hdfs] protocols are allowed");
+      HdfsInputSource.builder()
+                     .paths("file:/foo/bar*")
+                     .configuration(CONFIGURATION)
+                     .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+                     .build();
+    }
+
+    @Test
+    public void testConstructorAllowsOnlyCustomProtocol()
+    {
+      final Configuration conf = new Configuration();
+      conf.set("fs.ftp.impl", "org.apache.hadoop.fs.ftp.FTPFileSystem");
+      HdfsInputSource.builder()
+                     .paths("ftp://localhost:21/foo/bar")
+                     .configuration(CONFIGURATION)
+                     .inputSourceConfig(new HdfsInputSourceConfig(ImmutableSet.of("ftp")))
+                     .build();
+
+      expectedException.expect(IllegalArgumentException.class);
+      expectedException.expectMessage("Only [druid] protocols are allowed");
+      HdfsInputSource.builder()
+                     .paths(PATH + "*")
+                     .configuration(CONFIGURATION)
+                     .inputSourceConfig(new HdfsInputSourceConfig(ImmutableSet.of("druid")))
+                     .build();
+    }
+
+    @Test
+    public void testConstructorWithDefaultHdfs()
+    {
+      final Configuration conf = new Configuration();
+      conf.set("fs.default.name", "hdfs://localhost:7020");
+      HdfsInputSource.builder()
+                     .paths("/foo/bar*")
+                     .configuration(conf)
+                     .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+                     .build();
+
+      HdfsInputSource.builder()
+                     .paths("foo/bar*")
+                     .configuration(conf)
+                     .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+                     .build();
+
+      HdfsInputSource.builder()
+                     .paths("hdfs:///foo/bar*")
+                     .configuration(conf)
+                     .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+                     .build();
+
+      HdfsInputSource.builder()
+                     .paths("hdfs://localhost:10020/foo/bar*") // different hdfs
+                     .configuration(conf)
+                     .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+                     .build();
+    }
+  }
+
   public static class SerializeDeserializeTest
   {
     private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
@@ -98,7 +174,8 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
     {
       hdfsInputSourceBuilder = HdfsInputSource.builder()
                                               .paths(PATH)
-                                              .configuration(CONFIGURATION);
+                                              .configuration(CONFIGURATION)
+                                              .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG);
     }
 
     @Test
@@ -139,7 +216,11 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
     private static ObjectMapper createObjectMapper()
     {
       final ObjectMapper mapper = new ObjectMapper();
-      mapper.setInjectableValues(new InjectableValues.Std().addValue(Configuration.class, new Configuration()));
+      mapper.setInjectableValues(
+          new Std()
+              .addValue(Configuration.class, new Configuration())
+              .addValue(HdfsInputSourceConfig.class, DEFAULT_INPUT_SOURCE_CONFIG)
+      );
       new HdfsStorageDruidModule().getJacksonModules().forEach(mapper::registerModule);
       return mapper;
     }
@@ -204,6 +285,7 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
       target = HdfsInputSource.builder()
                               .paths(dfsCluster.getURI() + PATH + "*")
                               .configuration(CONFIGURATION)
+                              .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
                               .build();
     }
 
@@ -304,6 +386,7 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
       target = HdfsInputSource.builder()
                               .paths(Collections.emptyList())
                               .configuration(CONFIGURATION)
+                              .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
                               .build();
     }
 
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java
new file mode 100644
index 0000000..31baded
--- /dev/null
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.hdfs;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.JsonConfigurator;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.validation.Validation;
+import javax.validation.Validator;
+import java.util.Properties;
+
+public class HdfsStorageDruidModuleTest
+{
+  @Test
+  public void testHdfsInputSourceConfigDefaultAllowedProtocols()
+  {
+    Properties props = new Properties();
+    Injector injector = makeInjectorWithProperties(props);
+    HdfsInputSourceConfig instance = injector.getInstance(HdfsInputSourceConfig.class);
+    Assert.assertEquals(
+        ImmutableSet.of("hdfs"),
+        instance.getAllowedProtocols()
+    );
+  }
+
+  @Test
+  public void testHdfsInputSourceConfigCustomAllowedProtocols()
+  {
+    Properties props = new Properties();
+    props.setProperty("druid.ingestion.hdfs.allowedProtocols", "[\"webhdfs\"]");
+    Injector injector = makeInjectorWithProperties(props);
+    HdfsInputSourceConfig instance = injector.getInstance(HdfsInputSourceConfig.class);
+    Assert.assertEquals(
+        ImmutableSet.of("webhdfs"),
+        instance.getAllowedProtocols()
+    );
+  }
+
+  private Injector makeInjectorWithProperties(final Properties props)
+  {
+    return Guice.createInjector(
+        ImmutableList.of(
+            new DruidGuiceExtensions(),
+            new LifecycleModule(),
+            binder -> {
+              binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
+              binder.bind(JsonConfigurator.class).in(LazySingleton.class);
+              binder.bind(Properties.class).toInstance(props);
+            },
+            new HdfsStorageDruidModule()
+        )
+    );
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java b/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java
index 0423af4..eb612f7 100644
--- a/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java
+++ b/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Binder;
+import org.apache.druid.data.input.impl.HttpInputSourceConfig;
+import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.initialization.DruidModule;
 
 import java.util.List;
@@ -47,5 +49,6 @@ public class InputSourceModule implements DruidModule
   @Override
   public void configure(Binder binder)
   {
+    JsonConfigProvider.bind(binder, "druid.ingestion.http", HttpInputSourceConfig.class);
   }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java
index 7ff5e95..bbd797f 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment.realtime.firehose;
 
+import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
@@ -26,9 +27,10 @@ import com.google.common.base.Predicate;
 import org.apache.druid.data.input.FiniteFirehoseFactory;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.impl.HttpEntity;
+import org.apache.druid.data.input.impl.HttpInputSource;
+import org.apache.druid.data.input.impl.HttpInputSourceConfig;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.metadata.PasswordProvider;
 import org.apache.druid.utils.CompressionUtils;
 
@@ -43,12 +45,12 @@ import java.util.Objects;
 
 public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<URI>
 {
-  private static final Logger log = new Logger(HttpFirehoseFactory.class);
   private final List<URI> uris;
   @Nullable
   private final String httpAuthenticationUsername;
   @Nullable
   private final PasswordProvider httpAuthenticationPasswordProvider;
+  private final HttpInputSourceConfig inputSourceConfig;
 
   @JsonCreator
   public HttpFirehoseFactory(
@@ -59,14 +61,17 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
       @JsonProperty("fetchTimeout") Long fetchTimeout,
       @JsonProperty("maxFetchRetry") Integer maxFetchRetry,
       @JsonProperty("httpAuthenticationUsername") @Nullable String httpAuthenticationUsername,
-      @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider
+      @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider,
+      @JacksonInject HttpInputSourceConfig inputSourceConfig
   )
   {
     super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
     Preconditions.checkArgument(uris.size() > 0, "Empty URIs");
+    HttpInputSource.throwIfInvalidProtocols(inputSourceConfig, uris);
     this.uris = uris;
     this.httpAuthenticationUsername = httpAuthenticationUsername;
     this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider;
+    this.inputSourceConfig = inputSourceConfig;
   }
 
   @Nullable
@@ -120,35 +125,20 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
     if (this == o) {
       return true;
     }
-
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
-    final HttpFirehoseFactory that = (HttpFirehoseFactory) o;
-    return Objects.equals(uris, that.uris) &&
-           getMaxCacheCapacityBytes() == that.getMaxCacheCapacityBytes() &&
-           getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() &&
-           getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() &&
-           getFetchTimeout() == that.getFetchTimeout() &&
-           getMaxFetchRetry() == that.getMaxFetchRetry() &&
-           Objects.equals(httpAuthenticationUsername, that.getHttpAuthenticationUsername()) &&
-           Objects.equals(httpAuthenticationPasswordProvider, that.getHttpAuthenticationPasswordProvider());
+    HttpFirehoseFactory that = (HttpFirehoseFactory) o;
+    return uris.equals(that.uris) &&
+           Objects.equals(httpAuthenticationUsername, that.httpAuthenticationUsername) &&
+           Objects.equals(httpAuthenticationPasswordProvider, that.httpAuthenticationPasswordProvider) &&
+           inputSourceConfig.equals(that.inputSourceConfig);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(
-        uris,
-        getMaxCacheCapacityBytes(),
-        getMaxFetchCapacityBytes(),
-        getPrefetchTriggerBytes(),
-        getFetchTimeout(),
-        getMaxFetchRetry(),
-        httpAuthenticationUsername,
-        httpAuthenticationPasswordProvider
-    );
+    return Objects.hash(uris, httpAuthenticationUsername, httpAuthenticationPasswordProvider, inputSourceConfig);
   }
 
   @Override
@@ -168,7 +158,8 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
         getFetchTimeout(),
         getMaxFetchRetry(),
         getHttpAuthenticationUsername(),
-        httpAuthenticationPasswordProvider
+        httpAuthenticationPasswordProvider,
+        inputSourceConfig
     );
   }
 }
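
For readers skimming the patch, the constructor change above boils down to a scheme allow-list check: every URI handed to the firehose must use a protocol from the injected HttpInputSourceConfig, otherwise construction fails. The following is a minimal, self-contained sketch of that idea only; the class and method names here are hypothetical, and the real validation is HttpInputSource.throwIfInvalidProtocols, which this patch changes in the core module.

    // Illustrative sketch only -- not the actual Druid code. The real check lives in
    // HttpInputSource.throwIfInvalidProtocols (modified earlier in this patch).
    import java.net.URI;
    import java.util.List;
    import java.util.Locale;
    import java.util.Set;

    final class ProtocolAllowListCheck
    {
      static void throwIfInvalidProtocols(Set<String> allowedProtocols, List<URI> uris)
      {
        for (URI uri : uris) {
          final String scheme = uri.getScheme() == null ? "" : uri.getScheme().toLowerCase(Locale.ROOT);
          if (!allowedProtocols.contains(scheme)) {
            // e.g. "Only [http, https] protocols are allowed", which is the shape of message
            // asserted by the new firehose tests later in this patch
            throw new IllegalArgumentException("Only " + allowedProtocols + " protocols are allowed");
          }
        }
      }
    }

With the default configuration this rejects anything other than http/https, which is exactly what the new HttpFirehoseFactoryTest cases below assert.
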
diff --git a/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java b/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java
index 67126b0..074b1db 100644
--- a/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java
@@ -25,12 +25,25 @@ import com.fasterxml.jackson.databind.cfg.MapperConfig;
 import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
 import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.druid.data.input.impl.HttpInputSourceConfig;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.JsonConfigurator;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.guice.ServerModule;
+import org.apache.druid.jackson.JacksonModule;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import javax.validation.Validation;
+import javax.validation.Validator;
 import java.util.List;
+import java.util.Properties;
 import java.util.stream.Collectors;
 
 public class InputSourceModuleTest
@@ -59,4 +72,31 @@ public class InputSourceModuleTest
     Assert.assertNotNull(subtypes);
     Assert.assertEquals(SQL_NAMED_TYPE, Iterables.getOnlyElement(subtypes));
   }
+
+  @Test
+  public void testHttpInputSourceDefaultConfig()
+  {
+    Properties props = new Properties();
+    Injector injector = makeInjectorWithProperties(props);
+    HttpInputSourceConfig instance = injector.getInstance(HttpInputSourceConfig.class);
+    Assert.assertEquals(new HttpInputSourceConfig(null), instance);
+    Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, instance.getAllowedProtocols());
+  }
+
+  private Injector makeInjectorWithProperties(final Properties props)
+  {
+    return Guice.createInjector(
+        ImmutableList.of(
+            new DruidGuiceExtensions(),
+            new LifecycleModule(),
+            new ServerModule(),
+            binder -> {
+              binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
+              binder.bind(JsonConfigurator.class).in(LazySingleton.class);
+              binder.bind(Properties.class).toInstance(props);
+            },
+            new JacksonModule(),
+            new InputSourceModule()
+        ));
+  }
 }
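
The new test above only pins down the default binding. As a hedged sketch of how a non-default allow-list could be exercised through the same makeInjectorWithProperties helper, a companion test might look like the following; it assumes the runtime property key documented elsewhere in this patch (druid.ingestion.http.allowedProtocols), a JSON-array property value as parsed by JsonConfigurator, and an additional ImmutableSet import in the test class.

      // Hypothetical companion test -- property key and value format assumed, not taken from this excerpt.
      @Test
      public void testHttpInputSourceCustomConfig()
      {
        final Properties props = new Properties();
        props.setProperty("druid.ingestion.http.allowedProtocols", "[\"druid\", \"https\"]");
        final Injector injector = makeInjectorWithProperties(props);
        final HttpInputSourceConfig instance = injector.getInstance(HttpInputSourceConfig.class);
        Assert.assertEquals(ImmutableSet.of("druid", "https"), instance.getAllowedProtocols());
      }
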
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java
index e44ff7f..867c497 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java
@@ -19,22 +19,36 @@
 
 package org.apache.druid.segment.realtime.firehose;
 
+import com.fasterxml.jackson.databind.InjectableValues.Std;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.data.input.impl.HttpInputSourceConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.metadata.DefaultPasswordProvider;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
 import java.net.URI;
 
 public class HttpFirehoseFactoryTest
 {
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Test
   public void testSerde() throws IOException
   {
+    final HttpInputSourceConfig inputSourceConfig = new HttpInputSourceConfig(null);
     final ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.setInjectableValues(new Std().addValue(
+        HttpInputSourceConfig.class,
+        inputSourceConfig
+    ));
+
     final DefaultPasswordProvider pwProvider = new DefaultPasswordProvider("testPassword");
     final HttpFirehoseFactory factory = new HttpFirehoseFactory(
         ImmutableList.of(URI.create("http://foo/bar"), URI.create("http://foo/bar2")),
@@ -44,7 +58,8 @@ public class HttpFirehoseFactoryTest
         100L,
         5,
         "testUser",
-        pwProvider
+        pwProvider,
+        inputSourceConfig
     );
 
     final HttpFirehoseFactory outputFact = mapper.readValue(
@@ -54,4 +69,77 @@ public class HttpFirehoseFactoryTest
 
     Assert.assertEquals(factory, outputFact);
   }
+
+  @Test
+  public void testConstructorAllowsOnlyDefaultProtocols()
+  {
+    new HttpFirehoseFactory(
+        ImmutableList.of(URI.create("http:///")),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        new HttpInputSourceConfig(null)
+    );
+
+    new HttpFirehoseFactory(
+        ImmutableList.of(URI.create("https:///")),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        new HttpInputSourceConfig(null)
+    );
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Only [http, https] protocols are allowed");
+    new HttpFirehoseFactory(
+        ImmutableList.of(URI.create("my-protocol:///")),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        new HttpInputSourceConfig(null)
+    );
+  }
+
+  @Test
+  public void testConstructorAllowsOnlyCustomProtocols()
+  {
+    final HttpInputSourceConfig customConfig = new HttpInputSourceConfig(ImmutableSet.of("druid"));
+    new HttpFirehoseFactory(
+        ImmutableList.of(URI.create("druid:///")),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        customConfig
+    );
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Only [druid] protocols are allowed");
+    new HttpFirehoseFactory(
+        ImmutableList.of(URI.create("https:///")),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        customConfig
+    );
+  }
 }
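
Taken together, the allow-lists are supplied by cluster runtime properties rather than by individual ingestion specs. Assuming the property keys this patch adds in its documentation changes (not reproduced in this excerpt), a cluster that should only fetch HTTP input over TLS and only read plain hdfs:// paths might be configured roughly as:

    # common.runtime.properties -- keys assumed from this patch's docs changes
    druid.ingestion.http.allowedProtocols=["https"]
    druid.ingestion.hdfs.allowedProtocols=["hdfs"]

With settings like these, an http inputSource or firehose URI that is not https is rejected when the spec is deserialized, via the same IllegalArgumentException path exercised by the tests above.
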

