Posted to commits@druid.apache.org by fj...@apache.org on 2019/07/12 04:43:56 UTC

[incubator-druid] branch master updated: Add inline firehose (#8056)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new da3d141  Add inline firehose (#8056)
da3d141 is described below

commit da3d141dd2c335590223e4dea1db2f307fa70851
Author: Chi Cao Minh <ch...@imply.io>
AuthorDate: Thu Jul 11 21:43:46 2019 -0700

    Add inline firehose (#8056)
    
    * Add inline firehose
    
    To allow users to quickly test parsing and schema, add a firehose that reads
    data that is inlined in its spec.
    
    * Address review comments
    
    * Remove suppression of sonar warnings
---
 docs/content/ingestion/firehose.md                 | 125 ++++++-----
 .../org/apache/druid/guice/FirehoseModule.java     |   6 +-
 .../segment/realtime/firehose/InlineFirehose.java  |  99 +++++++++
 .../realtime/firehose/InlineFirehoseFactory.java   |  77 +++++++
 .../segment/realtime/firehose/package-info.java    |  23 ++
 .../org/apache/druid/guice/FirehoseModuleTest.java |  88 ++++++++
 .../firehose/InlineFirehoseFactoryTest.java        | 113 ++++++++++
 .../realtime/firehose/InlineFirehoseTest.java      | 231 +++++++++++++++++++++
 8 files changed, 705 insertions(+), 57 deletions(-)

diff --git a/docs/content/ingestion/firehose.md b/docs/content/ingestion/firehose.md
index 274e3b6..f242bb5 100644
--- a/docs/content/ingestion/firehose.md
+++ b/docs/content/ingestion/firehose.md
@@ -24,33 +24,33 @@ title: "Apache Druid (incubating) Firehoses"
 
 # Apache Druid (incubating) Firehoses
 
-Firehoses are used in [native batch ingestion tasks](../ingestion/native_tasks.html), stream push tasks automatically created by [Tranquility](../ingestion/stream-push.html) ingestion model.
+Firehoses are used in [native batch ingestion tasks](../ingestion/native_tasks.html) and stream push tasks automatically created by [Tranquility](../ingestion/stream-push.html).
 
-They are pluggable and thus the configuration schema can and will vary based on the `type` of the firehose.
+They are pluggable, and thus the configuration schema can and will vary based on the `type` of the Firehose.
 
 | Field | Type | Description | Required |
 |-------|------|-------------|----------|
-| type | String | Specifies the type of firehose. Each value will have its own configuration schema, firehoses packaged with Druid are described below. | yes |
+| type | String | Specifies the type of Firehose. Each value will have its own configuration schema. Firehoses packaged with Druid are described below. | yes |
 
 ## Additional Firehoses
 
-There are several firehoses readily available in Druid, some are meant for examples, others can be used directly in a production environment.
+There are several Firehoses readily available in Druid. Some are meant for examples, and others can be used directly in a production environment.
 
-For additional firehoses, please see our [extensions list](../development/extensions.html).
+For additional Firehoses, please see our [extensions list](../development/extensions.html).
 
 ### LocalFirehose
 
 This Firehose can be used to read the data from files on local disk.
 It can be used for POCs to ingest data on disk.
-This firehose is _splittable_ and can be used by [native parallel index tasks](./native_tasks.html#parallel-index-task).
-Since each split represents a file in this firehose, each worker task of `index_parallel` will read a file.
-A sample local firehose spec is shown below:
+This Firehose is _splittable_ and can be used by [native parallel index tasks](./native_tasks.html#parallel-index-task).
+Since each split represents a file in this Firehose, each worker task of `index_parallel` will read a file.
+A sample local Firehose spec is shown below:
 
 ```json
 {
-    "type"    : "local",
-    "filter"   : "*.csv",
-    "baseDir"  : "/data/directory"
+    "type": "local",
+    "filter" : "*.csv",
+    "baseDir": "/data/directory"
 }
 ```
 
@@ -63,14 +63,14 @@ A sample local firehose spec is shown below:
 ### HttpFirehose
 
 This Firehose can be used to read the data from remote sites via HTTP.
-This firehose is _splittable_ and can be used by [native parallel index tasks](./native_tasks.html#parallel-index-task).
-Since each split represents a file in this firehose, each worker task of `index_parallel` will read a file.
-A sample http firehose spec is shown below:
+This Firehose is _splittable_ and can be used by [native parallel index tasks](./native_tasks.html#parallel-index-task).
+Since each split represents a file in this Firehose, each worker task of `index_parallel` will read a file.
+A sample HTTP Firehose spec is shown below:
 
 ```json
 {
-    "type"    : "http",
-    "uris"  : ["http://example.com/uri1", "http://example2.com/uri2"]
+    "type": "http",
+    "uris": ["http://example.com/uri1", "http://example2.com/uri2"]
 }
 ```
 
@@ -107,28 +107,28 @@ You can also use the other existing Druid PasswordProviders. Here is an example
 }
 ```
 
-The below configurations can be optionally used for tuning the firehose performance.
+The below configurations can be optionally used for tuning the Firehose performance.
 
 |property|description|default|
 |--------|-----------|-------|
 |maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|
 |maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|
-|prefetchTriggerBytes|Threshold to trigger prefetching http objects.|maxFetchCapacityBytes / 2|
-|fetchTimeout|Timeout for fetching a http object.|60000|
-|maxFetchRetry|Maximum retry for fetching a http object.|3|
+|prefetchTriggerBytes|Threshold to trigger prefetching HTTP objects.|maxFetchCapacityBytes / 2|
+|fetchTimeout|Timeout for fetching an HTTP object.|60000|
+|maxFetchRetry|Maximum retries for fetching an HTTP object.|3|
 
 ### IngestSegmentFirehose
 
 This Firehose can be used to read the data from existing druid segments.
 It can be used to ingest existing druid segments using a new schema and change the name, dimensions, metrics, rollup, etc. of the segment.
-This firehose is _splittable_ and can be used by [native parallel index tasks](./native_tasks.html#parallel-index-task).
-A sample ingest firehose spec is shown below -
+This Firehose is _splittable_ and can be used by [native parallel index tasks](./native_tasks.html#parallel-index-task).
+A sample ingest Firehose spec is shown below:
 
 ```json
 {
-    "type"    : "ingestSegment",
-    "dataSource"   : "wikipedia",
-    "interval" : "2013-01-01/2013-01-02"
+    "type": "ingestSegment",
+    "dataSource": "wikipedia",
+    "interval": "2013-01-01/2013-01-02"
 }
 ```
 
@@ -136,15 +136,15 @@ A sample ingest firehose spec is shown below -
 |--------|-----------|---------|
 |type|This should be "ingestSegment".|yes|
 |dataSource|A String defining the data source to fetch rows from, very similar to a table in a relational database|yes|
-|interval|A String representing ISO-8601 Interval. This defines the time range to fetch the data over.|yes|
+|interval|A String representing the ISO-8601 interval. This defines the time range to fetch the data over.|yes|
 |dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no|
 |metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no|
 |filter| See [Filters](../querying/filters.html)|no|
 |maxInputSegmentBytesPerTask|When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB.|no|
 
-#### SqlFirehose
+### SqlFirehose
 
-SqlFirehoseFactory can be used to ingest events residing in RDBMS. The database connection information is provided as part of the ingestion spec. For each query, the results are fetched locally and indexed. If there are multiple queries from which data needs to be indexed, queries are prefetched in the background upto `maxFetchCapacityBytes` bytes.
+This Firehose can be used to ingest events residing in an RDBMS. The database connection information is provided as part of the ingestion spec. For each query, the results are fetched locally and indexed. If there are multiple queries from which data needs to be indexed, queries are prefetched in the background up to `maxFetchCapacityBytes` bytes.
 
 Requires one of the following extensions:
  * [MySQL Metadata Store](../development/extensions-core/mysql.html).
@@ -152,16 +152,16 @@ Requires one of the following extensions:
 
 ```json
 {
-    "type" : "sql",
+    "type": "sql",
     "database": {
         "type": "mysql",
-        "connectorConfig" : {
-            "connectURI" : "jdbc:mysql://host:port/schema",
-            "user" : "user",
-            "password" : "password"
+        "connectorConfig": {
+            "connectURI": "jdbc:mysql://host:port/schema",
+            "user": "user",
+            "password": "password"
         }
      },
-    "sqls" : ["SELECT * FROM table1", "SELECT * FROM table2"]
+    "sqls": ["SELECT * FROM table1", "SELECT * FROM table2"]
 }
 ```
 
@@ -176,39 +176,56 @@ Requires one of the following extensions:
 |foldCase|Toggle case folding of database column names. This may be enabled in cases where the database returns case insensitive column names in query results.|false|No|
 |sqls|List of SQL queries where each SQL query would retrieve the data to be indexed.||Yes|
 
-#### Database
+### Database
 
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|The type of database to query. Valid values are `mysql` and `postgresql`||Yes|
-|connectorConfig|specify the database connection properties via `connectURI`, `user` and `password`||Yes|
+|connectorConfig|Specify the database connection properties via `connectURI`, `user` and `password`||Yes|
+
+### InlineFirehose
+
+This Firehose can be used to read the data inlined in its own spec.
+It can be used for demos or for quickly testing out parsing and schema.
+A sample inline Firehose spec is shown below:
+
+```json
+{
+    "type": "inline",
+    "data": "0,values,formatted\n1,as,CSV"
+}
+```
 
+|property|description|required?|
+|--------|-----------|---------|
+|type|This should be "inline".|yes|
+|data|Inlined data to ingest. Each line is parsed as a separate row.|yes|
 
 ### CombiningFirehose
 
-This firehose can be used to combine and merge data from a list of different firehoses.
-This can be used to merge data from more than one firehose.
+This Firehose can be used to combine and merge data from a list of different Firehoses.
+This can be used to merge data from more than one Firehose.
 
 ```json
 {
-    "type"  :   "combining",
-    "delegates" : [ { firehose1 }, { firehose2 }, ..... ]
+    "type": "combining",
+    "delegates": [ { firehose1 }, { firehose2 }, ... ]
 }
 ```
 
 |property|description|required?|
 |--------|-----------|---------|
 |type|This should be "combining"|yes|
-|delegates|list of firehoses to combine data from|yes|
+|delegates|List of Firehoses to combine data from|yes|
 
 
 ### Streaming Firehoses
 
-The EventReceiverFirehose is used in tasks automatically generated by [Tranquility stream push](../ingestion/stream-push.html). These firehoses are not suitable for batch ingestion.
+The EventReceiverFirehose is used in tasks automatically generated by [Tranquility stream push](../ingestion/stream-push.html). These Firehoses are not suitable for batch ingestion.
 
 #### EventReceiverFirehose
 
-EventReceiverFirehoseFactory can be used to ingest events using an http endpoint.
+This Firehose can be used to ingest events using an HTTP endpoint.
 
 ```json
 {
@@ -217,7 +234,7 @@ EventReceiverFirehoseFactory can be used to ingest events using an http endpoint
   "bufferSize": 10000
 }
 ```
-When using this firehose, events can be sent by submitting a POST request to the http endpoint:
+When using this Firehose, events can be sent by submitting a POST request to the HTTP endpoint:
 
 `http://<peonHost>:<port>/druid/worker/v1/chat/<eventReceiverServiceName>/push-events/`
 
@@ -225,28 +242,28 @@ When using this firehose, events can be sent by submitting a POST request to the
 |--------|-----------|---------|
 |type|This should be "receiver"|yes|
 |serviceName|Name used to announce the event receiver service endpoint|yes|
-|maxIdleTime|A firehose is automatically shut down after not receiving any events for this period of time, in milliseconds. If not specified, a firehose is never shut down due to being idle. Zero and negative values have the same effect.|no|
-|bufferSize|Size of buffer used by firehose to store events|no, default is 100000|
+|maxIdleTime|A Firehose is automatically shut down after not receiving any events for this period of time, in milliseconds. If not specified, a Firehose is never shut down due to being idle. Zero and negative values have the same effect.|no|
+|bufferSize|Size of buffer used by Firehose to store events|no, default is 100000|
 
 Shut down time for EventReceiverFirehose can be specified by submitting a POST request to
 
 `http://<peonHost>:<port>/druid/worker/v1/chat/<eventReceiverServiceName>/shutdown?shutoffTime=<shutoffTime>`
 
-If shutOffTime is not specified, the firehose shuts off immediately.
+If shutOffTime is not specified, the Firehose shuts off immediately.
 
 #### TimedShutoffFirehose
 
-This can be used to start a firehose that will shut down at a specified time.
+This can be used to start a Firehose that will shut down at a specified time.
 An example is shown below:
 
 ```json
 {
-    "type"  :   "timed",
+    "type": "timed",
     "shutoffTime": "2015-08-25T01:26:05.119Z",
     "delegate": {
-          "type": "receiver",
-          "serviceName": "eventReceiverServiceName",
-          "bufferSize": 100000
+        "type": "receiver",
+        "serviceName": "eventReceiverServiceName",
+        "bufferSize": 100000
      }
 }
 ```
@@ -254,5 +271,5 @@ An example is shown below:
 |property|description|required?|
 |--------|-----------|---------|
 |type|This should be "timed"|yes|
-|shutoffTime|time at which the firehose should shut down, in ISO8601 format|yes|
-|delegate|firehose to use|yes|
+|shutoffTime|Time at which the Firehose should shut down, in ISO8601 format|yes|
+|delegate|Firehose to use|yes|
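The `maxInputSegmentBytesPerTask` rule for IngestSegmentFirehose (default 150MB; a segment larger than the limit gets a task to itself, and input segments are never split across tasks) can be pictured with a small sketch. It assumes a greedy, order-preserving packing of segment sizes. Druid's actual split assignment may differ, and `SegmentGrouping` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of how a per-task byte budget could group input segments.
 * Assumes greedy, order-preserving packing; not Druid's actual split logic.
 */
final class SegmentGrouping
{
  private SegmentGrouping()
  {
  }

  /**
   * Groups segment sizes (bytes) so that no group exceeds maxBytesPerTask, except that a
   * single segment larger than the limit is placed alone in its own group.
   */
  static List<List<Long>> group(List<Long> segmentSizes, long maxBytesPerTask)
  {
    List<List<Long>> groups = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentBytes = 0;
    for (long size : segmentSizes) {
      if (!current.isEmpty() && currentBytes + size > maxBytesPerTask) {
        // Adding this segment would overflow the current task, so close it and start a new one.
        groups.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      // Segments are never split. An oversized segment lands whole in an empty group,
      // and the check above closes that group before the next segment is added.
      current.add(size);
      currentBytes += size;
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    return groups;
  }
}
```

With a 150-byte budget, sizes `[100, 50, 200, 30, 30]` would be grouped as `[[100, 50], [200], [30, 30]]` under this assumption: the 200-byte segment exceeds the limit and is processed by itself.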
diff --git a/server/src/main/java/org/apache/druid/guice/FirehoseModule.java b/server/src/main/java/org/apache/druid/guice/FirehoseModule.java
index 1715949..c95b0cd 100644
--- a/server/src/main/java/org/apache/druid/guice/FirehoseModule.java
+++ b/server/src/main/java/org/apache/druid/guice/FirehoseModule.java
@@ -29,6 +29,7 @@ import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory;
 import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
 import org.apache.druid.segment.realtime.firehose.FixedCountFirehoseFactory;
 import org.apache.druid.segment.realtime.firehose.HttpFirehoseFactory;
+import org.apache.druid.segment.realtime.firehose.InlineFirehoseFactory;
 import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
 import org.apache.druid.segment.realtime.firehose.SqlFirehoseFactory;
 import org.apache.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
@@ -36,8 +37,6 @@ import org.apache.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
 import java.util.Collections;
 import java.util.List;
 
-/**
- */
 public class FirehoseModule implements DruidModule
 {
   @Override
@@ -58,7 +57,8 @@ public class FirehoseModule implements DruidModule
                 new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
                 new NamedType(CombiningFirehoseFactory.class, "combining"),
                 new NamedType(FixedCountFirehoseFactory.class, "fixedCount"),
-                new NamedType(SqlFirehoseFactory.class, "sql")
+                new NamedType(SqlFirehoseFactory.class, "sql"),
+                new NamedType(InlineFirehoseFactory.class, "inline")
             )
     );
   }
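Registering `new NamedType(InlineFirehoseFactory.class, "inline")` is what lets Jackson map `"type": "inline"` in a spec to the new factory. The JDK-only sketch below mimics that dispatch with a plain map from type name to constructor. `ToyTypeRegistry`, `ToyFactory`, and `ToyInlineFactory` are hypothetical stand-ins and do not reflect how Jackson resolves subtypes internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical stand-in for a firehose factory; not Druid's FirehoseFactory interface. */
interface ToyFactory
{
}

final class ToyInlineFactory implements ToyFactory
{
  private final String data;

  ToyInlineFactory(String data)
  {
    this.data = Objects.requireNonNull(data, "data");
  }

  String getData()
  {
    return data;
  }
}

/** Maps a spec's "type" value to a creator, playing the role of the NamedType list. */
final class ToyTypeRegistry
{
  private final Map<String, Function<Map<String, String>, ToyFactory>> creators = new LinkedHashMap<>();

  ToyTypeRegistry register(String typeName, Function<Map<String, String>, ToyFactory> creator)
  {
    if (creators.putIfAbsent(typeName, creator) != null) {
      throw new IllegalStateException("Duplicate type name: " + typeName);
    }
    return this;
  }

  ToyFactory create(Map<String, String> spec)
  {
    String type = spec.get("type");
    Function<Map<String, String>, ToyFactory> creator = creators.get(type);
    if (creator == null) {
      // An unregistered name fails, just as an unlisted subtype would.
      throw new IllegalArgumentException("Unknown firehose type: " + type);
    }
    return creator.apply(spec);
  }
}
```

Leaving out a registration makes an otherwise valid spec fail with an unknown-type error, which is the failure mode `FirehoseModuleTest` guards against.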
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehose.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehose.java
new file mode 100644
index 0000000..188e605
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehose.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.firehose;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.druid.data.input.Firehose;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowPlusRaw;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.Runnables;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.NoSuchElementException;
+
+/**
+ * Firehose that produces data from its own spec
+ */
+public class InlineFirehose implements Firehose
+{
+  private final StringInputRowParser parser;
+  private final LineIterator lineIterator;
+
+  InlineFirehose(String data, StringInputRowParser parser) throws IOException
+  {
+    this.parser = parser;
+
+    Charset charset = Charset.forName(parser.getEncoding());
+    InputStream stream = new ByteArrayInputStream(data.getBytes(charset));
+    lineIterator = IOUtils.lineIterator(stream, charset);
+  }
+
+  @Override
+  public boolean hasMore()
+  {
+    return lineIterator.hasNext();
+  }
+
+  @Override
+  public InputRow nextRow()
+  {
+    return parser.parse(nextRaw());
+  }
+
+  private String nextRaw()
+  {
+    if (!hasMore()) {
+      throw new NoSuchElementException();
+    }
+
+    return lineIterator.next();
+  }
+
+  @Override
+  public InputRowPlusRaw nextRowWithRaw()
+  {
+    String raw = nextRaw();
+    try {
+      return InputRowPlusRaw.of(parser.parse(raw), StringUtils.toUtf8(raw));
+    }
+    catch (ParseException e) {
+      return InputRowPlusRaw.of(StringUtils.toUtf8(raw), e);
+    }
+  }
+
+  @Override
+  public Runnable commit()
+  {
+    return Runnables.getNoopRunnable();
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    lineIterator.close();
+  }
+}
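`InlineFirehose` reads its `data` one line at a time and hands each line to the parser. `nextRowWithRaw()` keeps the raw line even when parsing fails. The following JDK-only sketch shows the same pattern. It assumes a `Function<String, T>` in place of `StringInputRowParser` and catches `RuntimeException` in place of Druid's `ParseException`. All names are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.NoSuchElementException;
import java.util.function.Function;

/** Hypothetical JDK-only sketch of the InlineFirehose reading pattern. */
final class ToyInlineReader<T> implements AutoCloseable
{
  /** Pairs the raw line with either a parsed row or the parse error, similar to InputRowPlusRaw. */
  static final class Result<R>
  {
    final String raw;
    final R row;                   // null when parsing failed
    final RuntimeException error;  // null when parsing succeeded

    Result(String raw, R row, RuntimeException error)
    {
      this.raw = raw;
      this.row = row;
      this.error = error;
    }
  }

  private final BufferedReader reader;
  private final Function<String, T> parser;
  private String nextLine;  // one-line lookahead so hasMore() is cheap and repeatable

  ToyInlineReader(String data, Function<String, T> parser)
  {
    this.reader = new BufferedReader(new StringReader(data));
    this.parser = parser;
    this.nextLine = readLine();
  }

  private String readLine()
  {
    try {
      return reader.readLine();  // splits on \n, \r, or \r\n; null at end of data
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  boolean hasMore()
  {
    return nextLine != null;
  }

  private String nextRaw()
  {
    if (!hasMore()) {
      throw new NoSuchElementException();
    }
    String line = nextLine;
    nextLine = readLine();
    return line;
  }

  T nextRow()
  {
    return parser.apply(nextRaw());
  }

  Result<T> nextRowWithRaw()
  {
    String raw = nextRaw();
    try {
      return new Result<>(raw, parser.apply(raw), null);
    }
    catch (RuntimeException e) {  // stand-in for Druid's ParseException
      return new Result<>(raw, null, e);
    }
  }

  @Override
  public void close()
  {
    try {
      reader.close();
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

`LineIterator` in the original does the one-line lookahead internally. The sketch makes it explicit with `nextLine` so that `hasMore()` can be called repeatedly without consuming input.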
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactory.java
new file mode 100644
index 0000000..d33ba9a
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.firehose;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.Firehose;
+import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Creates firehose that produces data inlined in its own spec
+ */
+public class InlineFirehoseFactory implements FirehoseFactory<StringInputRowParser>
+{
+  private final String data;
+
+  @JsonCreator
+  InlineFirehoseFactory(@JsonProperty("data") String data)
+  {
+    this.data = Preconditions.checkNotNull(data, "data");
+  }
+
+  @JsonProperty
+  public String getData()
+  {
+    return data;
+  }
+
+  @Override
+  public Firehose connect(StringInputRowParser parser, @Nullable File temporaryDirectory) throws IOException
+  {
+    return new InlineFirehose(data, parser);
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    InlineFirehoseFactory factory = (InlineFirehoseFactory) o;
+    return data.equals(factory.data);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(data);
+  }
+}
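Because `data` arrives through `@JsonProperty("data")` as a single JSON string, multi-line input has to be escaped when a spec is written by hand or generated. Each newline becomes `\n`, as in the documented example `"0,values,formatted\n1,as,CSV"`. Below is a minimal, hypothetical JDK-only helper for generating such a spec. It handles only the escapes JSON requires, and a real JSON library would normally be used instead.

```java
/**
 * Hypothetical helper (not part of Druid) that builds an inline Firehose spec from raw text.
 * Only the escapes needed for a valid JSON string literal are handled.
 */
final class InlineSpecBuilder
{
  private InlineSpecBuilder()
  {
  }

  static String escapeJson(String s)
  {
    StringBuilder out = new StringBuilder(s.length() + 16);
    for (int i = 0; i < s.length(); i++) {
      char c = s.charAt(i);
      switch (c) {
        case '"':
          out.append("\\\"");
          break;
        case '\\':
          out.append("\\\\");
          break;
        case '\n':
          out.append("\\n");  // line breaks separate the rows the firehose will parse
          break;
        case '\r':
          out.append("\\r");
          break;
        case '\t':
          out.append("\\t");
          break;
        default:
          if (c < 0x20) {
            // Remaining control characters need a backslash-u escape with four hex digits.
            out.append(String.format("\\u%04x", (int) c));
          } else {
            out.append(c);
          }
      }
    }
    return out.toString();
  }

  static String inlineSpec(String data)
  {
    return "{\"type\": \"inline\", \"data\": \"" + escapeJson(data) + "\"}";
  }
}
```

For the two CSV lines `0,values,formatted` and `1,as,CSV`, this produces the same `data` value as the sample spec in the documentation.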
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/package-info.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/package-info.java
new file mode 100644
index 0000000..e4a0699
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+@EverythingIsNonnullByDefault
+package org.apache.druid.segment.realtime.firehose;
+
+import org.apache.druid.annotations.EverythingIsNonnullByDefault;
diff --git a/server/src/test/java/org/apache/druid/guice/FirehoseModuleTest.java b/server/src/test/java/org/apache/druid/guice/FirehoseModuleTest.java
new file mode 100644
index 0000000..233eb76
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/guice/FirehoseModuleTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.fasterxml.jackson.databind.AnnotationIntrospector;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.cfg.MapperConfig;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.reflect.ClassPath;
+import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.segment.realtime.firehose.ClippedFirehoseFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+public class FirehoseModuleTest
+{
+  private static final Predicate<Class> IS_FIREHOSE_FACTORY = FirehoseFactory.class::isAssignableFrom;
+
+  @Test
+  public void testAllFirehoseFactorySubtypesRegistered() throws IOException
+  {
+    ObjectMapper objectMapper = createObjectMapper();
+    Set<Class> registeredSubtypeClasses = getFirehoseFactorySubtypeClasses(objectMapper);
+    String packageName = ClippedFirehoseFactory.class.getPackage().getName();
+    Set<Class> expectedSubtypeClasses = getFirehoseFactoryClassesInPackage(packageName);
+    Assert.assertEquals(expectedSubtypeClasses, registeredSubtypeClasses);
+  }
+
+  private static ObjectMapper createObjectMapper()
+  {
+    ObjectMapper objectMapper = new ObjectMapper();
+    for (Module jacksonModule : new FirehoseModule().getJacksonModules()) {
+      objectMapper.registerModule(jacksonModule);
+    }
+    return objectMapper;
+  }
+
+  private static Set<Class> getFirehoseFactorySubtypeClasses(ObjectMapper objectMapper)
+  {
+    Class parentClass = FirehoseFactory.class;
+    MapperConfig config = objectMapper.getDeserializationConfig();
+    AnnotationIntrospector annotationIntrospector = config.getAnnotationIntrospector();
+    AnnotatedClass ac = AnnotatedClass.constructWithoutSuperTypes(parentClass, annotationIntrospector, config);
+    Collection<NamedType> subtypes = objectMapper.getSubtypeResolver().collectAndResolveSubtypesByClass(config, ac);
+    Assert.assertNotNull(subtypes);
+    return subtypes.stream()
+                   .map(NamedType::getType)
+                   .filter(c -> !c.equals(parentClass))
+                   .collect(Collectors.toSet());
+  }
+
+  @SuppressWarnings("UnstableApiUsage") // for ClassPath
+  private static Set<Class> getFirehoseFactoryClassesInPackage(String packageName) throws IOException
+  {
+    ClassLoader loader = Thread.currentThread().getContextClassLoader();
+    ClassPath classPath = ClassPath.from(loader);
+    return classPath.getTopLevelClasses(packageName).stream()
+                    .map(ClassPath.ClassInfo::load)
+                    .filter(IS_FIREHOSE_FACTORY)
+                    .collect(Collectors.toSet());
+  }
+}
+
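`testAllFirehoseFactorySubtypesRegistered` compares whole sets, so a failure prints both sets in full. A hypothetical helper like the one below could narrow the message to the forgotten or stray registrations. It works on class names as strings to stay JDK-only and is not part of the commit.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper that explains a mismatch between expected and registered subtypes. */
final class RegistrationCheck
{
  private RegistrationCheck()
  {
  }

  /** Returns an empty string when the sets match, otherwise a sorted description of both differences. */
  static String describeMismatch(Set<String> expected, Set<String> registered)
  {
    Set<String> missing = new TreeSet<>(expected);
    missing.removeAll(registered);      // found in the package but never registered
    Set<String> unexpected = new TreeSet<>(registered);
    unexpected.removeAll(expected);     // registered but not found in the package
    if (missing.isEmpty() && unexpected.isEmpty()) {
      return "";
    }
    return "missing registration: " + missing + "; registered but not found in package: " + unexpected;
  }
}
```

Forgetting the new `NamedType` line, for example, would surface as `missing registration: [InlineFirehoseFactory]` rather than as two long sets to compare by eye.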
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactoryTest.java
new file mode 100644
index 0000000..fe9d797
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseFactoryTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.firehose;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.Firehose;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.CSVParseSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+@SuppressWarnings({"NullableProblems", "ConstantConditions"})
+public class InlineFirehoseFactoryTest
+{
+  private static final String DIMENSION_0 = "timestamp";
+  private static final String DIMENSION_1 = "value";
+  private static final List<String> DIMENSIONS = Arrays.asList(DIMENSION_0, DIMENSION_1);
+  private static final String DELIMITER = ",";
+  private static final StringInputRowParser PARSER = new StringInputRowParser(
+      new CSVParseSpec(
+          new TimestampSpec(
+              DIMENSION_0,
+              "auto",
+              null
+          ),
+          new DimensionsSpec(
+              DimensionsSpec.getDefaultSchemas(DIMENSIONS),
+              Collections.emptyList(),
+              Collections.emptyList()
+          ),
+          DELIMITER,
+          DIMENSIONS,
+          false,
+          0
+      ),
+      StandardCharsets.UTF_8.name()
+  );
+  private static final File NO_TEMP_DIR = null;
+  private static final String TIMESTAMP = "0";
+  private static final String VALUE = "a";
+  private static final String DATA = TIMESTAMP + DELIMITER + VALUE;
+
+  private InlineFirehoseFactory target;
+
+  @Before
+  public void setUp()
+  {
+    target = new InlineFirehoseFactory(DATA);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testConstructorDataRequired()
+  {
+    new InlineFirehoseFactory(null);
+  }
+
+  @Test
+  public void testGetData()
+  {
+    Assert.assertEquals(DATA, target.getData());
+  }
+
+  @Test
+  public void testConnect() throws IOException
+  {
+    Firehose firehose = target.connect(PARSER, NO_TEMP_DIR);
+    InputRow row = firehose.nextRow();
+    Assert.assertNotNull(row);
+    List<String> values = row.getDimension(DIMENSION_1);
+    Assert.assertNotNull(values);
+    Assert.assertEquals(1, values.size());
+    Assert.assertEquals(VALUE, values.get(0));
+  }
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    final ObjectMapper objectMapper = new DefaultObjectMapper();
+    InlineFirehoseFactory factory = new InlineFirehoseFactory(DATA);
+    String serialized = objectMapper.writeValueAsString(factory);
+    InlineFirehoseFactory deserialized = objectMapper.readValue(serialized, InlineFirehoseFactory.class);
+    Assert.assertEquals(factory, deserialized);
+  }
+}
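
The factory tests above rely on two properties. First, the constructor rejects `null` data with a `NullPointerException`. Second, `assertEquals(factory, deserialized)` in `testSerde` passes only if the factory compares by value rather than by identity. The sketch below shows a class with both properties. It assumes the factory follows the common `Objects.requireNonNull` plus `equals`/`hashCode` pattern. The class name `InlineDataSpec` is hypothetical, and this is not the `InlineFirehoseFactory` source from the commit.

```java
import java.util.Objects;

/**
 * Hypothetical value class standing in for InlineFirehoseFactory.
 * It mirrors only the two properties the factory tests check:
 * non-null data and value-based equality.
 */
final class InlineDataSpec
{
  private final String data;

  InlineDataSpec(String data)
  {
    // Fail fast: a null argument raises NullPointerException at construction time.
    this.data = Objects.requireNonNull(data, "data");
  }

  String getData()
  {
    return data;
  }

  // Two instances holding the same data are equal, so comparing an original
  // with a round-tripped copy succeeds even though they are different objects.
  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    return data.equals(((InlineDataSpec) o).data);
  }

  // Kept consistent with equals(): equal instances have equal hash codes.
  @Override
  public int hashCode()
  {
    return data.hashCode();
  }
}
```

Deserialization always produces a new object. Without an `equals` override, the serde assertion would fall back to reference comparison and fail.
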
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseTest.java
new file mode 100644
index 0000000..5e3568b
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.firehose;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowPlusRaw;
+import org.apache.druid.data.input.impl.CSVParseSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.utils.Runnables;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+@SuppressWarnings("ConstantConditions")
+public class InlineFirehoseTest
+{
+  private static final String DIMENSION_0 = "timestamp";
+  private static final String DIMENSION_1 = "value";
+  private static final List<String> DIMENSIONS = Arrays.asList(DIMENSION_0, DIMENSION_1);
+  private static final String DELIMITER = ",";
+  private static final Charset CHARSET = StandardCharsets.UTF_8;
+  private static final StringInputRowParser PARSER = new StringInputRowParser(
+      new CSVParseSpec(
+          new TimestampSpec(
+              DIMENSION_0,
+              "auto",
+              null
+          ),
+          new DimensionsSpec(
+              DimensionsSpec.getDefaultSchemas(DIMENSIONS),
+              Collections.emptyList(),
+              Collections.emptyList()
+          ),
+          DELIMITER,
+          DIMENSIONS,
+          false,
+          0
+      ),
+      CHARSET.name()
+  );
+  private static final String EMPTY = "";
+  private static final String TIMESTAMP_0 = "0";
+  private static final String VALUE_0 = "a";
+  private static final String NOT_EMPTY = TIMESTAMP_0 + DELIMITER + VALUE_0;
+  private static final String PARSEABLE = NOT_EMPTY;
+  private static final String NOT_PARSEABLE = VALUE_0 + DELIMITER + TIMESTAMP_0;
+  private static final String TIMESTAMP_1 = "1";
+  private static final String VALUE_1 = "b";
+  private static final String LINE_0 = TIMESTAMP_0 + DELIMITER + VALUE_0;
+  private static final String LINE_1 = TIMESTAMP_1 + DELIMITER + VALUE_1;
+  private static final String MULTILINE = LINE_0 + "\n" + LINE_1;
+
+  @Test
+  public void testHasMoreEmpty()
+  {
+    InlineFirehose target = create(EMPTY);
+    Assert.assertFalse(target.hasMore());
+  }
+
+  @Test
+  public void testHasMoreNotEmpty()
+  {
+    InlineFirehose target = create(NOT_EMPTY);
+    Assert.assertTrue(target.hasMore());
+  }
+
+  @Test(expected = NoSuchElementException.class)
+  public void testNextRowEmpty()
+  {
+    InlineFirehose target = create(EMPTY);
+    target.nextRow();
+  }
+
+  @Test
+  public void testNextRowNotEmpty()
+  {
+    InlineFirehose target = create(NOT_EMPTY);
+    InputRow row = target.nextRow();
+    assertRowValue(VALUE_0, row);
+  }
+
+  @Test(expected = NoSuchElementException.class)
+  public void testNextRowWithRawEmpty()
+  {
+    InlineFirehose target = create(EMPTY);
+    target.nextRowWithRaw();
+  }
+
+  @Test
+  public void testNextRowWithRawParseable()
+  {
+    final String data = PARSEABLE;
+    InlineFirehose target = create(data);
+    InputRowPlusRaw rowPlusRaw = target.nextRowWithRaw();
+
+    InputRow row = rowPlusRaw.getInputRow();
+    assertRowValue(VALUE_0, row);
+
+    byte[] raw = rowPlusRaw.getRaw();
+    assertRawValue(data, raw);
+
+    Assert.assertNull(rowPlusRaw.getParseException());
+  }
+
+  @Test
+  public void testNextRowWithRawNotParseable()
+  {
+    final String data = NOT_PARSEABLE;
+    InlineFirehose target = create(data);
+    InputRowPlusRaw rowPlusRaw = target.nextRowWithRaw();
+
+    InputRow row = rowPlusRaw.getInputRow();
+    Assert.assertNull(row);
+
+    byte[] raw = rowPlusRaw.getRaw();
+    assertRawValue(data, raw);
+
+    Assert.assertNotNull(rowPlusRaw.getParseException());
+  }
+
+  @Test
+  public void testCommit()
+  {
+    InlineFirehose target = create(NOT_EMPTY);
+    Runnable result = target.commit();
+    Assert.assertSame(Runnables.getNoopRunnable(), result);
+  }
+
+  @Test
+  public void testCloseOpen() throws IOException
+  {
+    InlineFirehose target = create(NOT_EMPTY);
+    target.close();
+    try {
+      target.nextRow();
+      Assert.fail("Should not be able to read from closed firehose");
+    }
+    catch (NoSuchElementException ignored) {
+    }
+  }
+
+  @Test
+  public void testCloseNotOpen()
+  {
+    InlineFirehose target = create(NOT_EMPTY);
+    try {
+      target.close();
+    }
+    catch (IOException e) {
+      Assert.fail("Should be able to close an opened firehose");
+    }
+    try {
+      target.close();
+    }
+    catch (IOException e) {
+      Assert.fail("Should be able to close a closed firehose");
+    }
+  }
+
+  @Test
+  public void testMultiline()
+  {
+    InlineFirehose target = create(MULTILINE);
+
+    // First line
+    Assert.assertTrue(target.hasMore());
+    InputRow row0 = target.nextRow();
+    assertRowValue(VALUE_0, row0);
+
+    // Second line
+    InputRowPlusRaw rowPlusRaw = target.nextRowWithRaw();
+    assertRowValue(VALUE_1, rowPlusRaw.getInputRow());
+    assertRawValue(LINE_1, rowPlusRaw.getRaw());
+    Assert.assertNull(rowPlusRaw.getParseException());
+
+    Assert.assertFalse(target.hasMore());
+  }
+
+  private static InlineFirehose create(String data)
+  {
+    try {
+      return new InlineFirehose(data, PARSER);
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static void assertRowValue(String expected, InputRow row)
+  {
+    Assert.assertNotNull(row);
+    List<String> values = row.getDimension(DIMENSION_1);
+    Assert.assertNotNull(values);
+    Assert.assertEquals(1, values.size());
+    Assert.assertEquals(expected, values.get(0));
+  }
+
+  private static void assertRawValue(String expected, byte[] raw)
+  {
+    Assert.assertNotNull(raw);
+    Assert.assertEquals(expected, new String(raw, CHARSET));
+  }
+
+}
+
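
The `InlineFirehose` tests above define a small read contract:

- Empty input has no rows.
- Input is split on newlines.
- `nextRow()` throws `NoSuchElementException` once the data is exhausted or the firehose is closed.
- `nextRowWithRaw()` returns the raw line bytes and captures a parse failure instead of throwing it.
- `close()` can be called more than once.

The following is a minimal, dependency-free Java sketch of a reader with that contract, assuming a one-line lookahead over a `BufferedReader`. It does not reproduce Druid's `InlineFirehose`. The names `InlineLineReader` and `RowPlusRaw`, and the `Function<String, T>` parser, are hypothetical stand-ins for `InlineFirehose`, `InputRowPlusRaw`, and `StringInputRowParser`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.NoSuchElementException;
import java.util.function.Function;

/** One read result: the parsed row or the parse failure, plus the raw line bytes. */
final class RowPlusRaw<T>
{
  final T row;                           // null when parsing failed
  final byte[] raw;                      // the unparsed line, UTF-8 encoded
  final RuntimeException parseException; // null when parsing succeeded

  RowPlusRaw(T row, byte[] raw, RuntimeException parseException)
  {
    this.row = row;
    this.raw = raw;
    this.parseException = parseException;
  }
}

/** Hypothetical line-at-a-time reader over inline data (not Druid's InlineFirehose). */
final class InlineLineReader<T> implements AutoCloseable
{
  private final BufferedReader reader;
  private final Function<String, T> parser;
  private String nextLine; // one-line lookahead; null once exhausted or closed

  InlineLineReader(String data, Function<String, T> parser)
  {
    this.reader = new BufferedReader(new StringReader(data));
    this.parser = parser;
    this.nextLine = readLine(); // "" yields null immediately, so hasMore() is false
  }

  boolean hasMore()
  {
    return nextLine != null;
  }

  /** Parses the next line; a parse failure propagates to the caller. */
  T nextRow()
  {
    return parser.apply(takeLine());
  }

  /** Like nextRow(), but captures a parse failure instead of throwing it. */
  RowPlusRaw<T> nextRowWithRaw()
  {
    String line = takeLine();
    byte[] raw = line.getBytes(StandardCharsets.UTF_8);
    try {
      return new RowPlusRaw<>(parser.apply(line), raw, null);
    }
    catch (RuntimeException e) {
      return new RowPlusRaw<>(null, raw, e);
    }
  }

  @Override
  public void close()
  {
    nextLine = null; // later reads throw NoSuchElementException
    try {
      reader.close(); // closing an already-closed BufferedReader has no effect
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  private String takeLine()
  {
    if (nextLine == null) {
      throw new NoSuchElementException();
    }
    String line = nextLine;
    nextLine = readLine();
    return line;
  }

  private String readLine()
  {
    try {
      return reader.readLine();
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The reader keeps one unparsed line of lookahead, so `hasMore()` can answer without parsing anything. As a result, a malformed line surfaces only when the caller actually asks for that row.
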

