Posted to commits@druid.apache.org by su...@apache.org on 2021/03/08 16:12:46 UTC
[druid] branch master updated: add avro + kafka + schema registry integration test (#10929)
This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 96889cd add avro + kafka + schema registry integration test (#10929)
96889cd is described below
commit 96889cdebce68cee839b350205b60823227e142b
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Mon Mar 8 08:12:12 2021 -0800
add avro + kafka + schema registry integration test (#10929)
* add avro + schema registry integration test
* style
* retry init
* maybe this
* oops heh
* this will fix it
* review stuffs
* fix comment
---
.../avro/SchemaRegistryBasedAvroBytesDecoder.java | 31 ++++--
.../SchemaRegistryBasedAvroBytesDecoderTest.java | 46 ++++++---
integration-tests/docker/docker-compose.base.yml | 108 +++++++++++++--------
.../docker-compose.schema-registry-indexer.yml | 29 ++++++
.../docker/docker-compose.schema-registry.yml | 29 ++++++
.../docker/schema-registry/jaas_config.file | 5 +
.../docker/schema-registry/password-file | 1 +
integration-tests/pom.xml | 42 ++++++++
integration-tests/script/docker_compose_args.sh | 15 ++-
.../druid/testing/ConfigFileConfigProvider.java | 8 ++
.../apache/druid/testing/DockerConfigProvider.java | 19 ++++
.../druid/testing/IntegrationTestingConfig.java | 7 ++
.../druid/testing/utils/AvroEventSerializer.java | 16 ++-
.../utils/AvroSchemaRegistryEventSerializer.java | 106 ++++++++++++++++++++
.../druid/testing/utils/EventSerializer.java | 8 +-
.../testing/utils/SyntheticStreamGenerator.java | 1 +
.../indexer/AbstractKafkaIndexingServiceTest.java | 8 ++
.../parser/input_row_parser.json | 21 ++++
.../serializer/serializer.json | 3 +
pom.xml | 1 +
20 files changed, 434 insertions(+), 70 deletions(-)
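A note for readers skimming the diff below: both the decoder change and the new test serializer assume the Confluent wire format, in which every Kafka message carries a single magic byte 0x0, a 4-byte big-endian schema registry id, and then the Avro-encoded payload. A minimal sketch of that framing (the id 1234 and the payload bytes are illustrative, borrowed from the tests):

    import java.nio.ByteBuffer;

    public class WireFormatSketch
    {
      public static void main(String[] args)
      {
        byte[] avroPayload = new byte[] {0x02, 0x04}; // stand-in Avro binary payload
        ByteBuffer message = ByteBuffer
            .allocate(avroPayload.length + 5)
            .put((byte) 0)      // magic byte
            .putInt(1234)       // schema registry id, big-endian
            .put(avroPayload);  // Avro payload
        message.rewind();       // leave the buffer ready for the decoder
      }
    }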
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
index 42765fa..123f8fa 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
@@ -26,14 +26,17 @@ import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
+import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.parsers.ParseException;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
@@ -70,18 +73,32 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
@Override
public GenericRecord parse(ByteBuffer bytes)
{
+ int length = bytes.limit() - 1 - 4;
+ if (length < 0) {
+ throw new ParseException("Failed to decode avro message, not enough bytes to decode (%s)", bytes.limit());
+ }
+
+ bytes.get(); // ignore first \0 byte
+ int id = bytes.getInt(); // extract schema registry id
+ int offset = bytes.position() + bytes.arrayOffset();
+ Schema schema;
+
try {
- bytes.get(); // ignore first \0 byte
- int id = bytes.getInt(); // extract schema registry id
- int length = bytes.limit() - 1 - 4;
- int offset = bytes.position() + bytes.arrayOffset();
ParsedSchema parsedSchema = registry.getSchemaById(id);
- Schema schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null;
- DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+ schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null;
+ }
+ catch (IOException | RestClientException ex) {
+ throw new RE(ex, "Failed to get Avro schema: %s", id);
+ }
+ if (schema == null) {
+ throw new RE("Failed to find Avro schema: %s", id);
+ }
+ DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+ try {
return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null));
}
catch (Exception e) {
- throw new ParseException(e, "Fail to decode avro message!");
+ throw new ParseException(e, "Fail to decode Avro message for schema: %s!", id);
}
}
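The reshuffle above separates the failure modes: a registry lookup error or a schema that is not Avro now surfaces as an RE naming the schema id, and ParseException is reserved for a buffer too short to hold the 5-byte header or a payload that will not decode. As a hedged fragment (not a full class; registry is any SchemaRegistryClient and message a framed buffer as sketched earlier):

    GenericRecord record = new SchemaRegistryBasedAvroBytesDecoder(registry).parse(message);
    // throws RE             -> registry call failed, or id resolved to a non-Avro schema
    // throws ParseException -> fewer than 5 bytes, or the Avro payload is corrupt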
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
index 55c7e6b..3eb6439 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.data.input.avro;
import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;
@@ -29,6 +30,7 @@ import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.druid.data.input.AvroStreamInputRowParserTest;
import org.apache.druid.data.input.SomeAvroDatum;
+import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.junit.Assert;
import org.junit.Before;
@@ -96,40 +98,60 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
public void testParse() throws Exception
{
// Given
- Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
+ Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234)))
+ .thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
Schema schema = SomeAvroDatum.getClassSchema();
byte[] bytes = getAvroDatum(schema, someAvroDatum);
ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put(bytes);
bb.rewind();
// When
- GenericRecord actual = new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
- // Then
- Assert.assertEquals(someAvroDatum.get("id"), actual.get("id"));
+ new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
+ }
+
+ @Test(expected = ParseException.class)
+ public void testParseCorruptedNotEnoughBytesToEvenGetSchemaInfo()
+ {
+ // Given
+ ByteBuffer bb = ByteBuffer.allocate(2).put((byte) 0).put(1, (byte) 1);
+ bb.rewind();
+ // When
+ new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
}
@Test(expected = ParseException.class)
- public void testParseCorrupted() throws Exception
+ public void testParseCorruptedPartial() throws Exception
{
// Given
- Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
+ Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234)))
+ .thenReturn(new AvroSchema(SomeAvroDatum.getClassSchema()));
GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
Schema schema = SomeAvroDatum.getClassSchema();
byte[] bytes = getAvroDatum(schema, someAvroDatum);
- ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put((bytes), 5, 10);
+ ByteBuffer bb = ByteBuffer.allocate(4 + 5).put((byte) 0).putInt(1234).put(bytes, 5, 4);
+ bb.rewind();
// When
new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
}
- @Test(expected = ParseException.class)
+ @Test(expected = RE.class)
+ public void testParseWrongSchemaType() throws Exception
+ {
+ // Given
+ Mockito.when(registry.getSchemaById(ArgumentMatchers.eq(1234))).thenReturn(Mockito.mock(ParsedSchema.class));
+ ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234);
+ bb.rewind();
+ // When
+ new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
+ }
+
+ @Test(expected = RE.class)
public void testParseWrongId() throws Exception
{
// Given
Mockito.when(registry.getSchemaById(ArgumentMatchers.anyInt())).thenThrow(new IOException("no pasaran"));
- GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
- Schema schema = SomeAvroDatum.getClassSchema();
- byte[] bytes = getAvroDatum(schema, someAvroDatum);
- ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put(bytes);
+ ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234);
+ bb.rewind();
// When
new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
}
diff --git a/integration-tests/docker/docker-compose.base.yml b/integration-tests/docker/docker-compose.base.yml
index 66b8526..46dda1d 100644
--- a/integration-tests/docker/docker-compose.base.yml
+++ b/integration-tests/docker/docker-compose.base.yml
@@ -34,7 +34,7 @@ networks:
- subnet: 172.172.172.0/24
services:
-### supporting infra:
+### always there supporting infra:
druid-zookeeper-kafka:
image: druid/cluster
container_name: druid-zookeeper-kafka
@@ -71,45 +71,6 @@ services:
env_file:
- ./environment-configs/common
- druid-it-hadoop:
- image: druid-it/hadoop:2.8.5
- container_name: druid-it-hadoop
- ports:
- - 2049:2049
- - 2122:2122
- - 8020:8020
- - 8021:8021
- - 8030:8030
- - 8031:8031
- - 8032:8032
- - 8033:8033
- - 8040:8040
- - 8042:8042
- - 8088:8088
- - 8443:8443
- - 9000:9000
- - 10020:10020
- - 19888:19888
- - 34455:34455
- - 50010:50010
- - 50020:50020
- - 50030:50030
- - 50060:50060
- - 50070:50070
- - 50075:50075
- - 50090:50090
- - 51111:51111
- networks:
- druid-it-net:
- ipv4_address: 172.172.172.101
- privileged: true
- volumes:
- - ${HOME}/shared:/shared
- - ./../src/test/resources:/resources
- hostname: "druid-it-hadoop"
- command: "bash -c 'echo Start druid-it-hadoop container... && \
- /etc/bootstrap.sh && \
- tail -f /dev/null'"
### overlords
druid-overlord:
@@ -357,12 +318,54 @@ services:
- ./environment-configs/common
- ./environment-configs/router-custom-check-tls
+### optional supporting infra
+ druid-it-hadoop:
+ image: druid-it/hadoop:2.8.5
+ container_name: druid-it-hadoop
+ ports:
+ - 2049:2049
+ - 2122:2122
+ - 8020:8020
+ - 8021:8021
+ - 8030:8030
+ - 8031:8031
+ - 8032:8032
+ - 8033:8033
+ - 8040:8040
+ - 8042:8042
+ - 8088:8088
+ - 8443:8443
+ - 9000:9000
+ - 10020:10020
+ - 19888:19888
+ - 34455:34455
+ - 50010:50010
+ - 50020:50020
+ - 50030:50030
+ - 50060:50060
+ - 50070:50070
+ - 50075:50075
+ - 50090:50090
+ - 51111:51111
+ networks:
+ druid-it-net:
+ ipv4_address: 172.172.172.101
+ privileged: true
+ volumes:
+ - ${HOME}/shared:/shared
+ - ./../src/test/resources:/resources
+ hostname: "druid-it-hadoop"
+ command: "bash -c 'echo Start druid-it-hadoop container... && \
+ /etc/bootstrap.sh && \
+ tail -f /dev/null'"
+
+
druid-openldap:
image: osixia/openldap:1.4.0
container_name: druid-openldap
networks:
druid-it-net:
- ipv4_address: 172.172.172.74
+ ipv4_address: 172.172.172.102
ports:
- 8389:389
- 8636:636
@@ -373,3 +376,26 @@ services:
env_file:
- ./environment-configs/common
command: --copy-service
+
+
+ schema-registry:
+ image: confluentinc/cp-schema-registry:5.5.1
+ container_name: schema-registry
+ ports:
+ - 8085:8085
+ networks:
+ druid-it-net:
+ ipv4_address: 172.172.172.103
+ volumes:
+ - ${HOME}/shared:/shared
+ - ./schema-registry/jaas_config.file:/usr/lib/druid/conf/jaas_config.file
+ - ./schema-registry/password-file:/usr/lib/druid/conf/password-file
+ privileged: true
+ environment:
+ SCHEMA_REGISTRY_HOST_NAME: schema-registry
+ SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8085"
+ SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: druid-zookeeper-kafka:9092
+ SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
+ SCHEMA_REGISTRY_AUTHENTICATION_REALM: druid
+ SCHEMA_REGISTRY_AUTHENTICATION_ROLES: users
+ SCHEMA_REGISTRY_OPTS: -Djava.security.auth.login.config=/usr/lib/druid/conf/jaas_config.file
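The BASIC authentication settings above pair with the jaas_config.file and password-file added below. A sketch of how a client outside the compose network would authenticate against this container, mirroring the constructor call in AvroSchemaRegistryEventSerializer later in this diff (localhost:8085 assumes the published port mapping):

    import com.google.common.collect.ImmutableMap;
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

    CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(
        "http://localhost:8085",
        Integer.MAX_VALUE,                        // identity map capacity
        ImmutableMap.of(
            "basic.auth.credentials.source", "USER_INFO",
            "basic.auth.user.info", "druid:diurd" // credentials from password-file below
        ),
        ImmutableMap.of()
    );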
diff --git a/integration-tests/docker/docker-compose.schema-registry-indexer.yml b/integration-tests/docker/docker-compose.schema-registry-indexer.yml
new file mode 100644
index 0000000..71c3814
--- /dev/null
+++ b/integration-tests/docker/docker-compose.schema-registry-indexer.yml
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "2.2"
+services:
+ schema-registry:
+ extends:
+ file: docker-compose.base.yml
+ service: schema-registry
+ depends_on:
+ - druid-zookeeper-kafka
+ links:
+ - druid-zookeeper-kafka:druid-zookeeper-kafka
+ - druid-coordinator:druid-coordinator
+ - druid-broker:druid-broker
+ - druid-historical:druid-historical
+ - druid-indexer:druid-indexer
diff --git a/integration-tests/docker/docker-compose.schema-registry.yml b/integration-tests/docker/docker-compose.schema-registry.yml
new file mode 100644
index 0000000..5611e2a
--- /dev/null
+++ b/integration-tests/docker/docker-compose.schema-registry.yml
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "2.2"
+services:
+ schema-registry:
+ extends:
+ file: docker-compose.base.yml
+ service: schema-registry
+ depends_on:
+ - druid-zookeeper-kafka
+ links:
+ - druid-zookeeper-kafka:druid-zookeeper-kafka
+ - druid-coordinator:druid-coordinator
+ - druid-broker:druid-broker
+ - druid-middlemanager:druid-middlemanager
+ - druid-historical:druid-historical
diff --git a/integration-tests/docker/schema-registry/jaas_config.file b/integration-tests/docker/schema-registry/jaas_config.file
new file mode 100644
index 0000000..dc48bed
--- /dev/null
+++ b/integration-tests/docker/schema-registry/jaas_config.file
@@ -0,0 +1,5 @@
+druid {
+ org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
+ file="/usr/lib/druid/conf/password-file"
+ debug="true";
+};
\ No newline at end of file
diff --git a/integration-tests/docker/schema-registry/password-file b/integration-tests/docker/schema-registry/password-file
new file mode 100644
index 0000000..c850844
--- /dev/null
+++ b/integration-tests/docker/schema-registry/password-file
@@ -0,0 +1 @@
+druid: diurd,users
\ No newline at end of file
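The password-file follows Jetty's PropertyFileLoginModule convention, "username: password[,role ...]", so this one line defines the user druid with password diurd in role users -- the role named by SCHEMA_REGISTRY_AUTHENTICATION_ROLES in the compose file above.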
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index fc3b7b5..2fb9adc 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -31,6 +31,13 @@
<version>0.22.0-SNAPSHOT</version>
</parent>
+ <repositories>
+ <repository>
+ <id>confluent</id>
+ <url>https://packages.confluent.io/maven/</url>
+ </repository>
+ </repositories>
+
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
@@ -320,6 +327,41 @@
<artifactId>guice-servlet</artifactId>
<version>${guice.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-schema-registry-client</artifactId>
+ <version>5.5.1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.ws.rs</groupId>
+ <artifactId>javax.ws.rs-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.ws.rs</groupId>
+ <artifactId>javax.ws.rs-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.ws.rs</groupId>
+ <artifactId>jsr311-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>jakarta.ws.rs</groupId>
+ <artifactId>jakarta.ws.rs-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<!-- Tests -->
<dependency>
diff --git a/integration-tests/script/docker_compose_args.sh b/integration-tests/script/docker_compose_args.sh
index e43f88d..ea61e88 100644
--- a/integration-tests/script/docker_compose_args.sh
+++ b/integration-tests/script/docker_compose_args.sh
@@ -28,7 +28,6 @@ getComposeArgs()
echo "DRUID_INTEGRATION_TEST_INDEXER must be 'indexer' or 'middleManager' (is '$DRUID_INTEGRATION_TEST_INDEXER')"
exit 1
fi
-
if [ "$DRUID_INTEGRATION_TEST_INDEXER" = "indexer" ]
then
# Sanity check: cannot combine CliIndexer tests with security, query-retry tests
@@ -36,10 +35,14 @@ getComposeArgs()
then
echo "Cannot run test group '$DRUID_INTEGRATION_TEST_GROUP' with CliIndexer"
exit 1
+ elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "kafka-data-format" ]
+ then
+ # Replace MiddleManager with Indexer + schema registry container
+ echo "-f ${DOCKERDIR}/docker-compose.cli-indexer.yml -f ${DOCKERDIR}/docker-compose.schema-registry-indexer.yml"
+ else
+ # Replace MiddleManager with Indexer
+ echo "-f ${DOCKERDIR}/docker-compose.cli-indexer.yml"
fi
-
- # Replace MiddleManager with Indexer
- echo "-f ${DOCKERDIR}/docker-compose.cli-indexer.yml"
elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ]
then
# default + additional druid router (custom-check-tls, permissive-tls, no-client-auth-tls)
@@ -57,6 +60,10 @@ getComposeArgs()
then
# the 'high availability' test cluster with multiple coordinators and overlords
echo "-f ${DOCKERDIR}/docker-compose.high-availability.yml"
+ elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "kafka-data-format" ]
+ then
+ # default + schema registry container
+ echo "-f ${DOCKERDIR}/docker-compose.yml -f ${DOCKERDIR}/docker-compose.schema-registry.yml"
else
# default
echo "-f ${DOCKERDIR}/docker-compose.yml"
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java
index 162eda9..7cd0387 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java
@@ -63,6 +63,7 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
private String middleManagerHost;
private String zookeeperHosts; // comma-separated list of host:port
private String kafkaHost;
+ private String schemaRegistryHost;
private Map<String, String> props = null;
private String username;
private String password;
@@ -222,6 +223,7 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
zookeeperHosts = props.get("zookeeper_hosts");
kafkaHost = props.get("kafka_host") + ":" + props.get("kafka_port");
+ schemaRegistryHost = props.get("schema_registry_host") + ":" + props.get("schema_registry_port");
username = props.get("username");
@@ -500,6 +502,12 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
}
@Override
+ public String getSchemaRegistryHost()
+ {
+ return schemaRegistryHost;
+ }
+
+ @Override
public Map<String, String> getProperties()
{
return props;
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
index 8924cfc..bb742a9 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
@@ -32,6 +32,13 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+/**
+ * The values here should be kept in sync with the values used in the docker-compose files used to bring up the
+ * integration-test clusters.
+ *
+ * integration-tests/docker/docker-compose.base.yml defines most of the hostnames, ports, and addresses, but some
+ * might live in the overrides as well.
+ */
public class DockerConfigProvider implements IntegrationTestingConfigProvider
{
@JsonProperty
@@ -318,6 +325,18 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
}
@Override
+ public String getSchemaRegistryHost()
+ {
+ return dockerIp + ":8085";
+ }
+
+ @Override
+ public String getSchemaRegistryInternalHost()
+ {
+ return "schema-registry:8085";
+ }
+
+ @Override
public String getProperty(String prop)
{
return properties.get(prop);
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java b/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java
index 3c1951c..b65507e 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java
@@ -164,6 +164,13 @@ public interface IntegrationTestingConfig
String getStreamEndpoint();
+ String getSchemaRegistryHost();
+
+ default String getSchemaRegistryInternalHost()
+ {
+ return getSchemaRegistryHost();
+ }
+
boolean isDocker();
@Nullable
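The two accessors are deliberately distinct: getSchemaRegistryHost() is the address the test harness reaches from outside Docker (the mapped dockerIp:8085 in DockerConfigProvider), while getSchemaRegistryInternalHost() is what Druid services resolve on the compose network (schema-registry:8085). The default method keeps non-Docker configs such as ConfigFileConfigProvider working with a single host.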
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java
index 284fd09..7457f84 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java
@@ -40,7 +40,7 @@ public class AvroEventSerializer implements EventSerializer
{
public static final String TYPE = "avro";
- private static final Schema SCHEMA = SchemaBuilder
+ static final Schema SCHEMA = SchemaBuilder
.record("wikipedia")
.namespace("org.apache.druid")
.fields()
@@ -62,12 +62,12 @@ public class AvroEventSerializer implements EventSerializer
.requiredInt("delta")
.endRecord();
- private final DatumWriter<Object> writer = new GenericDatumWriter<>(SCHEMA);
+ protected final DatumWriter<Object> writer = new GenericDatumWriter<>(SCHEMA);
@Override
public byte[] serialize(List<Pair<String, Object>> event) throws IOException
{
- final WikipediaRecord record = new WikipediaRecord();
+ final WikipediaRecord record = new WikipediaRecord(SCHEMA);
event.forEach(pair -> record.put(pair.lhs, pair.rhs));
final ByteArrayOutputStream out = new ByteArrayOutputStream();
final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
@@ -82,12 +82,18 @@ public class AvroEventSerializer implements EventSerializer
{
}
- private static class WikipediaRecord implements GenericRecord
+ static class WikipediaRecord implements GenericRecord
{
private final Map<String, Object> event = new HashMap<>();
private final BiMap<Integer, String> indexes = HashBiMap.create(SCHEMA.getFields().size());
private int nextIndex = 0;
+ private final Schema schema;
+
+ public WikipediaRecord(Schema schema)
+ {
+ this.schema = schema;
+ }
@Override
public void put(String key, Object v)
@@ -125,7 +131,7 @@ public class AvroEventSerializer implements EventSerializer
@Override
public Schema getSchema()
{
- return SCHEMA;
+ return schema;
}
}
}
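The visibility changes here (SCHEMA and WikipediaRecord made package-visible, writer made protected, and the record now parameterized with a schema) exist so the registry-backed subclass in the next file can write records against the schema exactly as the registry returns it, rather than against the locally built constant.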
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroSchemaRegistryEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroSchemaRegistryEventSerializer.java
new file mode 100644
index 0000000..dd1e82f
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroSchemaRegistryEventSerializer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.google.common.collect.ImmutableMap;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testing.IntegrationTestingConfig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class AvroSchemaRegistryEventSerializer extends AvroEventSerializer
+{
+ private static final int MAX_INITIALIZE_RETRIES = 10;
+ public static final String TYPE = "avro-schema-registry";
+
+ private final IntegrationTestingConfig config;
+ private final CachedSchemaRegistryClient client;
+ private int schemaId = -1;
+
+ private Schema fromRegistry;
+
+ @JsonCreator
+ public AvroSchemaRegistryEventSerializer(
+ @JacksonInject IntegrationTestingConfig config
+ )
+ {
+ this.config = config;
+ this.client = new CachedSchemaRegistryClient(
+ StringUtils.format("http://%s", config.getSchemaRegistryHost()),
+ Integer.MAX_VALUE,
+ ImmutableMap.of(
+ "basic.auth.credentials.source", "USER_INFO",
+ "basic.auth.user.info", "druid:diurd"
+ ),
+ ImmutableMap.of()
+ );
+
+ }
+
+ @Override
+ public void initialize(String topic)
+ {
+ try {
+ RetryUtils.retry(
+ () -> {
+ schemaId = client.register(topic, AvroEventSerializer.SCHEMA);
+ fromRegistry = client.getById(schemaId);
+ return 0;
+ },
+ (e) -> true,
+ MAX_INITIALIZE_RETRIES
+ );
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public byte[] serialize(List<Pair<String, Object>> event) throws IOException
+ {
+ final WikipediaRecord record = new WikipediaRecord(fromRegistry);
+ event.forEach(pair -> record.put(pair.lhs, pair.rhs));
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ out.write(0x0);
+ out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
+ BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
+ DatumWriter<Object> writer = new GenericDatumWriter<>(fromRegistry);
+ writer.write(record, encoder);
+ encoder.flush();
+ byte[] bytes = out.toByteArray();
+ out.close();
+ return bytes;
+ }
+}
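Note the symmetry: serialize() emits the same 5-byte header -- the 0x0 magic byte followed by the registered schema id -- that SchemaRegistryBasedAvroBytesDecoder strips off, so the integration test exercises both halves of the wire format against a live registry.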
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
index 014d8c8..cad5acf 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
@@ -42,9 +42,15 @@ import java.util.List;
@Type(name = JsonEventSerializer.TYPE, value = JsonEventSerializer.class),
@Type(name = CsvEventSerializer.TYPE, value = CsvEventSerializer.class),
@Type(name = DelimitedEventSerializer.TYPE, value = DelimitedEventSerializer.class),
- @Type(name = AvroEventSerializer.TYPE, value = AvroEventSerializer.class)
+ @Type(name = AvroEventSerializer.TYPE, value = AvroEventSerializer.class),
+ @Type(name = AvroSchemaRegistryEventSerializer.TYPE, value = AvroSchemaRegistryEventSerializer.class)
})
public interface EventSerializer extends Closeable
{
+ default void initialize(String topic)
+ {
+
+ }
+
byte[] serialize(List<Pair<String, Object>> event) throws IOException;
}
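initialize(String topic) defaults to a no-op so existing serializers are untouched; only the schema-registry variant overrides it (registering the schema with retries), and SyntheticStreamGenerator calls it once per topic before producing events, as the next hunk shows.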
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
index cf69ccd..bd9c1b8 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
@@ -61,6 +61,7 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
DateTime overrrideFirstEventTime
)
{
+ serializer.initialize(streamTopic);
// The idea here is that we will send [eventsPerSecond] events that will either use [nowFlooredToSecond]
// or the [overrrideFirstEventTime] as the primary timestamp.
// Having a fixed number of events that use the same timestamp will help in allowing us to determine if any events
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
index 204b6ef..5ea11e6 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
@@ -83,6 +83,7 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamInd
"%%TOPIC_VALUE%%",
streamName
);
+
if (AbstractStreamIndexingTest.INPUT_FORMAT.equals(parserType)) {
spec = StringUtils.replace(
spec,
@@ -116,6 +117,13 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamInd
"%%STREAM_PROPERTIES_KEY%%",
"consumerProperties"
);
+
+ spec = StringUtils.replace(
+ spec,
+ "%%SCHEMA_REGISTRY_HOST%%",
+ StringUtils.format("http://%s", config.getSchemaRegistryInternalHost())
+ );
+
return StringUtils.replace(
spec,
"%%STREAM_PROPERTIES_VALUE%%",
diff --git a/integration-tests/src/test/resources/stream/data/avro_schema_registry/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/avro_schema_registry/parser/input_row_parser.json
new file mode 100644
index 0000000..c48871a
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/avro_schema_registry/parser/input_row_parser.json
@@ -0,0 +1,21 @@
+{
+ "type": "avro_stream",
+ "avroBytesDecoder" : {
+ "type": "schema_registry",
+ "url": "%%SCHEMA_REGISTRY_HOST%%",
+ "config": {
+ "basic.auth.credentials.source": "USER_INFO",
+ "basic.auth.user.info": "druid:diurd"
+ }
+ },
+ "parseSpec": {
+ "format": "avro",
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "auto"
+ },
+ "dimensionsSpec": {
+ "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"]
+ }
+ }
+}
\ No newline at end of file
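The %%SCHEMA_REGISTRY_HOST%% placeholder in this parser spec is substituted by AbstractKafkaIndexingServiceTest (see the hunk above) with the registry's internal compose-network address, so tasks running inside Docker resolve the registry by its container hostname.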
diff --git a/integration-tests/src/test/resources/stream/data/avro_schema_registry/serializer/serializer.json b/integration-tests/src/test/resources/stream/data/avro_schema_registry/serializer/serializer.json
new file mode 100644
index 0000000..5251ade
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/avro_schema_registry/serializer/serializer.json
@@ -0,0 +1,3 @@
+{
+ "type": "avro-schema-registry"
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 157aec2..10ddf02 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1949,6 +1949,7 @@
<exclude>**/*.json</exclude>
<exclude>**/*.parq</exclude>
<exclude>**/*.parquet</exclude>
+ <exclude>**/docker/schema-registry/*</exclude>
<exclude>LICENSE</exclude>
<exclude>LICENSE.BINARY</exclude>
<exclude>NOTICE</exclude>