Posted to commits@druid.apache.org by cw...@apache.org on 2021/04/13 04:54:10 UTC

[druid] branch master updated: add avro stream input format (#11040)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d0a94a8  add avro stream input format (#11040)
d0a94a8 is described below

commit d0a94a8c141c4aa4e666c3d34d5fc099b80480a0
Author: Yi Yuan <26...@qq.com>
AuthorDate: Tue Apr 13 12:53:41 2021 +0800

    add avro stream input format (#11040)
    
    * add avro stream input format
    
    * bug fixed
    
    * add document
    
    * doc fix
    
    * change doc
    
    * add integration test
    
    * bug fixed
    
    * bug fixed
    
    * add string as binary getter
    
    Co-authored-by: yuanyi <yu...@freewheel.tv>
---
 docs/ingestion/data-formats.md                     | 349 ++++++++++++---------
 extensions-core/avro-extensions/pom.xml            |   5 +
 .../data/input/avro/AvroExtensionsModule.java      |   3 +-
 .../data/input/avro/AvroStreamInputFormat.java     | 103 ++++++
 .../druid/data/input/avro/AvroStreamReader.java    |  88 ++++++
 .../avro/SchemaRegistryBasedAvroBytesDecoder.java  |  63 +++-
 .../data/input/AvroStreamInputFormatTest.java      | 232 ++++++++++++++
 .../data/avro/input_format/input_format.json       |  81 +++++
 .../input_format/input_format.json                 |  15 +
 web-console/src/druid-models/input-format.tsx      |   6 +-
 10 files changed, 785 insertions(+), 160 deletions(-)

diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md
index 96f5d92..b65a1db 100644
--- a/docs/ingestion/data-formats.md
+++ b/docs/ingestion/data-formats.md
@@ -223,6 +223,203 @@ The Parquet `inputFormat` has the following components:
 |flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Parquet file. Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) |
 | binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) |
 
+### Avro Stream
+
+> You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro Stream input format.
+
+> See the [Avro Types](../development/extensions-core/avro.md#avro-types) section for how Avro types are handled in Druid.
+
+The `inputFormat` to load data in Avro format for stream ingestion. An example is:
+```json
+"ioConfig": {
+  "inputFormat": {
+    "type": "avro_stream",
+    "avroBytesDecoder": {
+      "type": "schema_inline",
+      "schema": {
+        //your schema goes here, for example
+        "namespace": "org.apache.druid.data",
+        "name": "User",
+        "type": "record",
+        "fields": [
+          { "name": "FullName", "type": "string" },
+          { "name": "Country", "type": "string" }
+        ]
+      }
+    },
+    "flattenSpec": {
+      "useFieldDiscovery": true,
+      "fields": [
+        {
+          "type": "path",
+          "name": "someRecord_subInt",
+          "expr": "$.someRecord.subInt"
+        }
+      ]
+    },
+    "binaryAsString": false
+  },
+  ...
+}
+```
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| type | String | This should be set to `avro_stream` to read Avro-serialized data. | yes |
+| flattenSpec | JSON Object | Define a [`flattenSpec`](#flattenspec) to extract nested values from an Avro record. Note that only 'path' expressions are supported ('jq' is unavailable). | no (default will auto-discover 'root' level properties) |
+| avroBytesDecoder | JSON Object | Specifies how to decode bytes to an Avro record. | yes |
+| binaryAsString | Boolean | Specifies whether Avro bytes columns that are not logically marked as a string or enum type should be treated as UTF-8 encoded strings. | no (default = false) |
+
+#### Avro Bytes Decoder
+
+If `type` is not included, the avroBytesDecoder defaults to `schema_repo`.
+
+##### Inline Schema Based Avro Bytes Decoder
+
+> The "schema_inline" decoder reads Avro records using a fixed schema and does not support schema migration. If you
+> may need to migrate schemas in the future, consider one of the other decoders, all of which use a message header that
+> allows the parser to identify the proper Avro schema for reading records.
+
+This decoder can be used if all the input events can be read using the same schema. In this case, specify the schema in the input task JSON itself, as described below.
+
+```
+...
+"avroBytesDecoder": {
+  "type": "schema_inline",
+  "schema": {
+    //your schema goes here, for example
+    "namespace": "org.apache.druid.data",
+    "name": "User",
+    "type": "record",
+    "fields": [
+      { "name": "FullName", "type": "string" },
+      { "name": "Country", "type": "string" }
+    ]
+  }
+}
+...
+```
+
+##### Multiple Inline Schemas Based Avro Bytes Decoder
+
+Use this decoder if different input events can have different read schemas. In this case, specify the schema in the input task JSON itself, as described below.
+
+```
+...
+"avroBytesDecoder": {
+  "type": "multiple_schemas_inline",
+  "schemas": {
+    //your id -> schema map goes here, for example
+    "1": {
+      "namespace": "org.apache.druid.data",
+      "name": "User",
+      "type": "record",
+      "fields": [
+        { "name": "FullName", "type": "string" },
+        { "name": "Country", "type": "string" }
+      ]
+    },
+    "2": {
+      "namespace": "org.apache.druid.otherdata",
+      "name": "UserIdentity",
+      "type": "record",
+      "fields": [
+        { "name": "Name", "type": "string" },
+        { "name": "Location", "type": "string" }
+      ]
+    },
+    ...
+    ...
+  }
+}
+...
+```
+
+Note that it is essentially a map from integer schema IDs to Avro schema objects. This decoder assumes that each record has the following format:
+- the first byte is the version and must always be 1.
+- the next 4 bytes are the integer schema ID, serialized in big-endian byte order.
+- the remaining bytes contain the serialized Avro message.
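+
+The sketch below is a minimal, hypothetical producer-side illustration of this framing (the `MultipleSchemasInlineFramer` class and `frame` method are illustrative names, not part of Druid): it writes the version byte, the big-endian schema ID, and then the Avro-encoded record.
+
+```java
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class MultipleSchemasInlineFramer
+{
+  // Hypothetical helper: frames a record in the layout expected by the multiple_schemas_inline decoder.
+  public static byte[] frame(int schemaId, Schema schema, GenericRecord record) throws IOException
+  {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    out.write(0x1);                                             // version byte, must always be 1
+    out.write(ByteBuffer.allocate(4).putInt(schemaId).array()); // schema ID, big-endian
+    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
+    writer.write(record, EncoderFactory.get().directBinaryEncoder(out, null));
+    return out.toByteArray();                                   // publish these bytes to the stream
+  }
+}
+```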
+
+##### SchemaRepo Based Avro Bytes Decoder
+
+This Avro bytes decoder first extracts `subject` and `id` from the input message bytes, and then uses them to look up the Avro schema used to decode the Avro record from bytes. For details, see the [schema repo](https://github.com/schema-repo/schema-repo) and [AVRO-1124](https://issues.apache.org/jira/browse/AVRO-1124). You will need an http service like schema repo to hold the avro schema. For information on registering a schema on the message producer side, see `org.apache.druid.data.i [...]
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| type | String | This should say `schema_repo`. | no |
+| subjectAndIdConverter | JSON Object | Specifies how to extract the subject and id from message bytes. | yes |
+| schemaRepository | JSON Object | Specifies how to look up the Avro schema from subject and id. | yes |
+
+###### Avro-1124 Subject And Id Converter
+
+This section describes the format of the `subjectAndIdConverter` object for the `schema_repo` Avro bytes decoder.
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| type | String | This should say `avro_1124`. | no |
+| topic | String | Specifies the topic of your Kafka stream. | yes |
+
+
+###### Avro-1124 Schema Repository
+
+This section describes the format of the `schemaRepository` object for the `schema_repo` Avro bytes decoder.
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| type | String | This should say `avro_1124_rest_client`. | no |
+| url | String | Specifies the endpoint url of your Avro-1124 schema repository. | yes |
+
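+For example, a complete `schema_repo` decoder that combines the Avro-1124 subject and ID converter with an Avro-1124 REST repository might look like the following sketch (the topic and URL values are placeholders to replace with your own):
+
+```json
+...
+"avroBytesDecoder" : {
+   "type" : "schema_repo",
+   "subjectAndIdConverter" : {
+      "type" : "avro_1124",
+      "topic" : "${YOUR_TOPIC}"
+   },
+   "schemaRepository" : {
+      "type" : "avro_1124_rest_client",
+      "url" : "${YOUR_SCHEMA_REPO_END_POINT}"
+   }
+}
+...
+```
+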
+##### Confluent Schema Registry-based Avro Bytes Decoder
+
+This Avro bytes decoder first extracts a unique `id` from the input message bytes, and then uses it to look up, in the Schema Registry, the schema used to decode the Avro record from bytes.
+For details, see the Schema Registry [documentation](http://docs.confluent.io/current/schema-registry/docs/) and [repository](https://github.com/confluentinc/schema-registry).
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| type | String | This should say `schema_registry`. | no |
+| url | String | Specifies the url endpoint of the Schema Registry. | yes |
+| capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no |
+| urls | Array<String> | Specifies the url endpoints of multiple Schema Registry instances. | yes (if `url` is not provided) |
+| config | Json | Additional configurations to send to the Schema Registry. | no |
+| headers | Json | Headers to send to the Schema Registry. | no |
+
+For a single Schema Registry instance, use the `url` field; for multiple instances, use `urls`.
+
+Single Instance:
+```json
+...
+"avroBytesDecoder" : {
+   "type" : "schema_registry",
+   "url" : <schema-registry-url>
+}
+...
+```
+
+Multiple Instances:
+```json
+...
+"avroBytesDecoder" : {
+   "type" : "schema_registry",
+   "urls" : [<schema-registry-url-1>, <schema-registry-url-2>, ...],
+   "config" : {
+        "basic.auth.credentials.source": "USER_INFO",
+        "basic.auth.user.info": "fred:letmein",
+        "schema.registry.ssl.truststore.location": "/some/secrets/kafka.client.truststore.jks",
+        "schema.registry.ssl.truststore.password": "<password>",
+        "schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks",
+        "schema.registry.ssl.keystore.password": "<password>",
+        "schema.registry.ssl.key.password": "<password>"
+       ... 
+   },
+   "headers": {
+       "traceID" : "b29c5de2-0db4-490b-b421",
+       "timeStamp" : "1577191871865",
+       ...
+    }
+}
+...
+```
+
 ### Avro OCF
 
 > You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro OCF input format.
@@ -876,7 +1073,7 @@ This parser is for [stream ingestion](./index.md#streaming) and reads Avro data
 | Field | Type | Description | Required |
 |-------|------|-------------|----------|
 | type | String | This should say `avro_stream`. | no |
-| avroBytesDecoder | JSON Object | Specifies how to decode bytes to Avro record. | yes |
+| avroBytesDecoder | JSON Object | Specifies the [`avroBytesDecoder`](#avro-bytes-decoder) to decode bytes to an Avro record. | yes |
 | parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be an "avro" parseSpec. | yes |
 
 An Avro parseSpec can contain a [`flattenSpec`](#flattenspec) using either the "root" or "path"
@@ -907,156 +1104,6 @@ For example, using Avro stream parser with schema repo Avro bytes decoder:
 }
 ```
 
-#### Avro Bytes Decoder
-
-If `type` is not included, the avroBytesDecoder defaults to `schema_repo`.
-
-##### Inline Schema Based Avro Bytes Decoder
-
-> The "schema_inline" decoder reads Avro records using a fixed schema and does not support schema migration. If you
-> may need to migrate schemas in the future, consider one of the other decoders, all of which use a message header that
-> allows the parser to identify the proper Avro schema for reading records.
-
-This decoder can be used if all the input events can be read using the same schema. In this case, specify the schema in the input task JSON itself, as described below.
-
-```
-...
-"avroBytesDecoder": {
-  "type": "schema_inline",
-  "schema": {
-    //your schema goes here, for example
-    "namespace": "org.apache.druid.data",
-    "name": "User",
-    "type": "record",
-    "fields": [
-      { "name": "FullName", "type": "string" },
-      { "name": "Country", "type": "string" }
-    ]
-  }
-}
-...
-```
-
-##### Multiple Inline Schemas Based Avro Bytes Decoder
-
-Use this decoder if different input events can have different read schemas. In this case, specify the schema in the input task JSON itself, as described below.
-
-```
-...
-"avroBytesDecoder": {
-  "type": "multiple_schemas_inline",
-  "schemas": {
-    //your id -> schema map goes here, for example
-    "1": {
-      "namespace": "org.apache.druid.data",
-      "name": "User",
-      "type": "record",
-      "fields": [
-        { "name": "FullName", "type": "string" },
-        { "name": "Country", "type": "string" }
-      ]
-    },
-    "2": {
-      "namespace": "org.apache.druid.otherdata",
-      "name": "UserIdentity",
-      "type": "record",
-      "fields": [
-        { "name": "Name", "type": "string" },
-        { "name": "Location", "type": "string" }
-      ]
-    },
-    ...
-    ...
-  }
-}
-...
-```
-
-Note that it is essentially a map of integer schema ID to avro schema object. This parser assumes that record has following format.
-  first 1 byte is version and must always be 1.
-  next 4 bytes are integer schema ID serialized using big-endian byte order.
-  remaining bytes contain serialized avro message.
-
-##### SchemaRepo Based Avro Bytes Decoder
-
-This Avro bytes decoder first extracts `subject` and `id` from the input message bytes, and then uses them to look up the Avro schema used to decode the Avro record from bytes. For details, see the [schema repo](https://github.com/schema-repo/schema-repo) and [AVRO-1124](https://issues.apache.org/jira/browse/AVRO-1124). You will need an http service like schema repo to hold the avro schema. For information on registering a schema on the message producer side, see `org.apache.druid.data.i [...]
-
-| Field | Type | Description | Required |
-|-------|------|-------------|----------|
-| type | String | This should say `schema_repo`. | no |
-| subjectAndIdConverter | JSON Object | Specifies how to extract the subject and id from message bytes. | yes |
-| schemaRepository | JSON Object | Specifies how to look up the Avro schema from subject and id. | yes |
-
-###### Avro-1124 Subject And Id Converter
-
-This section describes the format of the `subjectAndIdConverter` object for the `schema_repo` Avro bytes decoder.
-
-| Field | Type | Description | Required |
-|-------|------|-------------|----------|
-| type | String | This should say `avro_1124`. | no |
-| topic | String | Specifies the topic of your Kafka stream. | yes |
-
-
-###### Avro-1124 Schema Repository
-
-This section describes the format of the `schemaRepository` object for the `schema_repo` Avro bytes decoder.
-
-| Field | Type | Description | Required |
-|-------|------|-------------|----------|
-| type | String | This should say `avro_1124_rest_client`. | no |
-| url | String | Specifies the endpoint url of your Avro-1124 schema repository. | yes |
-
-##### Confluent Schema Registry-based Avro Bytes Decoder
-
-This Avro bytes decoder first extracts a unique `id` from input message bytes, and then uses it to look up the schema in the Schema Registry used to decode the Avro record from bytes.
-For details, see the Schema Registry [documentation](http://docs.confluent.io/current/schema-registry/docs/) and [repository](https://github.com/confluentinc/schema-registry).
-
-| Field | Type | Description | Required |
-|-------|------|-------------|----------|
-| type | String | This should say `schema_registry`. | no |
-| url | String | Specifies the url endpoint of the Schema Registry. | yes |
-| capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no |
-| urls | Array<String> | Specifies the url endpoints of the multiple Schema Registry instances. | yes(if `url` is not provided) |
-| config | Json | To send additional configurations, configured for Schema Registry | no |
-| headers | Json | To send headers to the Schema Registry | no |
-
-For a single schema registry instance, use Field `url` or `urls` for multi instances.
-
-Single Instance:
-```json
-...
-"avroBytesDecoder" : {
-   "type" : "schema_registry",
-   "url" : <schema-registry-url>
-}
-...
-```
-
-Multiple Instances:
-```json
-...
-"avroBytesDecoder" : {
-   "type" : "schema_registry",
-   "urls" : [<schema-registry-url-1>, <schema-registry-url-2>, ...],
-   "config" : {
-        "basic.auth.credentials.source": "USER_INFO",
-        "basic.auth.user.info": "fred:letmein",
-        "schema.registry.ssl.truststore.location": "/some/secrets/kafka.client.truststore.jks",
-        "schema.registry.ssl.truststore.password": "<password>",
-        "schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks",
-        "schema.registry.ssl.keystore.password": "<password>",
-        "schema.registry.ssl.key.password": "<password>"
-       ... 
-   },
-   "headers": {
-       "traceID" : "b29c5de2-0db4-490b-b421",
-       "timeStamp" : "1577191871865",
-       ...
-    }
-}
-...
-```
-
 ### Protobuf Parser
 
 > You need to include the [`druid-protobuf-extensions`](../development/extensions-core/protobuf.md) as an extension to use the Protobuf Parser.
diff --git a/extensions-core/avro-extensions/pom.xml b/extensions-core/avro-extensions/pom.xml
index b07c4f4..adb64bf 100644
--- a/extensions-core/avro-extensions/pom.xml
+++ b/extensions-core/avro-extensions/pom.xml
@@ -116,6 +116,11 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
       <groupId>org.schemarepo</groupId>
       <artifactId>schema-repo-api</artifactId>
       <version>${schemarepo.version}</version>
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroExtensionsModule.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroExtensionsModule.java
index 29f7ea0..296c6da 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroExtensionsModule.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroExtensionsModule.java
@@ -55,7 +55,8 @@ public class AvroExtensionsModule implements DruidModule
                 new NamedType(AvroStreamInputRowParser.class, "avro_stream"),
                 new NamedType(AvroHadoopInputRowParser.class, "avro_hadoop"),
                 new NamedType(AvroParseSpec.class, "avro"),
-                new NamedType(AvroOCFInputFormat.class, "avro_ocf")
+                new NamedType(AvroOCFInputFormat.class, "avro_ocf"),
+                new NamedType(AvroStreamInputFormat.class, "avro_stream")
             )
             .setMixInAnnotation(Repository.class, RepositoryMixIn.class)
             .setMixInAnnotation(JsonUtil.class, JsonUtilMixIn.class)
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java
new file mode 100644
index 0000000..3b59b36
--- /dev/null
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.avro;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.NestedInputFormat;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.Objects;
+
+public class AvroStreamInputFormat extends NestedInputFormat
+{
+  private final boolean binaryAsString;
+
+  private final AvroBytesDecoder avroBytesDecoder;
+
+  @JsonCreator
+  public AvroStreamInputFormat(
+      @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
+      @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder,
+      @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString
+  )
+  {
+    super(flattenSpec);
+    this.avroBytesDecoder = avroBytesDecoder;
+    this.binaryAsString = binaryAsString == null ? false : binaryAsString;
+  }
+
+  @Override
+  public boolean isSplittable()
+  {
+    return false;
+  }
+
+  @JsonProperty
+  public AvroBytesDecoder getAvroBytesDecoder()
+  {
+    return avroBytesDecoder;
+  }
+
+  @JsonProperty
+  public Boolean getBinaryAsString()
+  {
+    return binaryAsString;
+  }
+
+  @Override
+  public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
+  {
+    return new AvroStreamReader(
+        inputRowSchema,
+        source,
+        avroBytesDecoder,
+        getFlattenSpec(),
+        binaryAsString
+    );
+  }
+
+  @Override
+  public boolean equals(final Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final AvroStreamInputFormat that = (AvroStreamInputFormat) o;
+    return Objects.equals(getFlattenSpec(), that.getFlattenSpec()) &&
+        Objects.equals(avroBytesDecoder, that.avroBytesDecoder) &&
+        Objects.equals(binaryAsString, that.binaryAsString);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(getFlattenSpec(), avroBytesDecoder, binaryAsString);
+  }
+}
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
new file mode 100644
index 0000000..d8d92eb
--- /dev/null
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.avro;
+
+import com.google.common.collect.Iterators;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.IntermediateRowParsingReader;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.common.parsers.ObjectFlattener;
+import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
+import org.apache.druid.java.util.common.parsers.ParseException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class AvroStreamReader extends IntermediateRowParsingReader<GenericRecord>
+{
+  private final InputRowSchema inputRowSchema;
+  private final InputEntity source;
+  private final AvroBytesDecoder avroBytesDecoder;
+  private final ObjectFlattener<GenericRecord> recordFlattener;
+
+  AvroStreamReader(
+      InputRowSchema inputRowSchema,
+      InputEntity source,
+      AvroBytesDecoder avroBytesDecoder,
+      JSONPathSpec flattenSpec,
+      boolean binaryAsString
+  )
+  {
+    this.inputRowSchema = inputRowSchema;
+    this.source = source;
+    this.avroBytesDecoder = avroBytesDecoder;
+    this.recordFlattener = ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(false, binaryAsString));
+  }
+
+  @Override
+  protected CloseableIterator<GenericRecord> intermediateRowIterator() throws IOException
+  {
+    return CloseableIterators.withEmptyBaggage(
+        Iterators.singletonIterator(avroBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open()))))
+    );
+  }
+
+  @Override
+  protected List<InputRow> parseInputRows(GenericRecord intermediateRow) throws ParseException
+  {
+    return Collections.singletonList(
+        MapInputRowParser.parse(
+            inputRowSchema,
+            recordFlattener.flatten(intermediateRow)
+        )
+    );
+  }
+
+  @Override
+  protected List<Map<String, Object>> toMap(GenericRecord intermediateRow)
+  {
+    return Collections.singletonList(recordFlattener.toMap(intermediateRow));
+  }
+}
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
index 123f8fa..4b3da38 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
@@ -45,6 +45,11 @@ import java.util.Objects;
 public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
 {
   private final SchemaRegistryClient registry;
+  private final String url;
+  private final int capacity;
+  private final List<String> urls;
+  private final Map<String, ?> config;
+  private final Map<String, String> headers;
 
   @JsonCreator
   public SchemaRegistryBasedAvroBytesDecoder(
@@ -55,18 +60,57 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
       @JsonProperty("headers") @Nullable Map<String, String> headers
   )
   {
-    int identityMapCapacity = capacity == null ? Integer.MAX_VALUE : capacity;
+    this.url = url;
+    this.capacity = capacity == null ? Integer.MAX_VALUE : capacity;
+    this.urls = urls;
+    this.config = config;
+    this.headers = headers;
     if (url != null && !url.isEmpty()) {
-      this.registry = new CachedSchemaRegistryClient(url, identityMapCapacity, config, headers);
+      this.registry = new CachedSchemaRegistryClient(this.url, this.capacity, this.config, this.headers);
     } else {
-      this.registry = new CachedSchemaRegistryClient(urls, identityMapCapacity, config, headers);
+      this.registry = new CachedSchemaRegistryClient(this.urls, this.capacity, this.config, this.headers);
     }
   }
 
+  @JsonProperty
+  public String getUrl()
+  {
+    return url;
+  }
+
+  @JsonProperty
+  public int getCapacity()
+  {
+    return capacity;
+  }
+
+  @JsonProperty
+  public List<String> getUrls()
+  {
+    return urls;
+  }
+
+  @JsonProperty
+  public Map<String, ?> getConfig()
+  {
+    return config;
+  }
+
+  @JsonProperty
+  public Map<String, String> getHeaders()
+  {
+    return headers;
+  }
+
   //For UT only
   @VisibleForTesting
   SchemaRegistryBasedAvroBytesDecoder(SchemaRegistryClient registry)
   {
+    this.url = null;
+    this.capacity = Integer.MAX_VALUE;
+    this.urls = null;
+    this.config = null;
+    this.headers = null;
     this.registry = registry;
   }
 
@@ -114,12 +158,21 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
 
     SchemaRegistryBasedAvroBytesDecoder that = (SchemaRegistryBasedAvroBytesDecoder) o;
 
-    return Objects.equals(registry, that.registry);
+    return Objects.equals(url, that.url) &&
+        Objects.equals(capacity, that.capacity) &&
+        Objects.equals(urls, that.urls) &&
+        Objects.equals(config, that.config) &&
+        Objects.equals(headers, that.headers);
   }
 
   @Override
   public int hashCode()
   {
-    return registry != null ? registry.hashCode() : 0;
+    int result = url != null ? url.hashCode() : 0;
+    result = 31 * result + capacity;
+    result = 31 * result + (urls != null ? urls.hashCode() : 0);
+    result = 31 * result + (config != null ? config.hashCode() : 0);
+    result = 31 * result + (headers != null ? headers.hashCode() : 0);
+    return result;
   }
 }
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
new file mode 100644
index 0000000..139616b
--- /dev/null
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.druid.data.input.avro.AvroExtensionsModule;
+import org.apache.druid.data.input.avro.AvroStreamInputFormat;
+import org.apache.druid.data.input.avro.SchemaRegistryBasedAvroBytesDecoder;
+import org.apache.druid.data.input.avro.SchemaRepoBasedAvroBytesDecoder;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.NestedInputFormat;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.data.input.schemarepo.Avro1124RESTRepositoryClientWrapper;
+import org.apache.druid.data.input.schemarepo.Avro1124SubjectAndIdConverter;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.schemarepo.InMemoryRepository;
+import org.schemarepo.Repository;
+import org.schemarepo.SchemaValidationException;
+import org.schemarepo.api.TypedSchemaRepository;
+import org.schemarepo.api.converter.AvroSchemaConverter;
+import org.schemarepo.api.converter.IdentityConverter;
+import org.schemarepo.api.converter.IntegerConverter;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.druid.data.input.AvroStreamInputRowParserTest.assertInputRowCorrect;
+import static org.apache.druid.data.input.AvroStreamInputRowParserTest.buildSomeAvroDatum;
+
+public class AvroStreamInputFormatTest
+{
+
+  private static final String EVENT_TYPE = "eventType";
+  private static final String ID = "id";
+  private static final String SOME_OTHER_ID = "someOtherId";
+  private static final String IS_VALID = "isValid";
+  private static final String TOPIC = "aTopic";
+  static final List<String> DIMENSIONS = Arrays.asList(EVENT_TYPE, ID, SOME_OTHER_ID, IS_VALID);
+  private static final List<String> DIMENSIONS_SCHEMALESS = Arrays.asList(
+      "nested",
+      SOME_OTHER_ID,
+      "someStringArray",
+      "someIntArray",
+      "someFloat",
+      EVENT_TYPE,
+      "someFixed",
+      "someBytes",
+      "someUnion",
+      ID,
+      "someEnum",
+      "someLong",
+      "someInt",
+      "timestamp"
+  );
+
+
+  private final ObjectMapper jsonMapper = new DefaultObjectMapper();
+
+  private TimestampSpec timestampSpec;
+  private DimensionsSpec dimensionsSpec;
+  private JSONPathSpec flattenSpec;
+
+  @Before
+  public void before()
+  {
+    timestampSpec = new TimestampSpec("nested", "millis", null);
+    dimensionsSpec = new DimensionsSpec(DimensionsSpec.getDefaultSchemas(DIMENSIONS), Collections.emptyList(), null);
+    flattenSpec = new JSONPathSpec(
+      true,
+      ImmutableList.of(
+          new JSONPathFieldSpec(JSONPathFieldType.PATH, "nested", "someRecord.subLong")
+      )
+  );
+    for (Module jacksonModule : new AvroExtensionsModule().getJacksonModules()) {
+      jsonMapper.registerModule(jacksonModule);
+    }
+  }
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io");
+    AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
+        flattenSpec,
+        new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
+        false
+    );
+    NestedInputFormat inputFormat2 = jsonMapper.readValue(
+        jsonMapper.writeValueAsString(inputFormat),
+        NestedInputFormat.class
+    );
+
+    Assert.assertEquals(inputFormat, inputFormat2);
+  }
+
+  @Test
+  public void testSerdeForSchemaRegistry() throws IOException
+  {
+    AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
+        flattenSpec,
+        new SchemaRegistryBasedAvroBytesDecoder("http://test:8081", 100, null, null, null),
+        false
+    );
+    NestedInputFormat inputFormat2 = jsonMapper.readValue(
+        jsonMapper.writeValueAsString(inputFormat),
+        NestedInputFormat.class
+    );
+    Assert.assertEquals(inputFormat, inputFormat2);
+  }
+
+  @Test
+  public void testParse() throws SchemaValidationException, IOException
+  {
+    Repository repository = new InMemoryRepository(null);
+    AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
+        flattenSpec,
+        new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
+        false
+    );
+    NestedInputFormat inputFormat2 = jsonMapper.readValue(
+        jsonMapper.writeValueAsString(inputFormat),
+        NestedInputFormat.class
+    );
+    repository = ((SchemaRepoBasedAvroBytesDecoder) ((AvroStreamInputFormat) inputFormat2).getAvroBytesDecoder()).getSchemaRepository();
+
+    // prepare data
+    GenericRecord someAvroDatum = buildSomeAvroDatum();
+
+    // encode schema id
+    Avro1124SubjectAndIdConverter converter = new Avro1124SubjectAndIdConverter(TOPIC);
+    TypedSchemaRepository<Integer, Schema, String> repositoryClient = new TypedSchemaRepository<>(
+        repository,
+        new IntegerConverter(),
+        new AvroSchemaConverter(),
+        new IdentityConverter()
+    );
+    Integer id = repositoryClient.registerSchema(TOPIC, SomeAvroDatum.getClassSchema());
+    ByteBuffer byteBuffer = ByteBuffer.allocate(4);
+    converter.putSubjectAndId(id, byteBuffer);
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    out.write(byteBuffer.array());
+    // encode data
+    DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(someAvroDatum.getSchema());
+    // write avro datum to bytes
+    writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
+
+    final ByteEntity entity = new ByteEntity(ByteBuffer.wrap(out.toByteArray()));
+
+    InputRow inputRow = inputFormat2.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, null), entity, null).read().next();
+
+    assertInputRowCorrect(inputRow, DIMENSIONS, false);
+  }
+
+  @Test
+  public void testParseSchemaless() throws SchemaValidationException, IOException
+  {
+    Repository repository = new InMemoryRepository(null);
+    AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
+        flattenSpec,
+        new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
+        false
+    );
+    NestedInputFormat inputFormat2 = jsonMapper.readValue(
+        jsonMapper.writeValueAsString(inputFormat),
+        NestedInputFormat.class
+    );
+    repository = ((SchemaRepoBasedAvroBytesDecoder) ((AvroStreamInputFormat) inputFormat2).getAvroBytesDecoder()).getSchemaRepository();
+
+    // prepare data
+    GenericRecord someAvroDatum = buildSomeAvroDatum();
+
+    // encode schema id
+    Avro1124SubjectAndIdConverter converter = new Avro1124SubjectAndIdConverter(TOPIC);
+    TypedSchemaRepository<Integer, Schema, String> repositoryClient = new TypedSchemaRepository<>(
+        repository,
+        new IntegerConverter(),
+        new AvroSchemaConverter(),
+        new IdentityConverter()
+    );
+    Integer id = repositoryClient.registerSchema(TOPIC, SomeAvroDatum.getClassSchema());
+    ByteBuffer byteBuffer = ByteBuffer.allocate(4);
+    converter.putSubjectAndId(id, byteBuffer);
+    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+      out.write(byteBuffer.array());
+      // encode data
+      DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(someAvroDatum.getSchema());
+      // write avro datum to bytes
+      writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
+
+      final ByteEntity entity = new ByteEntity(ByteBuffer.wrap(out.toByteArray()));
+
+      InputRow inputRow = inputFormat2.createReader(new InputRowSchema(timestampSpec, new DimensionsSpec(null, null, null), null), entity, null).read().next();
+
+      assertInputRowCorrect(inputRow, DIMENSIONS_SCHEMALESS, false);
+    }
+  }
+}
diff --git a/integration-tests/src/test/resources/stream/data/avro/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/avro/input_format/input_format.json
new file mode 100644
index 0000000..b6b9c61
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/avro/input_format/input_format.json
@@ -0,0 +1,81 @@
+{
+  "type": "avro_stream",
+  "avroBytesDecoder": {
+    "type": "schema_inline",
+    "schema": {
+      "namespace": "org.apache.druid",
+      "name": "wikipedia",
+      "type": "record",
+      "fields": [
+        {
+          "name": "timestamp",
+          "type": "string"
+        },
+        {
+          "name": "page",
+          "type": "string"
+        },
+        {
+          "name": "language",
+          "type": "string"
+        },
+        {
+          "name": "user",
+          "type": "string"
+        },
+        {
+          "name": "unpatrolled",
+          "type": "string"
+        },
+        {
+          "name": "newPage",
+          "type": "string"
+        },
+        {
+          "name": "robot",
+          "type": "string"
+        },
+        {
+          "name": "anonymous",
+          "type": "string"
+        },
+        {
+          "name": "namespace",
+          "type": "string"
+        },
+        {
+          "name": "continent",
+          "type": "string"
+        },
+        {
+          "name": "country",
+          "type": "string"
+        },
+        {
+          "name": "region",
+          "type": "string"
+        },
+        {
+          "name": "city",
+          "type": "string"
+        },
+        {
+          "name": "added",
+          "type": "long"
+        },
+        {
+          "name": "deleted",
+          "type": "long"
+        },
+        {
+          "name": "delta",
+          "type": "long"
+        }
+      ]
+    }
+  },
+  "flattenSpec": {
+    "useFieldDiscovery": true
+  },
+  "binaryAsString": false
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/avro_schema_registry/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/avro_schema_registry/input_format/input_format.json
new file mode 100644
index 0000000..c9c8ce7
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/avro_schema_registry/input_format/input_format.json
@@ -0,0 +1,15 @@
+{
+  "type": "avro_stream",
+  "avroBytesDecoder": {
+    "type": "schema_registry",
+    "url": "%%SCHEMA_REGISTRY_HOST%%",
+    "config": {
+      "basic.auth.credentials.source": "USER_INFO",
+      "basic.auth.user.info": "druid:diurd"
+    }
+  },
+  "flattenSpec": {
+    "useFieldDiscovery": true
+  },
+  "binaryAsString": false
+}
\ No newline at end of file
diff --git a/web-console/src/druid-models/input-format.tsx b/web-console/src/druid-models/input-format.tsx
index 7907498..4bd9702 100644
--- a/web-console/src/druid-models/input-format.tsx
+++ b/web-console/src/druid-models/input-format.tsx
@@ -42,7 +42,7 @@ export const INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
     name: 'type',
     label: 'Input format',
     type: 'string',
-    suggestions: ['json', 'csv', 'tsv', 'regex', 'parquet', 'orc', 'avro_ocf'],
+    suggestions: ['json', 'csv', 'tsv', 'regex', 'parquet', 'orc', 'avro_ocf', 'avro_stream'],
     required: true,
     info: (
       <>
@@ -127,7 +127,7 @@ export const INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
     name: 'binaryAsString',
     type: 'boolean',
     defaultValue: false,
-    defined: (p: InputFormat) => oneOf(p.type, 'parquet', 'orc', 'avro_ocf'),
+    defined: (p: InputFormat) => oneOf(p.type, 'parquet', 'orc', 'avro_ocf', 'avro_stream'),
     info: (
       <>
         Specifies if the binary column which is not logically marked as a string should be treated
@@ -142,5 +142,5 @@ export function issueWithInputFormat(inputFormat: InputFormat | undefined): stri
 }
 
 export function inputFormatCanFlatten(inputFormat: InputFormat): boolean {
-  return oneOf(inputFormat.type, 'json', 'parquet', 'orc', 'avro_ocf');
+  return oneOf(inputFormat.type, 'json', 'parquet', 'orc', 'avro_ocf', 'avro_stream');
 }
