Posted to commits@druid.apache.org by ji...@apache.org on 2021/03/06 19:43:24 UTC
[druid] branch master updated: Add configurations for allowed protocols for HTTP and HDFS inputSources/firehoses (#10830)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9946306 Add configurations for allowed protocols for HTTP and HDFS inputSources/firehoses (#10830)
9946306 is described below
commit 9946306d4b2c16a7fc8bac97c9f4815ed4b46570
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Sat Mar 6 11:43:00 2021 -0800
Add configurations for allowed protocols for HTTP and HDFS inputSources/firehoses (#10830)
* Allow only HTTP and HTTPS protocols for the HTTP inputSource
* rename
* Update core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
Co-authored-by: Abhishek Agarwal <14...@users.noreply.github.com>
* fix http firehose and update doc
* HDFS inputSource
* add configs for allowed protocols
* fix checkstyle and doc
* more checkstyle
* remove stale doc
* remove more doc
* Apply doc suggestions from code review
Co-authored-by: Charles Smith <38...@users.noreply.github.com>
* update hdfs address in docs
* fix test
Co-authored-by: Abhishek Agarwal <14...@users.noreply.github.com>
Co-authored-by: Charles Smith <38...@users.noreply.github.com>
---
.../druid/data/input/impl/HttpInputSource.java | 32 ++++-
.../data/input/impl/HttpInputSourceConfig.java | 83 ++++++++++++
.../data/input/impl/HttpInputSourceConfigTest.java | 56 ++++++++
.../druid/data/input/impl/HttpInputSourceTest.java | 60 ++++++++-
docs/configuration/index.md | 23 ++++
docs/ingestion/native-batch.md | 28 ++--
.../druid/firehose/hdfs/HdfsFirehoseFactory.java | 24 ++--
.../druid/inputsource/hdfs/HdfsInputSource.java | 51 +++++--
.../inputsource/hdfs/HdfsInputSourceConfig.java | 52 ++++++++
.../druid/storage/hdfs/HdfsStorageDruidModule.java | 3 +
.../firehose/hdfs/HdfsFirehoseFactoryTest.java | 147 ++++++++++++++++++++-
.../hdfs/HdfsInputSourceConfigTest.java | 39 +++---
.../inputsource/hdfs/HdfsInputSourceTest.java | 91 ++++++++++++-
.../storage/hdfs/HdfsStorageDruidModuleTest.java | 80 +++++++++++
.../druid/metadata/input/InputSourceModule.java | 3 +
.../realtime/firehose/HttpFirehoseFactory.java | 41 +++---
.../metadata/input/InputSourceModuleTest.java | 40 ++++++
.../realtime/firehose/HttpFirehoseFactoryTest.java | 90 ++++++++++++-
18 files changed, 850 insertions(+), 93 deletions(-)
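For context before the diff: the change surfaces as two runtime properties. A minimal
sketch of how they might appear in common.runtime.properties, assuming the defaults
documented in this commit (values are JSON arrays of lowercase scheme names):

    # Defaults shown; omitting the properties yields the same behavior.
    druid.ingestion.http.allowedProtocols=["http", "https"]
    druid.ingestion.hdfs.allowedProtocols=["hdfs"]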
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
index 21480fd..002d3ec 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
@@ -19,6 +19,7 @@
package org.apache.druid.data.input.impl;
+import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
@@ -28,6 +29,8 @@ import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.PasswordProvider;
import javax.annotation.Nullable;
@@ -45,18 +48,31 @@ public class HttpInputSource extends AbstractInputSource implements SplittableIn
private final String httpAuthenticationUsername;
@Nullable
private final PasswordProvider httpAuthenticationPasswordProvider;
+ private final HttpInputSourceConfig config;
@JsonCreator
public HttpInputSource(
@JsonProperty("uris") List<URI> uris,
@JsonProperty("httpAuthenticationUsername") @Nullable String httpAuthenticationUsername,
- @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider
+ @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider,
+ @JacksonInject HttpInputSourceConfig config
)
{
Preconditions.checkArgument(uris != null && !uris.isEmpty(), "Empty URIs");
+ throwIfInvalidProtocols(config, uris);
this.uris = uris;
this.httpAuthenticationUsername = httpAuthenticationUsername;
this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider;
+ this.config = config;
+ }
+
+ public static void throwIfInvalidProtocols(HttpInputSourceConfig config, List<URI> uris)
+ {
+ for (URI uri : uris) {
+ if (!config.getAllowedProtocols().contains(StringUtils.toLowerCase(uri.getScheme()))) {
+ throw new IAE("Only %s protocols are allowed", config.getAllowedProtocols());
+ }
+ }
}
@JsonProperty
@@ -97,7 +113,8 @@ public class HttpInputSource extends AbstractInputSource implements SplittableIn
return new HttpInputSource(
Collections.singletonList(split.get()),
httpAuthenticationUsername,
- httpAuthenticationPasswordProvider
+ httpAuthenticationPasswordProvider,
+ config
);
}
@@ -129,16 +146,17 @@ public class HttpInputSource extends AbstractInputSource implements SplittableIn
if (o == null || getClass() != o.getClass()) {
return false;
}
- HttpInputSource source = (HttpInputSource) o;
- return Objects.equals(uris, source.uris) &&
- Objects.equals(httpAuthenticationUsername, source.httpAuthenticationUsername) &&
- Objects.equals(httpAuthenticationPasswordProvider, source.httpAuthenticationPasswordProvider);
+ HttpInputSource that = (HttpInputSource) o;
+ return Objects.equals(uris, that.uris) &&
+ Objects.equals(httpAuthenticationUsername, that.httpAuthenticationUsername) &&
+ Objects.equals(httpAuthenticationPasswordProvider, that.httpAuthenticationPasswordProvider) &&
+ Objects.equals(config, that.config);
}
@Override
public int hashCode()
{
- return Objects.hash(uris, httpAuthenticationUsername, httpAuthenticationPasswordProvider);
+ return Objects.hash(uris, httpAuthenticationUsername, httpAuthenticationPasswordProvider, config);
}
@Override
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java
new file mode 100644
index 0000000..40a2521
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.StringUtils;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class HttpInputSourceConfig
+{
+ @VisibleForTesting
+ public static final Set<String> DEFAULT_ALLOWED_PROTOCOLS = ImmutableSet.of("http", "https");
+
+ @JsonProperty
+ private final Set<String> allowedProtocols;
+
+ @JsonCreator
+ public HttpInputSourceConfig(
+ @JsonProperty("allowedProtocols") @Nullable Set<String> allowedProtocols
+ )
+ {
+ this.allowedProtocols = allowedProtocols == null || allowedProtocols.isEmpty()
+ ? DEFAULT_ALLOWED_PROTOCOLS
+ : allowedProtocols.stream().map(StringUtils::toLowerCase).collect(Collectors.toSet());
+ }
+
+ public Set<String> getAllowedProtocols()
+ {
+ return allowedProtocols;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HttpInputSourceConfig that = (HttpInputSourceConfig) o;
+ return Objects.equals(allowedProtocols, that.allowedProtocols);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(allowedProtocols);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "HttpInputSourceConfig{" +
+ ", allowedProtocols=" + allowedProtocols +
+ '}';
+ }
+}
+
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java
new file mode 100644
index 0000000..a3f24b9
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceConfigTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.google.common.collect.ImmutableSet;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HttpInputSourceConfigTest
+{
+
+ @Test
+ public void testEquals()
+ {
+ EqualsVerifier.forClass(HttpInputSourceConfig.class).usingGetClass().verify();
+ }
+
+ @Test
+ public void testNullAllowedProtocolsUseDefault()
+ {
+ HttpInputSourceConfig config = new HttpInputSourceConfig(null);
+ Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols());
+ }
+
+ @Test
+ public void testEmptyAllowedProtocolsUseDefault()
+ {
+ HttpInputSourceConfig config = new HttpInputSourceConfig(ImmutableSet.of());
+ Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols());
+ }
+
+ @Test
+ public void testCustomAllowedProtocols()
+ {
+ HttpInputSourceConfig config = new HttpInputSourceConfig(ImmutableSet.of("druid"));
+ Assert.assertEquals(ImmutableSet.of("druid"), config.getAllowedProtocols());
+ }
+}
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
index 61be2cc..9c17b57 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
@@ -19,29 +19,87 @@
package org.apache.druid.data.input.impl;
+import com.fasterxml.jackson.databind.InjectableValues.Std;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.metadata.DefaultPasswordProvider;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.net.URI;
public class HttpInputSourceTest
{
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
@Test
public void testSerde() throws IOException
{
+ HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(null);
final ObjectMapper mapper = new ObjectMapper();
+ mapper.setInjectableValues(new Std().addValue(HttpInputSourceConfig.class, httpInputSourceConfig));
final HttpInputSource source = new HttpInputSource(
ImmutableList.of(URI.create("http://test.com/http-test")),
"myName",
- new DefaultPasswordProvider("myPassword")
+ new DefaultPasswordProvider("myPassword"),
+ httpInputSourceConfig
);
final byte[] json = mapper.writeValueAsBytes(source);
final HttpInputSource fromJson = (HttpInputSource) mapper.readValue(json, InputSource.class);
Assert.assertEquals(source, fromJson);
}
+
+ @Test
+ public void testConstructorAllowsOnlyDefaultProtocols()
+ {
+ new HttpInputSource(
+ ImmutableList.of(URI.create("http:///")),
+ "myName",
+ new DefaultPasswordProvider("myPassword"),
+ new HttpInputSourceConfig(null)
+ );
+
+ new HttpInputSource(
+ ImmutableList.of(URI.create("https:///")),
+ "myName",
+ new DefaultPasswordProvider("myPassword"),
+ new HttpInputSourceConfig(null)
+ );
+
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Only [http, https] protocols are allowed");
+ new HttpInputSource(
+ ImmutableList.of(URI.create("my-protocol:///")),
+ "myName",
+ new DefaultPasswordProvider("myPassword"),
+ new HttpInputSourceConfig(null)
+ );
+ }
+
+ @Test
+ public void testConstructorAllowsOnlyCustomProtocols()
+ {
+ final HttpInputSourceConfig customConfig = new HttpInputSourceConfig(ImmutableSet.of("druid"));
+ new HttpInputSource(
+ ImmutableList.of(URI.create("druid:///")),
+ "myName",
+ new DefaultPasswordProvider("myPassword"),
+ customConfig
+ );
+
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Only [druid] protocols are allowed");
+ new HttpInputSource(
+ ImmutableList.of(URI.create("https:///")),
+ "myName",
+ new DefaultPasswordProvider("myPassword"),
+ customConfig
+ );
+ }
}
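As a quick illustration of the constructor check exercised above, here is a hedged,
self-contained sketch; the URIs and the https-only restriction are illustrative, not
part of this commit:

    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableSet;
    import java.net.URI;
    import org.apache.druid.data.input.impl.HttpInputSource;
    import org.apache.druid.data.input.impl.HttpInputSourceConfig;

    public class HttpProtocolCheckSketch
    {
      public static void main(String[] args)
      {
        // Restrict the HTTP input source to https only.
        final HttpInputSourceConfig config = new HttpInputSourceConfig(ImmutableSet.of("https"));

        // Accepted: the URI scheme is in the allowed set.
        new HttpInputSource(
            ImmutableList.of(URI.create("https://example.com/data.json")),
            null,
            null,
            config
        );

        // Rejected: throws IAE("Only [https] protocols are allowed").
        new HttpInputSource(
            ImmutableList.of(URI.create("http://example.com/data.json")),
            null,
            null,
            config
        );
      }
    }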
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index b23432c..e6650b1 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -515,6 +515,28 @@ This deep storage is used to interface with Cassandra. Note that the `druid-cas
|`druid.storage.keyspace`|Cassandra key space.|none|
+### Ingestion Security Configuration
+
+#### HDFS input source
+
+You can set the following property to specify permissible protocols for
+the [HDFS input source](../ingestion/native-batch.md#hdfs-input-source) and the [HDFS firehose](../ingestion/native-batch.md#hdfsfirehose).
+
+|Property|Possible Values|Description|Default|
+|--------|---------------|-----------|-------|
+|`druid.ingestion.hdfs.allowedProtocols`|List of protocols|Allowed protocols for the HDFS input source and HDFS firehose.|["hdfs"]|
+
+
+#### HTTP input source
+
+You can set the following property to specify permissible protocols for
+the [HTTP input source](../ingestion/native-batch.md#http-input-source) and the [HTTP firehose](../ingestion/native-batch.md#httpfirehose).
+
+|Property|Possible Values|Description|Default|
+|--------|---------------|-----------|-------|
+|`druid.ingestion.http.allowedProtocols`|List of protocols|Allowed protocols for the HTTP input source and HTTP firehose.|["http", "https"]|
+
+
### Task Logging
If you are running the indexing service in remote mode, the task logs must be stored in S3, Azure Blob Store, Google Cloud Storage or HDFS.
@@ -1355,6 +1377,7 @@ The amount of direct memory needed by Druid is at least
ensure at least this amount of direct memory is available by providing `-XX:MaxDirectMemorySize=<VALUE>` at the command
line.
+
#### Query Configurations
See [general query configuration](#general-query-configuration).
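For example, a deployment could tighten or widen the allowed sets. A hypothetical
override, in the same JSON-array property format the module tests in this commit use:

    # Allow only https for the HTTP input source and firehose.
    druid.ingestion.http.allowedProtocols=["https"]
    # Allow webhdfs in addition to hdfs.
    druid.ingestion.hdfs.allowedProtocols=["hdfs", "webhdfs"]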
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index cc06475..5032ed1 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -1064,7 +1064,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "hdfs",
- "paths": "hdfs://foo/bar/", "hdfs://bar/foo"
+ "paths": "hdfs://namenode_host/foo/bar/", "hdfs://namenode_host/bar/foo"
},
"inputFormat": {
"type": "json"
@@ -1080,7 +1080,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "hdfs",
- "paths": ["hdfs://foo/bar", "hdfs://bar/foo"]
+ "paths": "hdfs://namenode_host/foo/bar/", "hdfs://namenode_host/bar/foo"
},
"inputFormat": {
"type": "json"
@@ -1096,7 +1096,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "hdfs",
- "paths": "hdfs://foo/bar/file.json", "hdfs://bar/foo/file2.json"
+ "paths": "hdfs://namenode_host/foo/bar/file.json", "hdfs://namenode_host/bar/foo/file2.json"
},
"inputFormat": {
"type": "json"
@@ -1112,7 +1112,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "hdfs",
- "paths": ["hdfs://foo/bar/file.json", "hdfs://bar/foo/file2.json"]
+ "paths": ["hdfs://namenode_host/foo/bar/file.json", "hdfs://namenode_host/bar/foo/file2.json"]
},
"inputFormat": {
"type": "json"
@@ -1127,9 +1127,10 @@ Sample specs:
|type|This should be `hdfs`.|None|yes|
|paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths. Empty files located under one of the given paths will be skipped.|None|yes|
-You can also ingest from cloud storage using the HDFS input source.
-However, if you want to read from AWS S3 or Google Cloud Storage, consider using
-the [S3 input source](#s3-input-source) or the [Google Cloud Storage input source](#google-cloud-storage-input-source) instead.
+You can also ingest from other storage systems using the HDFS input source, as long as the HDFS client supports them.
+However, if you want to ingest from cloud storage, consider using the service-specific input source for your data storage.
+If you want to use a non-hdfs protocol with the HDFS input source, include the protocol
+in `druid.ingestion.hdfs.allowedProtocols`. See [HDFS input source security configuration](../configuration/index.md#hdfs-input-source) for more details.
### HTTP Input Source
@@ -1209,10 +1210,13 @@ You can also use the other existing Druid PasswordProviders. Here is an example
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be `http`|None|yes|
-|uris|URIs of the input files.|None|yes|
+|uris|URIs of the input files. See below for the allowed protocols.|None|yes|
|httpAuthenticationUsername|Username to use for authentication with specified URIs. Can be optionally used if the URIs specified in the spec require a Basic Authentication Header.|None|no|
|httpAuthenticationPassword|PasswordProvider to use with specified URIs. Can be optionally used if the URIs specified in the spec require a Basic Authentication Header.|None|no|
+The HTTP input source only accepts URIs whose protocols are listed in the `druid.ingestion.http.allowedProtocols` property.
+The `http` and `https` protocols are allowed by default. See [HTTP input source security configuration](../configuration/index.md#http-input-source) for more details.
+
### Inline Input Source
The Inline input source can be used to read the data inlined in its own spec.
@@ -1559,6 +1563,11 @@ Note that prefetching or caching isn't that useful in the Parallel task.
|fetchTimeout|Timeout for fetching each file.|60000|
|maxFetchRetry|Maximum number of retries for fetching each file.|3|
+You can also ingest from other storage systems using the HDFS firehose, as long as the HDFS client supports them.
+However, if you want to ingest from cloud storage, consider using the service-specific input source for your data storage.
+If you want to use a non-hdfs protocol with the HDFS firehose, include that protocol
+in `druid.ingestion.hdfs.allowedProtocols`. See [HDFS firehose security configuration](../configuration/index.md#hdfs-input-source) for more details.
+
### LocalFirehose
This Firehose can be used to read the data from files on local disk, and is mainly intended for proof-of-concept testing, and works with `string` typed parsers.
@@ -1596,6 +1605,9 @@ A sample HTTP Firehose spec is shown below:
}
```
+The HTTP firehose only accepts URIs whose protocols are listed in the `druid.ingestion.http.allowedProtocols` property.
+The `http` and `https` protocols are allowed by default. See [HTTP firehose security configuration](../configuration/index.md#http-input-source) for more details.
+
The below configurations can be optionally used if the URIs specified in the spec require a Basic Authentication Header.
Omitting these fields from your spec will result in HTTP requests with no Basic Authentication Header.
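To connect the two docs changes, here is a minimal http inputSource fragment in the
style of the sample specs above; under the default configuration its https scheme
passes validation (the URI is illustrative):

    "inputSource": {
      "type": "http",
      "uris": ["https://example.com/wikipedia.json.gz"]
    }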
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java
index ee5bf03..f7fac9f 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java
@@ -29,6 +29,7 @@ import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.inputsource.hdfs.HdfsInputSource;
+import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller;
import org.apache.druid.utils.CompressionUtils;
import org.apache.hadoop.conf.Configuration;
@@ -44,21 +45,25 @@ public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<Pa
{
private final List<String> inputPaths;
private final Configuration conf;
+ private final HdfsInputSourceConfig inputSourceConfig;
@JsonCreator
public HdfsFirehoseFactory(
- @JacksonInject @Hdfs Configuration conf,
@JsonProperty("paths") Object inputPaths,
@JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
@JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes,
@JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes,
@JsonProperty("fetchTimeout") Long fetchTimeout,
- @JsonProperty("maxFetchRetry") Integer maxFetchRetry
+ @JsonProperty("maxFetchRetry") Integer maxFetchRetry,
+ @JacksonInject @Hdfs Configuration conf,
+ @JacksonInject HdfsInputSourceConfig inputSourceConfig
)
{
super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
- this.inputPaths = HdfsInputSource.coerceInputPathsToList(inputPaths, "inputPaths");
+ this.inputPaths = HdfsInputSource.coerceInputPathsToList(inputPaths, "paths");
this.conf = conf;
+ this.inputSourceConfig = inputSourceConfig;
+ this.inputPaths.forEach(p -> HdfsInputSource.verifyProtocol(conf, inputSourceConfig, p));
}
@JsonProperty("paths")
@@ -109,21 +114,14 @@ public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<Pa
public FiniteFirehoseFactory<StringInputRowParser, Path> withSplit(InputSplit<Path> split)
{
return new HdfsFirehoseFactory(
- conf,
split.get().toString(),
getMaxCacheCapacityBytes(),
getMaxFetchCapacityBytes(),
getPrefetchTriggerBytes(),
getFetchTimeout(),
- getMaxFetchRetry()
+ getMaxFetchRetry(),
+ conf,
+ inputSourceConfig
);
}
-
- @Override
- public String toString()
- {
- return "HdfsFirehoseFactory{" +
- "inputPaths=" + inputPaths +
- '}';
- }
}
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
index b8c798a..7faebcc 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
@@ -37,6 +37,7 @@ import org.apache.druid.data.input.impl.InputEntityIteratingReader;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.utils.Streams;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -64,6 +65,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
private final List<String> inputPaths;
private final Configuration configuration;
+ private final HdfsInputSourceConfig inputSourceConfig;
// Although the javadocs for SplittableInputSource say to avoid caching splits to reduce memory, HdfsInputSource
// *does* cache the splits for the following reasons:
@@ -73,32 +75,49 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
//
// 2) The index_hadoop task allocates splits eagerly, so the memory usage should not be a problem for anyone
// migrating from Hadoop.
- private List<Path> cachedPaths;
+ @Nullable
+ private List<Path> cachedPaths = null;
@JsonCreator
public HdfsInputSource(
@JsonProperty(PROP_PATHS) Object inputPaths,
- @JacksonInject @Hdfs Configuration configuration
+ @JacksonInject @Hdfs Configuration configuration,
+ @JacksonInject HdfsInputSourceConfig inputSourceConfig
)
{
this.inputPaths = coerceInputPathsToList(inputPaths, PROP_PATHS);
this.configuration = configuration;
- this.cachedPaths = null;
+ this.inputSourceConfig = inputSourceConfig;
+ this.inputPaths.forEach(p -> verifyProtocol(configuration, inputSourceConfig, p));
}
public static List<String> coerceInputPathsToList(Object inputPaths, String propertyName)
{
- final List<String> paths;
-
if (inputPaths instanceof String) {
- paths = Collections.singletonList((String) inputPaths);
+ return Collections.singletonList((String) inputPaths);
} else if (inputPaths instanceof List && ((List<?>) inputPaths).stream().allMatch(x -> x instanceof String)) {
- paths = ((List<?>) inputPaths).stream().map(x -> (String) x).collect(Collectors.toList());
+ return ((List<?>) inputPaths).stream().map(x -> (String) x).collect(Collectors.toList());
} else {
throw new IAE("'%s' must be a string or an array of strings", propertyName);
}
+ }
+
+ public static void verifyProtocol(Configuration conf, HdfsInputSourceConfig config, String pathString)
+ {
+ Path path = new Path(pathString);
+ try {
+ throwIfInvalidProtocol(config, path.getFileSystem(conf).getScheme());
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
- return paths;
+ private static void throwIfInvalidProtocol(HdfsInputSourceConfig config, String scheme)
+ {
+ if (!config.getAllowedProtocols().contains(StringUtils.toLowerCase(scheme))) {
+ throw new IAE("Only %s protocols are allowed", config.getAllowedProtocols());
+ }
}
public static Collection<Path> getPaths(List<String> inputPaths, Configuration configuration) throws IOException
@@ -202,7 +221,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
public SplittableInputSource<List<Path>> withSplit(InputSplit<List<Path>> split)
{
List<String> paths = split.get().stream().map(path -> path.toString()).collect(Collectors.toList());
- return new HdfsInputSource(paths, configuration);
+ return new HdfsInputSource(paths, configuration, inputSourceConfig);
}
@Override
@@ -218,6 +237,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
}
}
+ @VisibleForTesting
static Builder builder()
{
return new Builder();
@@ -227,6 +247,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
{
private Object paths;
private Configuration configuration;
+ private HdfsInputSourceConfig inputSourceConfig;
private Builder()
{
@@ -244,9 +265,19 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
return this;
}
+ Builder inputSourceConfig(HdfsInputSourceConfig inputSourceConfig)
+ {
+ this.inputSourceConfig = inputSourceConfig;
+ return this;
+ }
+
HdfsInputSource build()
{
- return new HdfsInputSource(paths, configuration);
+ return new HdfsInputSource(
+ Preconditions.checkNotNull(paths, "paths"),
+ Preconditions.checkNotNull(configuration, "configuration"),
+ Preconditions.checkNotNull(inputSourceConfig, "inputSourceConfig")
+ );
}
}
}
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfig.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfig.java
new file mode 100644
index 0000000..c7f43f8
--- /dev/null
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfig.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.inputsource.hdfs;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.StringUtils;
+
+import javax.annotation.Nullable;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class HdfsInputSourceConfig
+{
+ static final Set<String> DEFAULT_ALLOWED_PROTOCOLS = ImmutableSet.of("hdfs");
+
+ @JsonProperty
+ private final Set<String> allowedProtocols;
+
+ @JsonCreator
+ public HdfsInputSourceConfig(
+ @JsonProperty("allowedProtocols") @Nullable Set<String> allowedProtocols
+ )
+ {
+ this.allowedProtocols = allowedProtocols == null || allowedProtocols.isEmpty()
+ ? DEFAULT_ALLOWED_PROTOCOLS
+ : allowedProtocols.stream().map(StringUtils::toLowerCase).collect(Collectors.toSet());
+ }
+
+ public Set<String> getAllowedProtocols()
+ {
+ return allowedProtocols;
+ }
+}
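A short sketch of the HDFS-side check, which resolves a path's scheme through the
Hadoop FileSystem before validating it against this config; the namenode host and
paths are hypothetical:

    import org.apache.druid.inputsource.hdfs.HdfsInputSource;
    import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
    import org.apache.hadoop.conf.Configuration;

    public class HdfsProtocolCheckSketch
    {
      public static void main(String[] args)
      {
        final Configuration conf = new Configuration();
        // Passing null falls back to DEFAULT_ALLOWED_PROTOCOLS, i.e. ["hdfs"].
        final HdfsInputSourceConfig config = new HdfsInputSourceConfig(null);

        // Accepted: the resolved FileSystem scheme is hdfs.
        HdfsInputSource.verifyProtocol(conf, config, "hdfs://namenode_host:8020/foo/bar");

        // Rejected: throws IAE("Only [hdfs] protocols are allowed") for the local file scheme.
        HdfsInputSource.verifyProtocol(conf, config, "file:/foo/bar");
      }
    }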
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
index e89bb0d..3ca8e23 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
@@ -35,6 +35,7 @@ import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.inputsource.hdfs.HdfsInputSource;
+import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs;
import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
import org.apache.hadoop.conf.Configuration;
@@ -118,5 +119,7 @@ public class HdfsStorageDruidModule implements DruidModule
JsonConfigProvider.bind(binder, "druid.hadoop.security.kerberos", HdfsKerberosConfig.class);
binder.bind(HdfsStorageAuthentication.class).in(ManageLifecycle.class);
LifecycleModule.register(binder, HdfsStorageAuthentication.class);
+
+ JsonConfigProvider.bind(binder, "druid.ingestion.hdfs", HdfsInputSourceConfig.class);
}
}
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactoryTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactoryTest.java
index 88daed7..e96a773 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactoryTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactoryTest.java
@@ -19,30 +19,48 @@
package org.apache.druid.firehose.hdfs;
-import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.InjectableValues.Std;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
import org.apache.druid.storage.hdfs.HdfsStorageDruidModule;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.Collections;
public class HdfsFirehoseFactoryTest
{
+ private static final HdfsInputSourceConfig DEFAULT_INPUT_SOURCE_CONFIG = new HdfsInputSourceConfig(null);
+ private static final Configuration DEFAULT_CONFIGURATION = new Configuration();
+
+ @BeforeClass
+ public static void setup()
+ {
+ DEFAULT_CONFIGURATION.set("fs.default.name", "hdfs://localhost:7020");
+ }
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
@Test
public void testArrayPaths() throws IOException
{
final HdfsFirehoseFactory firehoseFactory = new HdfsFirehoseFactory(
- null,
Collections.singletonList("/foo/bar"),
null,
null,
null,
null,
- null
+ null,
+ DEFAULT_CONFIGURATION,
+ DEFAULT_INPUT_SOURCE_CONFIG
);
final ObjectMapper mapper = createMapper();
@@ -59,7 +77,16 @@ public class HdfsFirehoseFactoryTest
@Test
public void testStringPaths() throws IOException
{
- final HdfsFirehoseFactory firehoseFactory = new HdfsFirehoseFactory(null, "/foo/bar", null, null, null, null, null);
+ final HdfsFirehoseFactory firehoseFactory = new HdfsFirehoseFactory(
+ "/foo/bar",
+ null,
+ null,
+ null,
+ null,
+ null,
+ DEFAULT_CONFIGURATION,
+ DEFAULT_INPUT_SOURCE_CONFIG
+ );
final ObjectMapper mapper = createMapper();
final HdfsFirehoseFactory firehoseFactory2 = (HdfsFirehoseFactory)
@@ -71,11 +98,121 @@ public class HdfsFirehoseFactoryTest
);
}
+ @Test
+ public void testConstructorAllowsOnlyDefaultProtocol()
+ {
+ new HdfsFirehoseFactory(
+ "hdfs://localhost:7020/foo/bar",
+ null,
+ null,
+ null,
+ null,
+ null,
+ DEFAULT_CONFIGURATION,
+ DEFAULT_INPUT_SOURCE_CONFIG
+ );
+
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Only [hdfs] protocols are allowed");
+ new HdfsFirehoseFactory(
+ "file:/foo/bar",
+ null,
+ null,
+ null,
+ null,
+ null,
+ DEFAULT_CONFIGURATION,
+ DEFAULT_INPUT_SOURCE_CONFIG
+ );
+ }
+
+ @Test
+ public void testConstructorAllowsOnlyCustomProtocol()
+ {
+ final Configuration conf = new Configuration();
+ conf.set("fs.ftp.impl", "org.apache.hadoop.fs.ftp.FTPFileSystem");
+ new HdfsFirehoseFactory(
+ "ftp://localhost:21/foo/bar",
+ null,
+ null,
+ null,
+ null,
+ null,
+ conf,
+ new HdfsInputSourceConfig(ImmutableSet.of("ftp"))
+ );
+
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Only [druid] protocols are allowed");
+ new HdfsFirehoseFactory(
+ "hdfs://localhost:7020/foo/bar",
+ null,
+ null,
+ null,
+ null,
+ null,
+ DEFAULT_CONFIGURATION,
+ new HdfsInputSourceConfig(ImmutableSet.of("druid"))
+ );
+ }
+
+ @Test
+ public void testConstructorWithDefaultHdfs()
+ {
+ new HdfsFirehoseFactory(
+ "/foo/bar*",
+ null,
+ null,
+ null,
+ null,
+ null,
+ DEFAULT_CONFIGURATION,
+ DEFAULT_INPUT_SOURCE_CONFIG
+ );
+
+ new HdfsFirehoseFactory(
+ "foo/bar*",
+ null,
+ null,
+ null,
+ null,
+ null,
+ DEFAULT_CONFIGURATION,
+ DEFAULT_INPUT_SOURCE_CONFIG
+ );
+
+ new HdfsFirehoseFactory(
+ "hdfs:///foo/bar*",
+ null,
+ null,
+ null,
+ null,
+ null,
+ DEFAULT_CONFIGURATION,
+ DEFAULT_INPUT_SOURCE_CONFIG
+ );
+
+ new HdfsFirehoseFactory(
+ "hdfs://localhost:10020/foo/bar*", // different hdfs
+ null,
+ null,
+ null,
+ null,
+ null,
+ DEFAULT_CONFIGURATION,
+ DEFAULT_INPUT_SOURCE_CONFIG
+ );
+ }
+
private static ObjectMapper createMapper()
{
final ObjectMapper mapper = new ObjectMapper();
new HdfsStorageDruidModule().getJacksonModules().forEach(mapper::registerModule);
- mapper.setInjectableValues(new InjectableValues.Std().addValue(Configuration.class, new Configuration()));
+ mapper.setInjectableValues(
+ new Std()
+ .addValue(Configuration.class, DEFAULT_CONFIGURATION)
+ .addValue(HdfsInputSourceConfig.class, DEFAULT_INPUT_SOURCE_CONFIG)
+ );
return mapper;
}
}
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfigTest.java
similarity index 51%
copy from core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
copy to extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfigTest.java
index 61be2cc..2e73688 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceConfigTest.java
@@ -17,31 +17,32 @@
* under the License.
*/
-package org.apache.druid.data.input.impl;
+package org.apache.druid.inputsource.hdfs;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.data.input.InputSource;
-import org.apache.druid.metadata.DefaultPasswordProvider;
+import com.google.common.collect.ImmutableSet;
import org.junit.Assert;
import org.junit.Test;
-import java.io.IOException;
-import java.net.URI;
-
-public class HttpInputSourceTest
+public class HdfsInputSourceConfigTest
{
@Test
- public void testSerde() throws IOException
+ public void testNullAllowedProtocolsUseDefault()
+ {
+ HdfsInputSourceConfig config = new HdfsInputSourceConfig(null);
+ Assert.assertEquals(HdfsInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols());
+ }
+
+ @Test
+ public void testEmptyAllowedProtocolsUseDefault()
+ {
+ HdfsInputSourceConfig config = new HdfsInputSourceConfig(ImmutableSet.of());
+ Assert.assertEquals(HdfsInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols());
+ }
+
+ @Test
+ public void testCustomAllowedProtocols()
{
- final ObjectMapper mapper = new ObjectMapper();
- final HttpInputSource source = new HttpInputSource(
- ImmutableList.of(URI.create("http://test.com/http-test")),
- "myName",
- new DefaultPasswordProvider("myPassword")
- );
- final byte[] json = mapper.writeValueAsBytes(source);
- final HttpInputSource fromJson = (HttpInputSource) mapper.readValue(json, InputSource.class);
- Assert.assertEquals(source, fromJson);
+ HdfsInputSourceConfig config = new HdfsInputSourceConfig(ImmutableSet.of("druid"));
+ Assert.assertEquals(ImmutableSet.of("druid"), config.getAllowedProtocols());
}
}
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
index 170791e..a61a0c6 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
@@ -20,8 +20,9 @@
package org.apache.druid.inputsource.hdfs;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.InjectableValues.Std;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
@@ -68,8 +69,9 @@ import java.util.stream.IntStream;
@RunWith(Enclosed.class)
public class HdfsInputSourceTest extends InitializedNullHandlingTest
{
- private static final String PATH = "/foo/bar";
+ private static final String PATH = "hdfs://localhost:7020/foo/bar";
private static final Configuration CONFIGURATION = new Configuration();
+ private static final HdfsInputSourceConfig DEFAULT_INPUT_SOURCE_CONFIG = new HdfsInputSourceConfig(null);
private static final String COLUMN = "value";
private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
new TimestampSpec(null, null, null),
@@ -84,6 +86,80 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
0
);
+ public static class ConstructorTest
+ {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testConstructorAllowsOnlyDefaultProtocol()
+ {
+ HdfsInputSource.builder()
+ .paths(PATH + "*")
+ .configuration(CONFIGURATION)
+ .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+ .build();
+
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Only [hdfs] protocols are allowed");
+ HdfsInputSource.builder()
+ .paths("file:/foo/bar*")
+ .configuration(CONFIGURATION)
+ .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+ .build();
+ }
+
+ @Test
+ public void testConstructorAllowsOnlyCustomProtocol()
+ {
+ final Configuration conf = new Configuration();
+ conf.set("fs.ftp.impl", "org.apache.hadoop.fs.ftp.FTPFileSystem");
+ HdfsInputSource.builder()
+ .paths("ftp://localhost:21/foo/bar")
+ .configuration(conf)
+ .inputSourceConfig(new HdfsInputSourceConfig(ImmutableSet.of("ftp")))
+ .build();
+
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Only [druid] protocols are allowed");
+ HdfsInputSource.builder()
+ .paths(PATH + "*")
+ .configuration(CONFIGURATION)
+ .inputSourceConfig(new HdfsInputSourceConfig(ImmutableSet.of("druid")))
+ .build();
+ }
+
+ @Test
+ public void testConstructorWithDefaultHdfs()
+ {
+ final Configuration conf = new Configuration();
+ conf.set("fs.default.name", "hdfs://localhost:7020");
+ HdfsInputSource.builder()
+ .paths("/foo/bar*")
+ .configuration(conf)
+ .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+ .build();
+
+ HdfsInputSource.builder()
+ .paths("foo/bar*")
+ .configuration(conf)
+ .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+ .build();
+
+ HdfsInputSource.builder()
+ .paths("hdfs:///foo/bar*")
+ .configuration(conf)
+ .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+ .build();
+
+ HdfsInputSource.builder()
+ .paths("hdfs://localhost:10020/foo/bar*") // different hdfs
+ .configuration(conf)
+ .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+ .build();
+ }
+ }
+
public static class SerializeDeserializeTest
{
private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
@@ -98,7 +174,8 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
{
hdfsInputSourceBuilder = HdfsInputSource.builder()
.paths(PATH)
- .configuration(CONFIGURATION);
+ .configuration(CONFIGURATION)
+ .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG);
}
@Test
@@ -139,7 +216,11 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
private static ObjectMapper createObjectMapper()
{
final ObjectMapper mapper = new ObjectMapper();
- mapper.setInjectableValues(new InjectableValues.Std().addValue(Configuration.class, new Configuration()));
+ mapper.setInjectableValues(
+ new Std()
+ .addValue(Configuration.class, new Configuration())
+ .addValue(HdfsInputSourceConfig.class, DEFAULT_INPUT_SOURCE_CONFIG)
+ );
new HdfsStorageDruidModule().getJacksonModules().forEach(mapper::registerModule);
return mapper;
}
@@ -204,6 +285,7 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
target = HdfsInputSource.builder()
.paths(dfsCluster.getURI() + PATH + "*")
.configuration(CONFIGURATION)
+ .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
.build();
}
@@ -304,6 +386,7 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
target = HdfsInputSource.builder()
.paths(Collections.emptyList())
.configuration(CONFIGURATION)
+ .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
.build();
}
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java
new file mode 100644
index 0000000..31baded
--- /dev/null
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.hdfs;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.JsonConfigurator;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.validation.Validation;
+import javax.validation.Validator;
+import java.util.Properties;
+
+public class HdfsStorageDruidModuleTest
+{
+ @Test
+ public void testHdfsInputSourceConfigDefaultAllowedProtocols()
+ {
+ Properties props = new Properties();
+ Injector injector = makeInjectorWithProperties(props);
+ HdfsInputSourceConfig instance = injector.getInstance(HdfsInputSourceConfig.class);
+ Assert.assertEquals(
+ ImmutableSet.of("hdfs"),
+ instance.getAllowedProtocols()
+ );
+ }
+
+ @Test
+ public void testHdfsInputSourceConfigCustomAllowedProtocols()
+ {
+ Properties props = new Properties();
+ props.setProperty("druid.ingestion.hdfs.allowedProtocols", "[\"webhdfs\"]");
+ Injector injector = makeInjectorWithProperties(props);
+ HdfsInputSourceConfig instance = injector.getInstance(HdfsInputSourceConfig.class);
+ Assert.assertEquals(
+ ImmutableSet.of("webhdfs"),
+ instance.getAllowedProtocols()
+ );
+ }
+
+ private Injector makeInjectorWithProperties(final Properties props)
+ {
+ return Guice.createInjector(
+ ImmutableList.of(
+ new DruidGuiceExtensions(),
+ new LifecycleModule(),
+ binder -> {
+ binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
+ binder.bind(JsonConfigurator.class).in(LazySingleton.class);
+ binder.bind(Properties.class).toInstance(props);
+ },
+ new HdfsStorageDruidModule()
+ )
+ );
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java b/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java
index 0423af4..eb612f7 100644
--- a/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java
+++ b/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
+import org.apache.druid.data.input.impl.HttpInputSourceConfig;
+import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.initialization.DruidModule;
import java.util.List;
@@ -47,5 +49,6 @@ public class InputSourceModule implements DruidModule
@Override
public void configure(Binder binder)
{
+ JsonConfigProvider.bind(binder, "druid.ingestion.http", HttpInputSourceConfig.class);
}
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java
index 7ff5e95..bbd797f 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java
@@ -19,6 +19,7 @@
package org.apache.druid.segment.realtime.firehose;
+import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
@@ -26,9 +27,10 @@ import com.google.common.base.Predicate;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.HttpEntity;
+import org.apache.druid.data.input.impl.HttpInputSource;
+import org.apache.druid.data.input.impl.HttpInputSourceConfig;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
-import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.utils.CompressionUtils;
@@ -43,12 +45,12 @@ import java.util.Objects;
public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<URI>
{
- private static final Logger log = new Logger(HttpFirehoseFactory.class);
private final List<URI> uris;
@Nullable
private final String httpAuthenticationUsername;
@Nullable
private final PasswordProvider httpAuthenticationPasswordProvider;
+ private final HttpInputSourceConfig inputSourceConfig;
@JsonCreator
public HttpFirehoseFactory(
@@ -59,14 +61,17 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
@JsonProperty("fetchTimeout") Long fetchTimeout,
@JsonProperty("maxFetchRetry") Integer maxFetchRetry,
@JsonProperty("httpAuthenticationUsername") @Nullable String httpAuthenticationUsername,
- @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider
+ @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider,
+ @JacksonInject HttpInputSourceConfig inputSourceConfig
)
{
super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
Preconditions.checkArgument(uris.size() > 0, "Empty URIs");
+ HttpInputSource.throwIfInvalidProtocols(inputSourceConfig, uris);
this.uris = uris;
this.httpAuthenticationUsername = httpAuthenticationUsername;
this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider;
+ this.inputSourceConfig = inputSourceConfig;
}
@Nullable
@@ -120,35 +125,20 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
if (this == o) {
return true;
}
-
if (o == null || getClass() != o.getClass()) {
return false;
}
-
- final HttpFirehoseFactory that = (HttpFirehoseFactory) o;
- return Objects.equals(uris, that.uris) &&
- getMaxCacheCapacityBytes() == that.getMaxCacheCapacityBytes() &&
- getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() &&
- getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() &&
- getFetchTimeout() == that.getFetchTimeout() &&
- getMaxFetchRetry() == that.getMaxFetchRetry() &&
- Objects.equals(httpAuthenticationUsername, that.getHttpAuthenticationUsername()) &&
- Objects.equals(httpAuthenticationPasswordProvider, that.getHttpAuthenticationPasswordProvider());
+ HttpFirehoseFactory that = (HttpFirehoseFactory) o;
+ return uris.equals(that.uris) &&
+ Objects.equals(httpAuthenticationUsername, that.httpAuthenticationUsername) &&
+ Objects.equals(httpAuthenticationPasswordProvider, that.httpAuthenticationPasswordProvider) &&
+ inputSourceConfig.equals(that.inputSourceConfig);
}
@Override
public int hashCode()
{
- return Objects.hash(
- uris,
- getMaxCacheCapacityBytes(),
- getMaxFetchCapacityBytes(),
- getPrefetchTriggerBytes(),
- getFetchTimeout(),
- getMaxFetchRetry(),
- httpAuthenticationUsername,
- httpAuthenticationPasswordProvider
- );
+ return Objects.hash(uris, httpAuthenticationUsername, httpAuthenticationPasswordProvider, inputSourceConfig);
}
@Override
@@ -168,7 +158,8 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
getFetchTimeout(),
getMaxFetchRetry(),
getHttpAuthenticationUsername(),
- httpAuthenticationPasswordProvider
+ httpAuthenticationPasswordProvider,
+ inputSourceConfig
);
}
}
diff --git a/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java b/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java
index 67126b0..074b1db 100644
--- a/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/input/InputSourceModuleTest.java
@@ -25,12 +25,25 @@ import com.fasterxml.jackson.databind.cfg.MapperConfig;
import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.druid.data.input.impl.HttpInputSourceConfig;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.JsonConfigurator;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.guice.ServerModule;
+import org.apache.druid.jackson.JacksonModule;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import javax.validation.Validation;
+import javax.validation.Validator;
import java.util.List;
+import java.util.Properties;
import java.util.stream.Collectors;
public class InputSourceModuleTest
@@ -59,4 +72,31 @@ public class InputSourceModuleTest
Assert.assertNotNull(subtypes);
Assert.assertEquals(SQL_NAMED_TYPE, Iterables.getOnlyElement(subtypes));
}
+
+ @Test
+ public void testHttpInputSourceDefaultConfig()
+ {
+ Properties props = new Properties();
+ Injector injector = makeInjectorWithProperties(props);
+ HttpInputSourceConfig instance = injector.getInstance(HttpInputSourceConfig.class);
+ Assert.assertEquals(new HttpInputSourceConfig(null), instance);
+ Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, instance.getAllowedProtocols());
+ }
+
+ private Injector makeInjectorWithProperties(final Properties props)
+ {
+ return Guice.createInjector(
+ ImmutableList.of(
+ new DruidGuiceExtensions(),
+ new LifecycleModule(),
+ new ServerModule(),
+ binder -> {
+ binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
+ binder.bind(JsonConfigurator.class).in(LazySingleton.class);
+ binder.bind(Properties.class).toInstance(props);
+ },
+ new JacksonModule(),
+ new InputSourceModule()
+ ));
+ }
}
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java
index e44ff7f..867c497 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java
@@ -19,22 +19,36 @@
package org.apache.druid.segment.realtime.firehose;
+import com.fasterxml.jackson.databind.InjectableValues.Std;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.data.input.impl.HttpInputSourceConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.metadata.DefaultPasswordProvider;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.net.URI;
public class HttpFirehoseFactoryTest
{
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
@Test
public void testSerde() throws IOException
{
+ final HttpInputSourceConfig inputSourceConfig = new HttpInputSourceConfig(null);
final ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(new Std().addValue(
+ HttpInputSourceConfig.class,
+ inputSourceConfig
+ ));
+
final DefaultPasswordProvider pwProvider = new DefaultPasswordProvider("testPassword");
final HttpFirehoseFactory factory = new HttpFirehoseFactory(
ImmutableList.of(URI.create("http://foo/bar"), URI.create("http://foo/bar2")),
@@ -44,7 +58,8 @@ public class HttpFirehoseFactoryTest
100L,
5,
"testUser",
- pwProvider
+ pwProvider,
+ inputSourceConfig
);
final HttpFirehoseFactory outputFact = mapper.readValue(
@@ -54,4 +69,77 @@ public class HttpFirehoseFactoryTest
Assert.assertEquals(factory, outputFact);
}
+
+ @Test
+ public void testConstructorAllowsOnlyDefaultProtocols()
+ {
+ new HttpFirehoseFactory(
+ ImmutableList.of(URI.create("http:///")),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ new HttpInputSourceConfig(null)
+ );
+
+ new HttpFirehoseFactory(
+ ImmutableList.of(URI.create("https:///")),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ new HttpInputSourceConfig(null)
+ );
+
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Only [http, https] protocols are allowed");
+ new HttpFirehoseFactory(
+ ImmutableList.of(URI.create("my-protocol:///")),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ new HttpInputSourceConfig(null)
+ );
+ }
+
+ @Test
+ public void testConstructorAllowsOnlyCustomProtocols()
+ {
+ final HttpInputSourceConfig customConfig = new HttpInputSourceConfig(ImmutableSet.of("druid"));
+ new HttpFirehoseFactory(
+ ImmutableList.of(URI.create("druid:///")),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ customConfig
+ );
+
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Only [druid] protocols are allowed");
+ new HttpFirehoseFactory(
+ ImmutableList.of(URI.create("https:///")),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ customConfig
+ );
+ }
}