You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/05/22 01:56:56 UTC
[pulsar] branch master updated: [pulsar-broker]Support key value
schema compatibility checker (#4192)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a98c846 [pulsar-broker]Support key value schema compatibility checker (#4192)
a98c846 is described below
commit a98c846243760a780748d15209c12c5471f1ae77
Author: tuteng <eg...@gmail.com>
AuthorDate: Wed May 22 09:56:51 2019 +0800
[pulsar-broker]Support key value schema compatibility checker (#4192)
Motivation
The key/value types can be AVRO or JSON. Both AVRO and JSON support schema evolution. We should add a key/value schema compatibility checker to allow schema evolution on the broker side.
Modifications
Add KeyValueSchemaCompatibilityCheck class
Add Unit Test
Verifying this change
Unit Test Pass
---
.../schema/KeyValueSchemaCompatibilityCheck.java | 130 ++++++
.../service/schema/SchemaRegistryService.java | 2 +
.../KeyValueSchemaCompatibilityCheckTest.java | 500 +++++++++++++++++++++
.../integration/functions/PulsarFunctionsTest.java | 4 +-
.../integration/io/DebeziumMySqlSourceTester.java | 11 +-
5 files changed, 643 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java
new file mode 100644
index 0000000..f9c9fc2
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import com.google.gson.Gson;
+import org.apache.avro.SchemaParseException;
+import org.apache.avro.SchemaValidationException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * {@link SchemaCompatibilityCheck} for {@link SchemaType#KEY_VALUE} schemas.
+ *
+ * <p>The encoded key/value schema is split into a key schema and a value schema, and each half
+ * is handed to the checker registered for its own {@link SchemaType}, or to
+ * {@link SchemaCompatibilityCheck#DEFAULT} when none is registered.
+ */
+public class KeyValueSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
+
+    private static final String KEY_SCHEMA_TYPE = "key.schema.type";
+    private static final String VALUE_SCHEMA_TYPE = "value.schema.type";
+    private static final String KEY_SCHEMA_PROPERTIES = "key.schema.properties";
+    private static final String VALUE_SCHEMA_PROPERTIES = "value.schema.properties";
+
+    /** Shared JSON parser; Gson instances are thread-safe, so one is enough for every call. */
+    private static final Gson SCHEMA_GSON = new Gson();
+
+    private final Map<SchemaType, SchemaCompatibilityCheck> checkers;
+
+    /**
+     * @param checkers checkers by schema type, consulted for the key and value halves; the map
+     *                 is kept by reference, not copied
+     */
+    public KeyValueSchemaCompatibilityCheck(Map<SchemaType, SchemaCompatibilityCheck> checkers) {
+        this.checkers = checkers;
+    }
+
+    /**
+     * Decodes {@code [int keyLength][key bytes][int valueLength][value bytes]} into its two parts.
+     *
+     * @param bytes the encoded key/value schema
+     * @return the raw key schema as key and the raw value schema as value
+     */
+    private static KeyValue<byte[], byte[]> splitKeyValueSchema(byte[] bytes) {
+        ByteBuffer buffer = ByteBuffer.wrap(bytes);
+        byte[] keySchema = new byte[buffer.getInt()];
+        buffer.get(keySchema);
+        byte[] valueSchema = new byte[buffer.getInt()];
+        buffer.get(valueSchema);
+        return new KeyValue<>(keySchema, valueSchema);
+    }
+
+    /**
+     * Reads the schema type stored under {@code key}, defaulting to {@link SchemaType#BYTES}
+     * when the property is absent.
+     */
+    private static SchemaType fetchSchemaType(Map<String, String> properties, String key) {
+        String typeName = properties.get(key);
+        return typeName == null ? SchemaType.BYTES : SchemaType.valueOf(typeName);
+    }
+
+    /**
+     * Builds the {@link SchemaData} for one half of the pair. Its properties are parsed from the
+     * JSON stored under {@code key}; an absent entry yields an empty property map.
+     */
+    private static SchemaData fetchSchemaData(
+            byte[] schema, SchemaType schemaType, Map<String, String> properties, String key) {
+        String encodedProps = properties.get(key);
+        Map<String, String> props = Collections.emptyMap();
+        if (encodedProps != null) {
+            // Gson is given the raw Map class, so the String/String typing is asserted, not checked.
+            @SuppressWarnings("unchecked")
+            Map<String, String> parsed = SCHEMA_GSON.fromJson(encodedProps, Map.class);
+            props = parsed;
+        }
+        return SchemaData.builder().data(schema).type(schemaType).props(props).build();
+    }
+
+    /**
+     * Splits a key/value {@link SchemaData} into separate key and value {@link SchemaData}.
+     */
+    private static KeyValue<SchemaData, SchemaData> splitKeyValueSchemaData(SchemaData schemaData) {
+        KeyValue<byte[], byte[]> rawSchemas = splitKeyValueSchema(schemaData.getData());
+        Map<String, String> properties = schemaData.getProps();
+        if (properties == null) {
+            // No properties at all: behave as if every lookup missed instead of throwing an NPE.
+            properties = Collections.emptyMap();
+        }
+        SchemaData keySchemaData = fetchSchemaData(
+                rawSchemas.getKey(), fetchSchemaType(properties, KEY_SCHEMA_TYPE),
+                properties, KEY_SCHEMA_PROPERTIES);
+        SchemaData valueSchemaData = fetchSchemaData(
+                rawSchemas.getValue(), fetchSchemaType(properties, VALUE_SCHEMA_TYPE),
+                properties, VALUE_SCHEMA_PROPERTIES);
+        return new KeyValue<>(keySchemaData, valueSchemaData);
+    }
+
+    @Override
+    public SchemaType getSchemaType() {
+        return SchemaType.KEY_VALUE;
+    }
+
+    @Override
+    public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
+        return isCompatible(Collections.singletonList(from), to, strategy);
+    }
+
+    /**
+     * Checks {@code to} against every schema in {@code from}.
+     *
+     * @return {@code true} for {@link SchemaCompatibilityStrategy#ALWAYS_COMPATIBLE}; otherwise
+     *         {@code false} if any schema involved is not a key/value schema or if a key or value
+     *         type in {@code from} differs from the one in {@code to}; otherwise the conjunction
+     *         of the key checker's and the value checker's verdicts
+     */
+    @Override
+    public boolean isCompatible(Iterable<SchemaData> from, SchemaData to, SchemaCompatibilityStrategy strategy) {
+        if (strategy == SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE) {
+            return true;
+        }
+        if (to.getType() != SchemaType.KEY_VALUE) {
+            return false;
+        }
+        KeyValue<SchemaData, SchemaData> toKeyValue = splitKeyValueSchemaData(to);
+        SchemaType toKeyType = toKeyValue.getKey().getType();
+        SchemaType toValueType = toKeyValue.getValue().getType();
+
+        LinkedList<SchemaData> fromKeyList = new LinkedList<>();
+        LinkedList<SchemaData> fromValueList = new LinkedList<>();
+        for (SchemaData schemaData : from) {
+            if (schemaData.getType() != SchemaType.KEY_VALUE) {
+                return false;
+            }
+            KeyValue<SchemaData, SchemaData> fromKeyValue = splitKeyValueSchemaData(schemaData);
+            if (fromKeyValue.getKey().getType() != toKeyType
+                    || fromKeyValue.getValue().getType() != toValueType) {
+                return false;
+            }
+            // addFirst: the delegates receive the schemas in the reverse of the iteration order.
+            fromKeyList.addFirst(fromKeyValue.getKey());
+            fromValueList.addFirst(fromKeyValue.getValue());
+        }
+        SchemaCompatibilityCheck keyCheck = checkers.getOrDefault(toKeyType, SchemaCompatibilityCheck.DEFAULT);
+        SchemaCompatibilityCheck valueCheck = checkers.getOrDefault(toValueType, SchemaCompatibilityCheck.DEFAULT);
+        return keyCheck.isCompatible(fromKeyList, toKeyValue.getKey(), strategy)
+                && valueCheck.isCompatible(fromValueList, toKeyValue.getValue(), strategy);
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
index a74066b..baa57a4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
@@ -54,6 +54,8 @@ public interface SchemaRegistryService extends SchemaRegistry {
Map<SchemaType, SchemaCompatibilityCheck> checkers =
getCheckers(config.getSchemaRegistryCompatibilityCheckers());
+ checkers.put(SchemaType.KEY_VALUE, new KeyValueSchemaCompatibilityCheck(checkers));
+
schemaStorage.start();
return new SchemaRegistryServiceImpl(schemaStorage, checkers);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheckTest.java
new file mode 100644
index 0000000..0ba8a1c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheckTest.java
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import com.google.common.collect.Maps;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+public class KeyValueSchemaCompatibilityCheckTest {
+
+ private final Map<SchemaType, SchemaCompatibilityCheck> checkers = Maps.newHashMap();
+
+ /**
+ * Test POJO; the tests in this class pass it to {@code withPojo(...)} to build a schema.
+ * NOTE(review): Lombok's {@code @Data} presumably already covers {@code @ToString} and
+ * {@code @EqualsAndHashCode}, making them redundant here - confirm before removing.
+ */
+ @Data
+ @ToString
+ @EqualsAndHashCode
+ private static class Foo {
+ private String field1;
+ private String field2;
+ private int field3;
+ // Nested POJO field, typed with the second test class declared alongside this one.
+ private KeyValueSchemaCompatibilityCheckTest.Bar field4;
+ }
+
+ /**
+ * Second test POJO with a single boolean field; the tests in this class pass it to
+ * {@code withPojo(...)} to build a schema.
+ */
+ @Data
+ @ToString
+ @EqualsAndHashCode
+ private static class Bar {
+ private boolean field1;
+ }
+
+    /**
+     * Registers the AVRO and JSON checkers, then the key/value checker, which is constructed
+     * with the same map that it is registered in.
+     */
+    @BeforeClass
+    protected void setup() {
+        SchemaCompatibilityCheck avroCheck = new AvroSchemaCompatibilityCheck();
+        SchemaCompatibilityCheck jsonCheck = new JsonSchemaCompatibilityCheck();
+        checkers.put(SchemaType.AVRO, avroCheck);
+        checkers.put(SchemaType.JSON, jsonCheck);
+
+        SchemaCompatibilityCheck keyValueCheck = new KeyValueSchemaCompatibilityCheck(checkers);
+        checkers.put(SchemaType.KEY_VALUE, keyValueCheck);
+    }
+
+ @Test
+ public void testCheckKeyValueAvroCompatibilityFull() {
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+ properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
+ }
+
+ @Test
+ public void testCheckKeyValueAvroInCompatibilityFull() {
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+ properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
+ }
+
+ @Test
+ public void testCheckKeyValueAvroCompatibilityBackward() {
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+ properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
+ }
+
+ @Test
+ public void testCheckKeyValueAvroInCompatibilityBackward() {
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+ properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
+ }
+
+ @Test
+ public void testCheckKeyValueAvroCompatibilityForward() {
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+ properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
+ }
+
+ @Test
+ public void testCheckKeyValueAvroInCompatibilityForward() {
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+ properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
+ }
+
+ @Test
+ public void testCheckKeyValueJsonCompatibilityFull() {
+ JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+ properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
+ }
+
+ @Test
+ public void testCheckKeyValueJsonInCompatibilityFull() {
+ JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+ properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
+ }
+
+ @Test
+ public void testCheckKeyValueJsonCompatibilityBackward() {
+ JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+ properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
+ }
+
+ @Test
+ public void testCheckKeyValueJsonInCompatibilityBackWard() {
+ JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+ properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
+ }
+
+ @Test
+ public void testCheckKeyValueJsonCompatibilityForward() {
+ JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+ properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
+ }
+
+ @Test
+ public void testCheckKeyValueJsonInCompatibilityForward() {
+ JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+ properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
+ }
+
+ @Test
+ public void testCheckKeyAvroValueJsonCompatibilityFull() {
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+ properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
+ }
+
+ @Test
+ public void testCheckKeyAvroValueJsonInCompatibilityFull() {
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+ properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
+ }
+
+ @Test
+ public void testCheckKeyAvroValueJsonCompatibilityBackward() {
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+ properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
+ }
+
+ @Test
+ public void testCheckKeyAvroValueJsonInCompatibilityBackward() {
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+ properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
+ }
+
+ @Test
+ public void testCheckKeyAvroValueJsonCompatibilityForward() {
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+ properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
+ }
+
+ @Test
+ public void testCheckKeyAvroValueJsonInCompatibilityForward() {
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+ properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
+ }
+
+ @Test
+ public void testCheckKeyJsonValueAvroCompatibilityFull() {
+ JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+ properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
+ }
+
+ @Test
+ public void testCheckKeyJsonValueAvroInCompatibilityFull() {
+ JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+ properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
+ }
+
+ @Test
+ public void testCheckKeyJsonValueAvroCompatibilityBackward() {
+ JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+ properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
+ }
+
+ @Test // "from" is built with the Foo/Bar schemas in swapped positions: the test expects BACKWARD to report incompatible.
+ public void testCheckKeyJsonValueAvroInCompatibilityBackward() {
+ JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()); // JSON schema derived from the Foo POJO
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build()); // AVRO schema derived from the Bar POJO
+ Map<String, String> properties = Maps.newHashMap(); // same declared types for both sides; only the schema data differs
+ properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+ properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build(); // note the order: (barSchema, fooSchema)
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build(); // order here: (fooSchema, barSchema)
+ Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
+ }
+
+
+ @Test // Same key/value schema pair on both sides (key typed JSON, value typed AVRO): the test expects FORWARD to report compatible.
+ public void testCheckKeyJsonValueAvroCompatibilityForward() {
+ JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()); // JSON schema derived from the Foo POJO
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build()); // AVRO schema derived from the Bar POJO
+ Map<String, String> properties = Maps.newHashMap(); // single property map, shared by both SchemaData instances below
+ properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+ properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build(); // built exactly like fromSchemaData
+ Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
+ }
+
+ @Test // "from" is built with the Foo/Bar schemas in swapped positions: the test expects FORWARD to report incompatible.
+ public void testCheckKeyJsonValueAvroInCompatibilityForward() {
+ JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()); // JSON schema derived from the Foo POJO
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build()); // AVRO schema derived from the Bar POJO
+ Map<String, String> properties = Maps.newHashMap(); // same declared types for both sides; only the schema data differs
+ properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+ properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build(); // note the order: (barSchema, fooSchema)
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build(); // order here: (fooSchema, barSchema)
+ Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
+ }
+
+ @Test // Declared key type differs between the two sides (JSON vs AVRO): the test expects FORWARD to report incompatible.
+ public void testCheckKeyJsonValueAvroKeyTypeInCompatibility() {
+ JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()); // JSON schema derived from the Foo POJO
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build()); // AVRO schema derived from the Bar POJO
+ Map<String, String> fromProperties = Maps.newHashMap(); // "from" side declares key=JSON, value=AVRO
+ fromProperties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+ fromProperties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+ Map<String, String> toProperties = Maps.newHashMap(); // "to" side declares key=AVRO, value=AVRO
+ toProperties.put("key.schema.type", String.valueOf(SchemaType.AVRO)); // the only property that differs from fromProperties
+ toProperties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(fromProperties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(barSchema, barSchema).getSchemaInfo().getSchema()).props(toProperties).build(); // barSchema is used in both positions here
+ Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
+ }
+
+ @Test // Declared value type differs between the two sides (AVRO vs JSON): the test expects FORWARD to report incompatible.
+ public void testCheckKeyJsonValueAvroValueTypeInCompatibility() {
+ JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()); // JSON schema derived from the Foo POJO
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build()); // AVRO schema derived from the Bar POJO
+ Map<String, String> fromProperties = Maps.newHashMap(); // "from" side declares key=JSON, value=AVRO
+ fromProperties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+ fromProperties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+ Map<String, String> toProperties = Maps.newHashMap(); // "to" side declares key=JSON, value=JSON
+ toProperties.put("key.schema.type", String.valueOf(SchemaType.JSON));
+ toProperties.put("value.schema.type", String.valueOf(SchemaType.JSON)); // the only property that differs from fromProperties
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(fromProperties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, fooSchema).getSchemaInfo().getSchema()).props(toProperties).build(); // fooSchema is used in both positions here
+ Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
+ }
+
+ @Test // The "*.schema.properties" entries are present but null on both sides: the test expects FULL to still report compatible.
+ public void testCheckPropertiesNullTypeCompatibility() {
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()); // AVRO schema derived from the Foo POJO
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build()); // AVRO schema derived from the Bar POJO
+ Map<String, String> fromProperties = Maps.newHashMap(); // "from" side: both types AVRO, both nested-properties entries null
+ fromProperties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+ fromProperties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+ fromProperties.put("key.schema.properties", null); // key is present in the map, value is null
+ fromProperties.put("value.schema.properties", null); // key is present in the map, value is null
+ Map<String, String> toProperties = Maps.newHashMap(); // "to" side: populated with the same entries as fromProperties
+ toProperties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
+ toProperties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
+ toProperties.put("key.schema.properties", null);
+ toProperties.put("value.schema.properties", null);
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(fromProperties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(toProperties).build(); // same schema pair as fromSchemaData
+ Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
+ }
+
+ @Test // The "*.schema.type" entries are present but null on both sides: the test expects FULL to report compatible.
+ public void testCheckSchemaTypeNullCompatibility() {
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()); // AVRO schema derived from the Foo POJO
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build()); // AVRO schema derived from the Bar POJO
+ Map<String, String> fromProperties = Maps.newHashMap(); // "from" side: type keys present, values null
+ fromProperties.put("key.schema.type", null);
+ fromProperties.put("value.schema.type", null);
+ Map<String, String> toProperties = Maps.newHashMap(); // "to" side: populated with the same entries as fromProperties
+ toProperties.put("key.schema.type", null);
+ toProperties.put("value.schema.type", null);
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(fromProperties).build();
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(toProperties).build(); // same schema pair as fromSchemaData
+ Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
+ }
+
+ @Test // "from" is a STRING schema and "to" is KEY_VALUE: the test expects ALWAYS_COMPATIBLE to report compatible.
+ public void testCheckSchemaTypeAlwaysCompatibility() {
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()); // AVRO schema derived from the Foo POJO
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build()); // AVRO schema derived from the Bar POJO
+ StringSchema stringSchema = new StringSchema();
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.STRING) // the "from" side is typed STRING, not KEY_VALUE
+ .data(stringSchema.getSchemaInfo().getSchema()).build(); // no props(...) call on this builder
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).build(); // no props(...) call on this builder either
+ Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE));
+ }
+
+ @Test // "from" is a STRING schema and "to" is KEY_VALUE: the test expects ALWAYS_INCOMPATIBLE to report incompatible.
+ public void testCheckSchemaTypeOtherCompatibility() {
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()); // AVRO schema derived from the Foo POJO
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build()); // AVRO schema derived from the Bar POJO
+ StringSchema stringSchema = new StringSchema();
+ SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.STRING) // the "from" side is typed STRING, not KEY_VALUE
+ .data(stringSchema.getSchemaInfo().getSchema()).build(); // no props(...) call on this builder
+ SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
+ .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).build(); // no props(...) call on this builder either
+ Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE));
+ }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index c31329f..5393011 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -42,12 +42,14 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.common.policies.data.SourceStatus;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
import org.apache.pulsar.functions.api.examples.serde.CustomObject;
import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
@@ -1563,7 +1565,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
.build();
@Cleanup
- Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ Consumer<KeyValue<byte[], byte[]>> consumer = client.newConsumer(KeyValueSchema.kvBytes())
.topic(consumeTopicName)
.subscriptionName("debezium-source-tester")
.subscriptionType(SubscriptionType.Exclusive)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
index ffecbc0..2d1b4b5 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
@@ -25,6 +25,8 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.impl.schema.ByteSchema;
+import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
@@ -84,13 +86,16 @@ public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContain
return null;
}
- public void validateSourceResult(Consumer<String> consumer, int number) throws Exception {
+ public void validateSourceResult(Consumer<KeyValue<byte[], byte[]>> consumer, int number) throws Exception {
int recordsNumber = 0;
- Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+ Message<KeyValue<byte[], byte[]>> msg = consumer.receive(2, TimeUnit.SECONDS);
while(msg != null) {
recordsNumber ++;
log.info("Received message: {}.", msg.getValue());
- Assert.assertTrue(msg.getValue().contains("dbserver1.inventory.products"));
+ String key = new String(msg.getValue().getKey());
+ String value = new String(msg.getValue().getValue());
+ Assert.assertTrue(key.contains("dbserver1.inventory.products.Key"));
+ Assert.assertTrue(value.contains("dbserver1.inventory.products.Value"));
consumer.acknowledge(msg);
msg = consumer.receive(1, TimeUnit.SECONDS);
}