Posted to commits@druid.apache.org by su...@apache.org on 2021/03/08 16:12:46 UTC

[druid] branch master updated: add avro + kafka + schema registry integration test (#10929)

This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 96889cd  add avro + kafka + schema registry integration test (#10929)
96889cd is described below

commit 96889cdebce68cee839b350205b60823227e142b
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Mon Mar 8 08:12:12 2021 -0800

    add avro + kafka + schema registry integration test (#10929)
    
    * add avro + schema registry integration test
    
    * style
    
    * retry init
    
    * maybe this
    
    * oops heh
    
    * this will fix it
    
    * review stuffs
    
    * fix comment
---
 .../avro/SchemaRegistryBasedAvroBytesDecoder.java  |  31 ++++--
 .../SchemaRegistryBasedAvroBytesDecoderTest.java   |  46 ++++++---
 integration-tests/docker/docker-compose.base.yml   | 108 +++++++++++++--------
 .../docker-compose.schema-registry-indexer.yml     |  29 ++++++
 .../docker/docker-compose.schema-registry.yml      |  29 ++++++
 .../docker/schema-registry/jaas_config.file        |   5 +
 .../docker/schema-registry/password-file           |   1 +
 integration-tests/pom.xml                          |  42 ++++++++
 integration-tests/script/docker_compose_args.sh    |  15 ++-
 .../druid/testing/ConfigFileConfigProvider.java    |   8 ++
 .../apache/druid/testing/DockerConfigProvider.java |  19 ++++
 .../druid/testing/IntegrationTestingConfig.java    |   7 ++
 .../druid/testing/utils/AvroEventSerializer.java   |  16 ++-
 .../utils/AvroSchemaRegistryEventSerializer.java   | 106 ++++++++++++++++++++
 .../druid/testing/utils/EventSerializer.java       |   8 +-
 .../testing/utils/SyntheticStreamGenerator.java    |   1 +
 .../indexer/AbstractKafkaIndexingServiceTest.java  |   8 ++
 .../parser/input_row_parser.json                   |  21 ++++
 .../serializer/serializer.json                     |   3 +
 pom.xml                                            |   1 +
 20 files changed, 434 insertions(+), 70 deletions(-)

diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
index 42765fa..123f8fa 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
@@ -26,14 +26,17 @@ import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.parsers.ParseException;
 
 import javax.annotation.Nullable;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
@@ -70,18 +73,32 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
   @Override
   public GenericRecord parse(ByteBuffer bytes)
   {
+    int length = bytes.limit() - 1 - 4;
+    if (length < 0) {
+      throw new ParseException("Failed to decode avro message, not enough bytes to decode (%s)", bytes.limit());
+    }
+
+    bytes.get(); // ignore first \0 byte
+    int id = bytes.getInt(); // extract schema registry id
+    int offset = bytes.position() + bytes.arrayOffset();
+    Schema schema;
+
     try {
-      bytes.get(); // ignore first \0 byte
-      int id = bytes.getInt(); // extract schema registry id
-      int length = bytes.limit() - 1 - 4;
-      int offset = bytes.position() + bytes.arrayOffset();
       ParsedSchema parsedSchema = registry.getSchemaById(id);
-      Schema schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null;
-      DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+      schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null;
+    }
+    catch (IOException | RestClientException ex) {
+      throw new RE(ex, "Failed to get Avro schema: %s", id);
+    }
+    if (schema == null) {
+      throw new RE("Failed to find Avro schema: %s", id);
+    }
+    DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+    try {
       return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null));
     }
     catch (Exception e) {
-      throw new ParseException(e, "Fail to decode avro message!");
+      throw new ParseException(e, "Fail to decode Avro message for schema: %s!", id);
     }
   }
 
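The reworked parse() validates the five-byte Confluent wire header (one 0x0 magic byte followed by a four-byte big-endian schema id) before doing anything else, and it now distinguishes registry failures (RE) from undecodable payloads (ParseException). A minimal round-trip sketch, assuming access to the test-visible SchemaRegistryBasedAvroBytesDecoder(SchemaRegistryClient) constructor that the updated tests below also use:

    import io.confluent.kafka.schemaregistry.avro.AvroSchema;
    import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.druid.data.input.avro.SchemaRegistryBasedAvroBytesDecoder;

    import java.nio.ByteBuffer;

    public class WireFormatSketch
    {
      public static GenericRecord decode(Schema schema, byte[] avroPayload) throws Exception
      {
        SchemaRegistryClient registry = new MockSchemaRegistryClient();
        int id = registry.register("wikipedia-value", new AvroSchema(schema));
        ByteBuffer framed = ByteBuffer.allocate(5 + avroPayload.length)
            .put((byte) 0)        // magic byte, skipped by bytes.get()
            .putInt(id)           // schema registry id, read by bytes.getInt()
            .put(avroPayload);    // Avro binary body, decoded with the fetched schema
        framed.rewind();
        return new SchemaRegistryBasedAvroBytesDecoder(registry).parse(framed);
      }
    }
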
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
index 55c7e6b..3eb6439 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.data.input.avro;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import org.apache.avro.Schema;
@@ -29,6 +30,7 @@ import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.druid.data.input.AvroStreamInputRowParserTest;
 import org.apache.druid.data.input.SomeAvroDatum;
+import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.junit.Assert;
 import org.junit.Before;
@@ -96,40 +98,60 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
   public void testParse() throws Exception
   {
     // Given
-    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
+    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234)))
+           .thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
     GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
     Schema schema = SomeAvroDatum.getClassSchema();
     byte[] bytes = getAvroDatum(schema, someAvroDatum);
     ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put(bytes);
     bb.rewind();
     // When
-    GenericRecord actual = new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
-    // Then
-    Assert.assertEquals(someAvroDatum.get("id"), actual.get("id"));
+    new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
+  }
+
+  @Test(expected = ParseException.class)
+  public void testParseCorruptedNotEnoughBytesToEvenGetSchemaInfo()
+  {
+    // Given
+    ByteBuffer bb = ByteBuffer.allocate(2).put((byte) 0).put(1, (byte) 1);
+    bb.rewind();
+    // When
+    new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
   }
 
   @Test(expected = ParseException.class)
-  public void testParseCorrupted() throws Exception
+  public void testParseCorruptedPartial() throws Exception
   {
     // Given
-    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
+    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234)))
+           .thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
     GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
     Schema schema = SomeAvroDatum.getClassSchema();
     byte[] bytes = getAvroDatum(schema, someAvroDatum);
-    ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put((bytes), 5, 10);
+    ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 0).putInt(1234).put(bytes, 5, 4);
+    bb.rewind();
     // When
     new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
   }
 
-  @Test(expected = ParseException.class)
+  @Test(expected = RE.class)
+  public void testParseWrongSchemaType() throws Exception
+  {
+    // Given
+    Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(Mockito.mock(ParsedSchema.class));
+    ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234);
+    bb.rewind();
+    // When
+    new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
+  }
+
+  @Test(expected = RE.class)
   public void testParseWrongId() throws Exception
   {
     // Given
     Mockito.when(registry.getSchemaById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran"));
-    GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
-    Schema schema = SomeAvroDatum.getClassSchema();
-    byte[] bytes = getAvroDatum(schema, someAvroDatum);
-    ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put(bytes);
+    ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234);
+    bb.rewind();
     // When
     new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
   }
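
Taken together, the updated tests pin down the decoder's failure contract: truncated or malformed framing raises ParseException, while registry-side problems (a schema id the registry cannot resolve, or a non-Avro schema) raise RE. A hedged sketch of how a caller might lean on that split, using only types named in the diff:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.druid.data.input.avro.AvroBytesDecoder;
    import org.apache.druid.java.util.common.RE;
    import org.apache.druid.java.util.common.parsers.ParseException;

    import javax.annotation.Nullable;
    import java.nio.ByteBuffer;

    public class DecodeContractSketch
    {
      @Nullable
      public static GenericRecord decodeOrSkip(AvroBytesDecoder decoder, ByteBuffer framed)
      {
        try {
          return decoder.parse(framed);
        }
        catch (ParseException e) {
          return null; // malformed message bytes: treat the row as unparseable and move on
        }
        catch (RE e) {
          throw e;     // registry failure: likely an environment problem, surface it
        }
      }
    }
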
diff --git a/integration-tests/docker/docker-compose.base.yml b/integration-tests/docker/docker-compose.base.yml
index 66b8526..46dda1d 100644
--- a/integration-tests/docker/docker-compose.base.yml
+++ b/integration-tests/docker/docker-compose.base.yml
@@ -34,7 +34,7 @@ networks:
         - subnet: 172.172.172.0/24
 
 services:
-### supporting infra:
+### always there supporting infra:
   druid-zookeeper-kafka:
     image: druid/cluster
     container_name: druid-zookeeper-kafka
@@ -71,45 +71,6 @@ services:
     env_file:
       - ./environment-configs/common
 
-  druid-it-hadoop:
-    image: druid-it/hadoop:2.8.5
-    container_name: druid-it-hadoop
-    ports:
-      - 2049:2049
-      - 2122:2122
-      - 8020:8020
-      - 8021:8021
-      - 8030:8030
-      - 8031:8031
-      - 8032:8032
-      - 8033:8033
-      - 8040:8040
-      - 8042:8042
-      - 8088:8088
-      - 8443:8443
-      - 9000:9000
-      - 10020:10020
-      - 19888:19888
-      - 34455:34455
-      - 50010:50010
-      - 50020:50020
-      - 50030:50030
-      - 50060:50060
-      - 50070:50070
-      - 50075:50075
-      - 50090:50090
-      - 51111:51111
-    networks:
-      druid-it-net:
-        ipv4_address: 172.172.172.101
-    privileged: true
-    volumes:
-      - ${HOME}/shared:/shared
-      - ./../src/test/resources:/resources
-    hostname: "druid-it-hadoop"
-    command: "bash -c 'echo Start druid-it-hadoop container... && \
-                /etc/bootstrap.sh && \
-                tail -f /dev/null'"
 
 ### overlords
   druid-overlord:
@@ -357,12 +318,54 @@ services:
       - ./environment-configs/common
       - ./environment-configs/router-custom-check-tls
 
+### optional supporting infra
+  druid-it-hadoop:
+    image: druid-it/hadoop:2.8.5
+    container_name: druid-it-hadoop
+    ports:
+      - 2049:2049
+      - 2122:2122
+      - 8020:8020
+      - 8021:8021
+      - 8030:8030
+      - 8031:8031
+      - 8032:8032
+      - 8033:8033
+      - 8040:8040
+      - 8042:8042
+      - 8088:8088
+      - 8443:8443
+      - 9000:9000
+      - 10020:10020
+      - 19888:19888
+      - 34455:34455
+      - 50010:50010
+      - 50020:50020
+      - 50030:50030
+      - 50060:50060
+      - 50070:50070
+      - 50075:50075
+      - 50090:50090
+      - 51111:51111
+    networks:
+      druid-it-net:
+        ipv4_address: 172.172.172.101
+    privileged: true
+    volumes:
+      - ${HOME}/shared:/shared
+      - ./../src/test/resources:/resources
+    hostname: "druid-it-hadoop"
+    command: "bash -c 'echo Start druid-it-hadoop container... && \
+                /etc/bootstrap.sh && \
+                tail -f /dev/null'"
+
+
   druid-openldap:
     image: osixia/openldap:1.4.0
     container_name: druid-openldap
     networks:
       druid-it-net:
-        ipv4_address: 172.172.172.74
+        ipv4_address: 172.172.172.102
     ports:
       - 8389:389
       - 8636:636
@@ -373,3 +376,26 @@ services:
     env_file:
       - ./environment-configs/common
     command: --copy-service
+
+
+  schema-registry:
+    image: confluentinc/cp-schema-registry:5.5.1
+    container_name: schema-registry
+    ports:
+      - 8085:8085
+    networks:
+      druid-it-net:
+        ipv4_address: 172.172.172.103
+    volumes:
+      - ${HOME}/shared:/shared
+      - ./schema-registry/jaas_config.file:/usr/lib/druid/conf/jaas_config.file
+      - ./schema-registry/password-file:/usr/lib/druid/conf/password-file
+    privileged: true
+    environment:
+      SCHEMA_REGISTRY_HOST_NAME: schema-registry
+      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8085"
+      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: druid-zookeeper-kafka:9092
+      SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
+      SCHEMA_REGISTRY_AUTHENTICATION_REALM: druid
+      SCHEMA_REGISTRY_AUTHENTICATION_ROLES: users
+      SCHEMA_REGISTRY_OPTS: -Djava.security.auth.login.config=/usr/lib/druid/conf/jaas_config.file
diff --git a/integration-tests/docker/docker-compose.schema-registry-indexer.yml b/integration-tests/docker/docker-compose.schema-registry-indexer.yml
new file mode 100644
index 0000000..71c3814
--- /dev/null
+++ b/integration-tests/docker/docker-compose.schema-registry-indexer.yml
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "2.2"
+services:
+  schema-registry:
+    extends:
+      file: docker-compose.base.yml
+      service: schema-registry
+    depends_on:
+      - druid-zookeeper-kafka
+    links:
+      - druid-zookeeper-kafka:druid-zookeeper-kafka
+      - druid-coordinator:druid-coordinator
+      - druid-broker:druid-broker
+      - druid-historical:druid-historical
+      - druid-indexer:druid-indexer
diff --git a/integration-tests/docker/docker-compose.schema-registry.yml b/integration-tests/docker/docker-compose.schema-registry.yml
new file mode 100644
index 0000000..5611e2a
--- /dev/null
+++ b/integration-tests/docker/docker-compose.schema-registry.yml
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "2.2"
+services:
+  schema-registry:
+    extends:
+      file: docker-compose.base.yml
+      service: schema-registry
+    depends_on:
+      - druid-zookeeper-kafka
+    links:
+      - druid-zookeeper-kafka:druid-zookeeper-kafka
+      - druid-coordinator:druid-coordinator
+      - druid-broker:druid-broker
+      - druid-middlemanager:druid-middlemanager
+      - druid-historical:druid-historical
diff --git a/integration-tests/docker/schema-registry/jaas_config.file b/integration-tests/docker/schema-registry/jaas_config.file
new file mode 100644
index 0000000..dc48bed
--- /dev/null
+++ b/integration-tests/docker/schema-registry/jaas_config.file
@@ -0,0 +1,5 @@
+druid {
+  org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
+  file="/usr/lib/druid/conf/password-file"
+  debug="true";
+};
\ No newline at end of file
diff --git a/integration-tests/docker/schema-registry/password-file b/integration-tests/docker/schema-registry/password-file
new file mode 100644
index 0000000..c850844
--- /dev/null
+++ b/integration-tests/docker/schema-registry/password-file
@@ -0,0 +1 @@
+druid: diurd,users
\ No newline at end of file
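
These two files give the registry a single HTTP Basic account: the JAAS realm "druid" uses Jetty's PropertyFileLoginModule, and the password-file line "druid: diurd,users" defines user druid with password diurd in the users role that SCHEMA_REGISTRY_AUTHENTICATION_ROLES requires. A hedged smoke-test sketch against the port published in docker-compose.base.yml (assumes the cluster is up and port 8085 is reachable on localhost):

    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class RegistrySmokeTest
    {
      public static void main(String[] args) throws Exception
      {
        // Credentials come from docker/schema-registry/password-file above.
        String token = Base64.getEncoder()
            .encodeToString("druid:diurd".getBytes(StandardCharsets.UTF_8));
        HttpURLConnection conn =
            (HttpURLConnection) new URL("http://localhost:8085/subjects").openConnection();
        conn.setRequestProperty("Authorization", "Basic " + token);
        try (InputStream in = conn.getInputStream()) {
          // Prints the JSON list of registered subjects, e.g. [] on a fresh cluster.
          System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        }
      }
    }
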
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index fc3b7b5..2fb9adc 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -31,6 +31,13 @@
         <version>0.22.0-SNAPSHOT</version>
     </parent>
 
+    <repositories>
+        <repository>
+            <id>confluent</id>
+            <url>https://packages.confluent.io/maven/</url>
+        </repository>
+    </repositories>
+
     <dependencies>
         <dependency>
             <groupId>com.amazonaws</groupId>
@@ -320,6 +327,41 @@
             <artifactId>guice-servlet</artifactId>
             <version>${guice.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-schema-registry-client</artifactId>
+            <version>5.5.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.ws.rs</groupId>
+                    <artifactId>javax.ws.rs-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.ws.rs</groupId>
+                    <artifactId>javax.ws.rs-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.ws.rs</groupId>
+                    <artifactId>jsr311-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jakarta.ws.rs</groupId>
+                    <artifactId>jakarta.ws.rs-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
 
         <!-- Tests -->
         <dependency>
diff --git a/integration-tests/script/docker_compose_args.sh b/integration-tests/script/docker_compose_args.sh
index e43f88d..ea61e88 100644
--- a/integration-tests/script/docker_compose_args.sh
+++ b/integration-tests/script/docker_compose_args.sh
@@ -28,7 +28,6 @@ getComposeArgs()
       echo "DRUID_INTEGRATION_TEST_INDEXER must be 'indexer' or 'middleManager' (is '$DRUID_INTEGRATION_TEST_INDEXER')"
       exit 1
     fi
-
     if [ "$DRUID_INTEGRATION_TEST_INDEXER" = "indexer" ]
     then
       # Sanity check: cannot combine CliIndexer tests with security, query-retry tests
@@ -36,10 +35,14 @@ getComposeArgs()
       then
         echo "Cannot run test group '$DRUID_INTEGRATION_TEST_GROUP' with CliIndexer"
         exit 1
+      elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "kafka-data-format" ]
+      then
+        # Replace MiddleManager with Indexer + schema registry container
+        echo "-f ${DOCKERDIR}/docker-compose.cli-indexer.yml -f ${DOCKERDIR}/docker-compose.schema-registry-indexer.yml"
+      else
+        # Replace MiddleManager with Indexer
+        echo "-f ${DOCKERDIR}/docker-compose.cli-indexer.yml"
       fi
-
-      # Replace MiddleManager with Indexer
-      echo "-f ${DOCKERDIR}/docker-compose.cli-indexer.yml"
     elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ]
     then
       # default + additional druid router (custom-check-tls, permissive-tls, no-client-auth-tls)
@@ -57,6 +60,10 @@ getComposeArgs()
     then
       # the 'high availability' test cluster with multiple coordinators and overlords
       echo "-f ${DOCKERDIR}/docker-compose.high-availability.yml"
+    elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "kafka-data-format" ]
+    then
+      # default + schema registry container
+      echo "-f ${DOCKERDIR}/docker-compose.yml -f ${DOCKERDIR}/docker-compose.schema-registry.yml"
     else
       # default
       echo "-f ${DOCKERDIR}/docker-compose.yml"
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java
index 162eda9..7cd0387 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java
@@ -63,6 +63,7 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
   private String middleManagerHost;
   private String zookeeperHosts;        // comma-separated list of host:port
   private String kafkaHost;
+  private String schemaRegistryHost;
   private Map<String, String> props = null;
   private String username;
   private String password;
@@ -222,6 +223,7 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
 
     zookeeperHosts = props.get("zookeeper_hosts");
     kafkaHost = props.get("kafka_host") + ":" + props.get("kafka_port");
+    schemaRegistryHost = props.get("schema_registry_host") + ":" + props.get("schema_registry_port");
 
     username = props.get("username");
 
@@ -500,6 +502,12 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
       }
 
       @Override
+      public String getSchemaRegistryHost()
+      {
+        return schemaRegistryHost;
+      }
+
+      @Override
       public Map<String, String> getProperties()
       {
         return props;
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
index 8924cfc..bb742a9 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
@@ -32,6 +32,13 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+/**
+ * The values here should be kept in sync with the values used in the docker-compose files used to bring up the
+ * integration-test clusters.
+ *
+ * integration-tests/docker/docker-compose.base.yml defines most of the hostnames, ports, and addresses, but some
+ * might live in the overrides as well.
+ */
 public class DockerConfigProvider implements IntegrationTestingConfigProvider
 {
   @JsonProperty
@@ -318,6 +325,18 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
       }
 
       @Override
+      public String getSchemaRegistryHost()
+      {
+        return dockerIp + ":8085";
+      }
+
+      @Override
+      public String getSchemaRegistryInternalHost()
+      {
+        return "schema-registry:8085";
+      }
+
+      @Override
       public String getProperty(String prop)
       {
         return properties.get(prop);
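
The asymmetry between the two accessors is deliberate: the test JVM runs outside the compose network and must reach the registry through the port published on the docker host, while ingestion specs are rendered for Druid services inside the network, where only the container hostname resolves. A hedged sketch of the intended split (helper names are illustrative, not part of the commit):

    import org.apache.druid.testing.IntegrationTestingConfig;

    public class RegistryEndpointsSketch
    {
      // Used by the test JVM, e.g. AvroSchemaRegistryEventSerializer below.
      public static String forTestJvm(IntegrationTestingConfig config)
      {
        return "http://" + config.getSchemaRegistryHost();         // http://<dockerIp>:8085
      }

      // Substituted into %%SCHEMA_REGISTRY_HOST%% in the ingestion spec.
      public static String forIngestionSpec(IntegrationTestingConfig config)
      {
        return "http://" + config.getSchemaRegistryInternalHost(); // http://schema-registry:8085
      }
    }
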
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java b/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java
index 3c1951c..b65507e 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java
@@ -164,6 +164,13 @@ public interface IntegrationTestingConfig
 
   String getStreamEndpoint();
 
+  String getSchemaRegistryHost();
+
+  default String getSchemaRegistryInternalHost()
+  {
+    return getSchemaRegistryHost();
+  }
+
   boolean isDocker();
 
   @Nullable
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java
index 284fd09..7457f84 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java
@@ -40,7 +40,7 @@ public class AvroEventSerializer implements EventSerializer
 {
   public static final String TYPE = "avro";
 
-  private static final Schema SCHEMA = SchemaBuilder
+  static final Schema SCHEMA = SchemaBuilder
       .record("wikipedia")
       .namespace("org.apache.druid")
       .fields()
@@ -62,12 +62,12 @@ public class AvroEventSerializer implements EventSerializer
       .requiredInt("delta")
       .endRecord();
 
-  private final DatumWriter<Object> writer = new GenericDatumWriter<>(SCHEMA);
+  protected final DatumWriter<Object> writer = new GenericDatumWriter<>(SCHEMA);
 
   @Override
   public byte[] serialize(List<Pair<String, Object>> event) throws IOException
   {
-    final WikipediaRecord record = new WikipediaRecord();
+    final WikipediaRecord record = new WikipediaRecord(SCHEMA);
     event.forEach(pair -> record.put(pair.lhs, pair.rhs));
     final ByteArrayOutputStream out = new ByteArrayOutputStream();
     final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
@@ -82,12 +82,18 @@ public class AvroEventSerializer implements EventSerializer
   {
   }
 
-  private static class WikipediaRecord implements GenericRecord
+  static class WikipediaRecord implements GenericRecord
   {
     private final Map<String, Object> event = new HashMap<>();
     private final BiMap<Integer, String> indexes = HashBiMap.create(SCHEMA.getFields().size());
 
     private int nextIndex = 0;
+    private final Schema schema;
+
+    public WikipediaRecord(Schema schema)
+    {
+      this.schema = schema;
+    }
 
     @Override
     public void put(String key, Object v)
@@ -125,7 +131,7 @@ public class AvroEventSerializer implements EventSerializer
     @Override
     public Schema getSchema()
     {
-      return SCHEMA;
+      return schema;
     }
   }
 }
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroSchemaRegistryEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroSchemaRegistryEventSerializer.java
new file mode 100644
index 0000000..dd1e82f
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroSchemaRegistryEventSerializer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.google.common.collect.ImmutableMap;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testing.IntegrationTestingConfig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class AvroSchemaRegistryEventSerializer extends AvroEventSerializer
+{
+  private static final int MAX_INITIALIZE_RETRIES = 10;
+  public static final String TYPE = "avro-schema-registry";
+
+  private final IntegrationTestingConfig config;
+  private final CachedSchemaRegistryClient client;
+  private int schemaId = -1;
+
+  private Schema fromRegistry;
+
+  @JsonCreator
+  public AvroSchemaRegistryEventSerializer(
+      @JacksonInject IntegrationTestingConfig config
+  )
+  {
+    this.config = config;
+    this.client = new CachedSchemaRegistryClient(
+        StringUtils.format("http://%s", config.getSchemaRegistryHost()),
+        Integer.MAX_VALUE,
+        ImmutableMap.of(
+            "basic.auth.credentials.source", "USER_INFO",
+            "basic.auth.user.info", "druid:diurd"
+        ),
+        ImmutableMap.of()
+    );
+
+  }
+
+  @Override
+  public void initialize(String topic)
+  {
+    try {
+      RetryUtils.retry(
+          () -> {
+            schemaId = client.register(topic, AvroEventSerializer.SCHEMA);
+            fromRegistry = client.getById(schemaId);
+            return 0;
+          },
+          (e) -> true,
+          MAX_INITIALIZE_RETRIES
+      );
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public byte[] serialize(List<Pair<String, Object>> event) throws IOException
+  {
+    final WikipediaRecord record = new WikipediaRecord(fromRegistry);
+    event.forEach(pair -> record.put(pair.lhs, pair.rhs));
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    out.write(0x0);
+    out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
+    BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
+    DatumWriter<Object> writer = new GenericDatumWriter<>(fromRegistry);
+    writer.write(record, encoder);
+    encoder.flush();
+    byte[] bytes = out.toByteArray();
+    out.close();
+    return bytes;
+  }
+}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
index 014d8c8..cad5acf 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
@@ -42,9 +42,15 @@ import java.util.List;
     @Type(name = JsonEventSerializer.TYPE, value = JsonEventSerializer.class),
     @Type(name = CsvEventSerializer.TYPE, value = CsvEventSerializer.class),
     @Type(name = DelimitedEventSerializer.TYPE, value = DelimitedEventSerializer.class),
-    @Type(name = AvroEventSerializer.TYPE, value = AvroEventSerializer.class)
+    @Type(name = AvroEventSerializer.TYPE, value = AvroEventSerializer.class),
+    @Type(name = AvroSchemaRegistryEventSerializer.TYPE, value = AvroSchemaRegistryEventSerializer.class)
 })
 public interface EventSerializer extends Closeable
 {
+  default void initialize(String topic)
+  {
+
+  }
+
   byte[] serialize(List<Pair<String, Object>> event) throws IOException;
 }
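
Registering the new subtype means a test only has to name it in JSON, and the no-op initialize(topic) default lets existing serializers ignore the hook while the registry-backed one uses it to register the schema before the first event is produced (SyntheticStreamGenerator below calls it once per run). A hedged wiring sketch, assuming a Jackson mapper configured with an InjectableValues entry for IntegrationTestingConfig:

    import com.fasterxml.jackson.databind.InjectableValues;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.common.collect.ImmutableList;
    import org.apache.druid.java.util.common.Pair;
    import org.apache.druid.testing.IntegrationTestingConfig;
    import org.apache.druid.testing.utils.EventSerializer;

    public class SerializerWiringSketch
    {
      public static byte[] oneEvent(ObjectMapper mapper, IntegrationTestingConfig config) throws Exception
      {
        mapper.setInjectableValues(
            new InjectableValues.Std().addValue(IntegrationTestingConfig.class, config)
        );
        // "type" selects AvroSchemaRegistryEventSerializer via the subtype table above.
        EventSerializer serializer =
            mapper.readValue("{\"type\": \"avro-schema-registry\"}", EventSerializer.class);
        serializer.initialize("wikipedia_stream");  // registers the schema, caches its id
        return serializer.serialize(ImmutableList.of(Pair.of("page", (Object) "Druid")));
      }
    }
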
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
index cf69ccd..bd9c1b8 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
@@ -61,6 +61,7 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
       DateTime overrrideFirstEventTime
   )
   {
+    serializer.initialize(streamTopic);
     // The idea here is that we will send [eventsPerSecond] events that will either use [nowFlooredToSecond]
     // or the [overrrideFirstEventTime] as the primary timestamp.
     // Having a fixed number of events that use the same timestamp will help in allowing us to determine if any events
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
index 204b6ef..5ea11e6 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
@@ -83,6 +83,7 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamInd
             "%%TOPIC_VALUE%%",
             streamName
         );
+
         if (AbstractStreamIndexingTest.INPUT_FORMAT.equals(parserType)) {
           spec = StringUtils.replace(
               spec,
@@ -116,6 +117,13 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamInd
             "%%STREAM_PROPERTIES_KEY%%",
             "consumerProperties"
         );
+
+        spec = StringUtils.replace(
+            spec,
+            "%%SCHEMA_REGISTRY_HOST%%",
+            StringUtils.format("http://%s", config.getSchemaRegistryInternalHost())
+        );
+
         return StringUtils.replace(
             spec,
             "%%STREAM_PROPERTIES_VALUE%%",
diff --git a/integration-tests/src/test/resources/stream/data/avro_schema_registry/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/avro_schema_registry/parser/input_row_parser.json
new file mode 100644
index 0000000..c48871a
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/avro_schema_registry/parser/input_row_parser.json
@@ -0,0 +1,21 @@
+{
+  "type": "avro_stream",
+  "avroBytesDecoder" : {
+    "type": "schema_registry",
+    "url": "%%SCHEMA_REGISTRY_HOST%%",
+    "config": {
+      "basic.auth.credentials.source": "USER_INFO",
+      "basic.auth.user.info": "druid:diurd"
+    }
+  },
+  "parseSpec": {
+    "format": "avro",
+    "timestampSpec": {
+      "column": "timestamp",
+      "format": "auto"
+    },
+    "dimensionsSpec": {
+      "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"]
+    }
+  }
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/avro_schema_registry/serializer/serializer.json b/integration-tests/src/test/resources/stream/data/avro_schema_registry/serializer/serializer.json
new file mode 100644
index 0000000..5251ade
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/avro_schema_registry/serializer/serializer.json
@@ -0,0 +1,3 @@
+{
+  "type": "avro-schema-registry"
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 157aec2..10ddf02 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1949,6 +1949,7 @@
                                 <exclude>**/*.json</exclude>
                                 <exclude>**/*.parq</exclude>
                                 <exclude>**/*.parquet</exclude>
+                                <exclude>**/docker/schema-registry/*</exclude>
                                 <exclude>LICENSE</exclude>
                                 <exclude>LICENSE.BINARY</exclude>
                                 <exclude>NOTICE</exclude>

