You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/19 15:25:39 UTC
[pulsar] branch master updated: revise the schema default type not
null (#3752)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1a1c557 revise the schema default type not null (#3752)
1a1c557 is described below
commit 1a1c557b27bdc4e48fcd4a2da6f71e0d78873697
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue Mar 19 23:25:34 2019 +0800
revise the schema default type not null (#3752)
### Motivation
Fix #3741
### Modifications
Support define not not allow null field in schema
### Verifying this change
Add not allow null field schema verify
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (yes)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)
---
.../broker/service/schema/ClientGetSchemaTest.java | 8 +
.../schema/JsonSchemaCompatibilityCheckTest.java | 5 +-
.../api/SimpleTypedProducerConsumerTest.java | 28 ++--
.../java/org/apache/pulsar/client/api/Schema.java | 49 +++---
.../pulsar/client/api/schema/SchemaDefinition.java | 64 ++++++++
.../client/api/schema/SchemaDefinitionBuilder.java | 81 ++++++++++
.../client/internal/DefaultImplementation.java | 37 ++---
.../pulsar/client/impl/schema/AvroSchema.java | 34 ++---
.../pulsar/client/impl/schema/JSONSchema.java | 36 ++---
.../impl/schema/SchemaDefinitionBuilderImpl.java | 95 ++++++++++++
.../client/impl/schema/SchemaDefinitionImpl.java | 98 ++++++++++++
.../pulsar/client/impl/schema/StructSchema.java | 10 +-
.../pulsar/client/impl/schema/AvroSchemaTest.java | 79 ++++++++--
.../pulsar/client/impl/schema/JSONSchemaTest.java | 167 +++++++++++++++++++--
.../client/impl/schema/KeyValueSchemaTest.java | 164 +++++++++++++++++---
.../client/impl/schema/SchemaBuilderTest.java | 7 +
.../pulsar/client/impl/schema/SchemaTestUtils.java | 40 +++--
.../impl/schema/generic/GenericSchemaImplTest.java | 4 +-
.../MultiVersionGenericSchemaProviderTest.java | 4 +-
.../tutorial/SampleAsyncProducerWithSchema.java | 4 +-
.../client/tutorial/SampleConsumerWithSchema.java | 4 +-
.../pulsar/functions/source/TopicSchema.java | 5 +-
.../io/hbase/sink/HbaseGenericRecordSinkTest.java | 3 +-
.../org/apache/pulsar/io/jdbc/JdbcSinkTest.java | 3 +-
.../pulsar/sql/presto/TestPulsarConnector.java | 27 ++--
.../tests/integration/io/JdbcSinkTester.java | 3 +-
26 files changed, 868 insertions(+), 191 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
index 7055492..9fcd231 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -43,6 +44,8 @@ public class ClientGetSchemaTest extends ProducerConsumerBase {
private static final String topicString = "my-property/my-ns/topic-string";
private static final String topicJson = "my-property/my-ns/topic-json";
private static final String topicAvro = "my-property/my-ns/topic-avro";
+ private static final String topicJsonNotNull = "my-property/my-ns/topic-json-not-null";
+ private static final String topicAvroNotNull = "my-property/my-ns/topic-avro-not-null";
List<Producer<?>> producers = new ArrayList<>();
@@ -62,6 +65,11 @@ public class ClientGetSchemaTest extends ProducerConsumerBase {
producers.add(pulsarClient.newProducer(Schema.STRING).topic(topicString).create());
producers.add(pulsarClient.newProducer(Schema.AVRO(MyClass.class)).topic(topicAvro).create());
producers.add(pulsarClient.newProducer(Schema.JSON(MyClass.class)).topic(topicJson).create());
+ producers.add(pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.<MyClass>builder().withPojo(MyClass.class).build())).topic(topicAvro).create());
+ producers.add(pulsarClient.newProducer(Schema.JSON(SchemaDefinition.<MyClass>builder().withPojo(MyClass.class).build())).topic(topicJson).create());
+ producers.add(pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.<MyClass>builder().withPojo(MyClass.class).withAlwaysAllowNull(false).build())).topic(topicAvroNotNull).create());
+ producers.add(pulsarClient.newProducer(Schema.JSON(SchemaDefinition.<MyClass>builder().withPojo(MyClass.class).withAlwaysAllowNull(false).build())).topic(topicJsonNotNull).create());
+
}
@AfterClass
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
index 17b39b7..a785927 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
@@ -33,6 +33,7 @@ import lombok.ToString;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -51,11 +52,11 @@ public class JsonSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilit
public void testJsonSchemaBackwardsCompatibility() throws JsonProcessingException {
SchemaData from = SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
- SchemaData to = SchemaData.builder().data(JSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
+ SchemaData to = SchemaData.builder().data(JSONSchema.of(SchemaDefinition.builder().withPojo(Foo.class).build()).getSchemaInfo().getSchema()).build();
JsonSchemaCompatibilityCheck jsonSchemaCompatibilityCheck = new JsonSchemaCompatibilityCheck();
Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to, SchemaCompatibilityStrategy.FULL));
- from = SchemaData.builder().data(JSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
+ from = SchemaData.builder().data(JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()).getSchemaInfo().getSchema()).build();
to = SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to, SchemaCompatibilityStrategy.FULL));
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
index c9e919e..050db39 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
@@ -63,7 +64,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
JSONSchema<JsonEncodedPojo> jsonSchema =
- JSONSchema.of(JsonEncodedPojo.class);
+ JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build());
Consumer<JsonEncodedPojo> consumer = pulsarClient
.newConsumer(jsonSchema)
@@ -108,7 +109,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
JSONSchema<JsonEncodedPojo> jsonSchema =
- JSONSchema.of(JsonEncodedPojo.class);
+ JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build());
pulsar.getSchemaRegistryService()
.putSchemaIfAbsent("my-property/my-ns/my-topic1",
@@ -166,7 +167,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
).get();
Consumer<JsonEncodedPojo> consumer = pulsarClient
- .newConsumer(JSONSchema.of(JsonEncodedPojo.class))
+ .newConsumer(JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build()))
.topic("persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("my-subscriber-name")
.subscribe();
@@ -194,7 +195,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
).get();
Producer<JsonEncodedPojo> producer = pulsarClient
- .newProducer(JSONSchema.of(JsonEncodedPojo.class))
+ .newProducer(JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build()))
.topic("persistent://my-property/use/my-ns/my-topic1")
.create();
@@ -273,7 +274,9 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
).get();
Consumer<org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong> consumer = pulsarClient
- .newConsumer(AvroSchema.of(org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong.class))
+ .newConsumer(AvroSchema.of
+ (SchemaDefinition.<org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong>builder().
+ withPojo(org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong.class).build()))
.topic("persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("my-subscriber-name")
.subscribe();
@@ -286,7 +289,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
AvroSchema<AvroEncodedPojo> avroSchema =
- AvroSchema.of(AvroEncodedPojo.class);
+ AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
+ withPojo(AvroEncodedPojo.class).build());
Consumer<AvroEncodedPojo> consumer = pulsarClient
.newConsumer(avroSchema)
@@ -355,7 +359,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
).get();
Consumer<AvroEncodedPojo> consumer = pulsarClient
- .newConsumer(AvroSchema.of(AvroEncodedPojo.class))
+ .newConsumer(AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
+ withPojo(AvroEncodedPojo.class).build()))
.topic("persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("my-subscriber-name")
.subscribe();
@@ -454,7 +459,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
AvroSchema<AvroEncodedPojo> avroSchema =
- AvroSchema.of(AvroEncodedPojo.class);
+ AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
+ withPojo(AvroEncodedPojo.class).build());
Producer<AvroEncodedPojo> producer = pulsarClient
.newProducer(avroSchema)
@@ -502,7 +508,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
AvroSchema<AvroEncodedPojo> avroSchema =
- AvroSchema.of(AvroEncodedPojo.class);
+ AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
+ withPojo(AvroEncodedPojo.class).build());
Producer<AvroEncodedPojo> producer = pulsarClient
.newProducer(avroSchema)
@@ -548,7 +555,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
AvroSchema<AvroEncodedPojo> avroSchema =
- AvroSchema.of(AvroEncodedPojo.class);
+ AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
+ withPojo(AvroEncodedPojo.class).build());
try (Producer<AvroEncodedPojo> producer = pulsarClient
.newProducer(avroSchema)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index e17fb06..d0bff30 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -19,10 +19,10 @@
package org.apache.pulsar.client.api;
import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Map;
+
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -169,58 +169,43 @@ public interface Schema<T> {
}
/**
- * Create a Avro schema type by extracting the fields of the specified class.
- *
- * @param clazz the POJO class to be used to extract the Avro schema
- * @return a Schema instance
- */
- static <T> Schema<T> AVRO(Class<T> clazz) {
- return DefaultImplementation.newAvroSchema(clazz);
- }
-
- /**
- * Create a Avro schema type using the provided avro schema definition.
+ * Create a Avro schema type by default configuration of the class
*
- * @param schemaDefinition avro schema definition
+ * @param pojo the POJO class to be used to extract the Avro schema
* @return a Schema instance
*/
- static <T> Schema<T> AVRO(String schemaDefinition) {
- return AVRO(schemaDefinition, Collections.emptyMap());
+ static <T> Schema<T> AVRO(Class<T> pojo) {
+ return DefaultImplementation.newAvroSchema(SchemaDefinition.builder().withPojo(pojo).build());
}
/**
- * Create a Avro schema type using the provided avro schema definition.
+ * Create a Avro schema type with schema definition
*
- * @param schemaDefinition avro schema definition
- * @param properties pulsar schema properties
+ * @param schemaDefinition the definition of the schema
* @return a Schema instance
*/
- static <T> Schema<T> AVRO(String schemaDefinition, Map<String, String> properties) {
- return DefaultImplementation.newAvroSchema(schemaDefinition, properties);
+ static <T> Schema<T> AVRO(SchemaDefinition<T> schemaDefinition) {
+ return DefaultImplementation.newAvroSchema(schemaDefinition);
}
/**
* Create a JSON schema type by extracting the fields of the specified class.
*
- * @param clazz the POJO class to be used to extract the JSON schema
+ * @param pojo the POJO class to be used to extract the JSON schema
* @return a Schema instance
*/
- static <T> Schema<T> JSON(Class<T> clazz) {
- return DefaultImplementation.newJSONSchema(clazz);
+ static <T> Schema<T> JSON(Class<T> pojo) {
+ return DefaultImplementation.newJSONSchema(SchemaDefinition.builder().withPojo(pojo).build());
}
/**
- * Create a JSON schema type by extracting the fields of the specified class.
+ * Create a JSON schema type with schema definition
*
- * @param clazz the POJO class to be used to extract the JSON schema
- * @param schemaDefinition schema definition json string (using avro schema syntax)
- * @param properties pulsar schema properties
+ * @param schemaDefinition the definition of the schema
* @return a Schema instance
*/
- static <T> Schema<T> JSON(Class<T> clazz,
- String schemaDefinition,
- Map<String, String> properties) {
- return DefaultImplementation.newJSONSchema(clazz, schemaDefinition, properties);
+ static <T> Schema<T> JSON(SchemaDefinition schemaDefinition) {
+ return DefaultImplementation.newJSONSchema(schemaDefinition);
}
/**
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java
new file mode 100644
index 0000000..daf90df
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;
+
+import org.apache.pulsar.client.internal.DefaultImplementation;
+
+import java.util.Map;
+
+
+public interface SchemaDefinition<T> {
+
+ /**
+ * Get a new builder instance that can used to configure and build a {@link SchemaDefinition} instance.
+ *
+ * @return the {@link SchemaDefinition}
+ */
+ static <T> SchemaDefinitionBuilder<T> builder() {
+ return DefaultImplementation.newSchemaDefinitionBuilder();
+ }
+
+ /**
+ * get schema whether always allow null or not
+ *
+ * @return schema always null or not
+ */
+ public boolean getAlwaysAllowNull();
+
+ /**
+ * Get schema class
+ *
+ * @return schema class
+ */
+ public Map<String, String> getProperties();
+
+ /**
+ * Get json schema definition
+ *
+ * @return schema class
+ */
+ public String getJsonDef();
+
+ /**
+ * Get pojo schema definition
+ *
+ * @return pojo schema
+ */
+ public Class<T> getPojo();
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java
new file mode 100644
index 0000000..77bb363
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;
+
+
+
+import java.util.Map;
+
+/**
+ * Builder to build schema definition {@link SchemaDefinition}.
+ */
+public interface SchemaDefinitionBuilder<T> {
+
+ /**
+ * Set schema whether always allow null or not
+ *
+ * @param alwaysAllowNull definition null or not
+ * @return schema definition builder
+ */
+ SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull);
+
+ /**
+ * Set schema info properties
+ *
+ * @param properties schema info properties
+ * @return schema definition builder
+ */
+ SchemaDefinitionBuilder<T> withProperties(Map<String, String> properties);
+
+ /**
+ * Set schema info properties
+ *
+ * @param key property key
+ * @param value property value
+ *
+ * @return record schema definition
+ */
+ SchemaDefinitionBuilder<T> addProperty(String key, String value);
+
+ /**
+ * Set schema of pojo definition
+ *
+ * @param pojo pojo schema definition
+ *
+ * @return record schema definition
+ */
+ SchemaDefinitionBuilder<T> withPojo(Class pojo);
+
+ /**
+ * Set schema of json definition
+ *
+ * @param jsonDefinition json schema definition
+ *
+ * @return record schema definition
+ */
+ SchemaDefinitionBuilder<T> withJsonDef(String jsonDefinition);
+
+ /**
+ * Build the schema definition.
+ *
+ * @return the schema definition.
+ */
+ SchemaDefinition<T> build();
+
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
index abd368e..44dbc10 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
@@ -37,9 +37,7 @@ import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.api.schema.GenericSchema;
-import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.*;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -71,6 +69,13 @@ public class DefaultImplementation {
private static final Constructor<Authentication> AUTHENTICATION_TLS_String_String = getConstructor(
"org.apache.pulsar.client.impl.auth.AuthenticationTls", String.class, String.class);
+ private static final Constructor<SchemaDefinitionBuilder> SCHEMA_DEFINITION_BUILDER_CONSTRUCTOR = getConstructor(
+ "org.apache.pulsar.client.impl.schema.SchemaDefinitionBuilderImpl");
+
+ public static <T> SchemaDefinitionBuilder<T> newSchemaDefinitionBuilder() {
+ return catchExceptions(() -> (SchemaDefinitionBuilder<T>)SCHEMA_DEFINITION_BUILDER_CONSTRUCTOR.newInstance());
+ }
+
public static ClientBuilder newClientBuilder() {
return catchExceptions(() -> CLIENT_BUILDER_IMPL.newInstance());
}
@@ -182,16 +187,10 @@ public class DefaultImplementation {
.newInstance());
}
- public static <T> Schema<T> newAvroSchema(Class<T> clazz) {
+ public static <T> Schema<T> newAvroSchema(SchemaDefinition schemaDefinition) {
return catchExceptions(
- () -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.AvroSchema", "of", Class.class)
- .invoke(null, clazz));
- }
-
- public static <T> Schema<T> newAvroSchema(String schemaDefinition, Map<String, String> properties) {
- return catchExceptions(
- () -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.AvroSchema", "of", String.class, Map.class)
- .invoke(null, schemaDefinition, properties));
+ () -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.AvroSchema", "of", SchemaDefinition.class)
+ .invoke(null,schemaDefinition));
}
public static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufSchema(Class<T> clazz) {
@@ -200,18 +199,10 @@ public class DefaultImplementation {
.invoke(null, clazz));
}
- public static <T> Schema<T> newJSONSchema(Class<T> clazz) {
- return catchExceptions(
- () -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.JSONSchema", "of", Class.class)
- .invoke(null, clazz));
- }
-
- public static <T> Schema<T> newJSONSchema(Class<T> clazz,
- String schemaDefinition,
- Map<String, String> properties) {
+ public static <T> Schema<T> newJSONSchema(SchemaDefinition schemaDefinition) {
return catchExceptions(
- () -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.JSONSchema", "of", Class.class, String.class, Map.class)
- .invoke(null, clazz, schemaDefinition, properties));
+ () -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.JSONSchema", "of", SchemaDefinition.class)
+ .invoke(null, schemaDefinition));
}
public static Schema<GenericRecord> newAutoConsumeSchema() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index c9726aa..a00112c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -26,11 +26,12 @@ import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.util.Collections;
import java.util.Map;
/**
@@ -48,12 +49,11 @@ public class AvroSchema<T> extends StructSchema<T> {
new ThreadLocal<>();
private AvroSchema(org.apache.avro.Schema schema,
- Map<String, String> properties) {
+ SchemaDefinition schemaDefinition) {
super(
SchemaType.AVRO,
schema,
- properties);
-
+ schemaDefinition.getProperties());
this.byteArrayOutputStream = new ByteArrayOutputStream();
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, this.encoder);
this.datumWriter = new ReflectDatumWriter<>(this.schema);
@@ -87,23 +87,23 @@ public class AvroSchema<T> extends StructSchema<T> {
}
}
- public static <T> AvroSchema<T> of(Class<T> pojo) {
- return new AvroSchema<>(createAvroSchema(pojo), Collections.emptyMap());
+ @Override
+ public SchemaInfo getSchemaInfo() {
+ return this.schemaInfo;
}
- public static <T> AvroSchema<T> of(Class<T> pojo, Map<String, String> properties) {
- return new AvroSchema<>(createAvroSchema(pojo), properties);
+ public static <T> AvroSchema<T> of(SchemaDefinition<T> schemaDefinition) {
+ return schemaDefinition.getJsonDef() == null ?
+ new AvroSchema<>(createAvroSchema(schemaDefinition), schemaDefinition) : new AvroSchema<>(parseAvroSchema(schemaDefinition.getJsonDef()), schemaDefinition);
}
- /**
- * Create an Avro schema based on provided schema definition.
- *
- * @param schemaDefinition avro schema definition
- * @param properties schema properties
- * @return avro schema instance
- */
- public static <T> AvroSchema<T> of(String schemaDefinition, Map<String, String> properties) {
- return new AvroSchema<>(parseAvroSchema(schemaDefinition), properties);
+ public static <T> AvroSchema<T> of(Class<T> pojo) {
+ return AvroSchema.of(SchemaDefinition.<T>builder().withPojo(pojo).build());
+ }
+
+ public static <T> AvroSchema<T> of(Class<T> pojo, Map<String, String> properties) {
+ SchemaDefinition<T> schemaDefinition = SchemaDefinition.<T>builder().withPojo(pojo).withProperties(properties).build();
+ return new AvroSchema<>(createAvroSchema(schemaDefinition), schemaDefinition);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index b915ff2..629b769 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -26,11 +26,11 @@ import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.io.IOException;
-import java.util.Collections;
import java.util.Map;
/**
@@ -38,7 +38,6 @@ import java.util.Map;
*/
@Slf4j
public class JSONSchema<T> extends StructSchema<T> {
-
// Cannot use org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal() because it does not
// return shaded version of object mapper
private static final ThreadLocal<ObjectMapper> JSON_MAPPER = ThreadLocal.withInitial(() -> {
@@ -51,14 +50,13 @@ public class JSONSchema<T> extends StructSchema<T> {
private final Class<T> pojo;
private final ObjectMapper objectMapper;
- private JSONSchema(Class<T> pojo,
- org.apache.avro.Schema schema,
- Map<String, String> properties) {
+ private JSONSchema(org.apache.avro.Schema schema,
+ SchemaDefinition<T> schemaDefinition) {
super(
SchemaType.JSON,
schema,
- properties);
- this.pojo = pojo;
+ schemaDefinition.getProperties());
+ this.pojo = schemaDefinition.getPojo();
this.objectMapper = JSON_MAPPER.get();
}
@@ -89,6 +87,7 @@ public class JSONSchema<T> extends StructSchema<T> {
* Implemented for backwards compatibility reasons
* since the original schema generated by JSONSchema was based off the json schema standard
* since then we have standardized on Avro
+ *
* @return
*/
public SchemaInfo getBackwardsCompatibleJsonSchemaInfo() {
@@ -108,25 +107,18 @@ public class JSONSchema<T> extends StructSchema<T> {
return backwardsCompatibleSchemaInfo;
}
+ public static <T> JSONSchema<T> of(SchemaDefinition<T> schemaDefinition) {
+ String jsonDef = schemaDefinition.getJsonDef();
+ return jsonDef == null ? new JSONSchema<>(createAvroSchema(schemaDefinition), schemaDefinition) :
+ new JSONSchema<>(parseAvroSchema(jsonDef), schemaDefinition);
+ }
+
public static <T> JSONSchema<T> of(Class<T> pojo) {
- return new JSONSchema<>(pojo, createAvroSchema(pojo), Collections.emptyMap());
+ return JSONSchema.of(SchemaDefinition.<T>builder().withPojo(pojo).build());
}
public static <T> JSONSchema<T> of(Class<T> pojo, Map<String, String> properties) {
- return new JSONSchema<>(pojo, createAvroSchema(pojo), properties);
+ return JSONSchema.of(SchemaDefinition.<T>builder().withPojo(pojo).withProperties(properties).build());
}
- /**
- * Create an json schema based on provided schema definition.
- *
- * @param pojo pojo class
- * @param schemaDefinition avro schema definition
- * @param properties schema properties
- * @return avro schema instance
- */
- public static <T> JSONSchema<T> of(Class<T> pojo,
- String schemaDefinition,
- Map<String, String> properties) {
- return new JSONSchema<>(pojo, parseAvroSchema(schemaDefinition), properties);
- }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java
new file mode 100644
index 0000000..2db6cf4
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.api.schema.SchemaDefinitionBuilder;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Builder to build {@link org.apache.pulsar.client.api.schema.GenericRecord}.
+ */
+public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T> {
+
+ public static final String ALWAYS_ALLOW_NULL = "__alwaysAllowNull";
+
+ /**
+ * the schema definition class
+ */
+ private Class<T> clazz;
+ /**
+ * The flag of schema type always allow null
+ *
+ * If it's true, will make all of the pojo field generate schema
+ * define default can be null,false default can't be null, but it's
+ * false you can define the field by yourself by the annotation@Nullable
+ *
+ */
+ private boolean alwaysAllowNull = true;
+
+ /**
+ * The schema info properties
+ */
+ private Map<String, String> properties = new HashMap<>();
+
+ /**
+ * The json schema definition
+ */
+ private String jsonDef;
+
+ @Override
+ public SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull) {
+ this.alwaysAllowNull = alwaysAllowNull;
+ return this;
+ }
+
+ @Override
+ public SchemaDefinitionBuilder<T> addProperty(String key, String value) {
+ this.properties.put(key, value);
+ return this;
+ }
+
+ @Override
+ public SchemaDefinitionBuilder<T> withPojo(Class clazz) {
+ this.clazz = clazz;
+ return this;
+ }
+
+ @Override
+ public SchemaDefinitionBuilder<T> withJsonDef(String jsonDef) {
+ this.jsonDef = jsonDef;
+ return this;
+ }
+
+
+ @Override
+ public SchemaDefinitionBuilder<T> withProperties(Map<String,String> properties) {
+ this.properties = properties;
+ return this;
+ }
+
+ @Override
+ public SchemaDefinition<T> build() {
+ properties.put(ALWAYS_ALLOW_NULL, this.alwaysAllowNull ? "true" : "false");
+ return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties);
+
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java
new file mode 100644
index 0000000..04f1a24
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A json schema definition
+ * {@link org.apache.pulsar.client.api.schema.SchemaDefinition} for the json schema definition.
+ */
+public class SchemaDefinitionImpl<T> implements SchemaDefinition<T>{
+
+ /**
+ * the schema definition class
+ */
+ private Class<T> pojo;
+ /**
+ * The flag of schema type always allow null
+ *
+ * If it's true, will make all of the pojo field generate schema
+ * define default can be null,false default can't be null, but it's
+ * false you can define the field by yourself by the annotation@Nullable
+ *
+ */
+ private boolean alwaysAllowNull;
+
+ private Map<String, String> properties;
+
+ private String jsonDef;
+
+ public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String,String> properties) {
+ this.alwaysAllowNull = alwaysAllowNull;
+ this.properties = properties;
+ this.jsonDef = jsonDef;
+ this.pojo = pojo;
+ }
+ /**
+ * get schema whether always allow null or not
+ *
+ * @return schema always null or not
+ */
+ public boolean getAlwaysAllowNull() {
+
+ return alwaysAllowNull;
+ }
+
+ /**
+ * Get json schema definition
+ *
+ * @return schema class
+ */
+ public String getJsonDef() {
+
+ return jsonDef;
+ }
+ /**
+ * Get pojo schema definition
+ *
+ * @return pojo class
+ */
+ @Override
+ public Class<T> getPojo() {
+ return pojo;
+ }
+
+ /**
+ * Get schema class
+ *
+ * @return schema class
+ */
+ public Map<String, String> getProperties() {
+
+ return properties;
+ }
+
+
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
index 33ce9de..31156d4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.avro.Schema.Parser;
import org.apache.avro.reflect.ReflectData;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -62,13 +63,14 @@ abstract class StructSchema<T> implements Schema<T> {
return this.schemaInfo;
}
- protected static <T> org.apache.avro.Schema createAvroSchema(Class<T> pojo) {
- return ReflectData.AllowNull.get().getSchema(pojo);
+ protected static org.apache.avro.Schema createAvroSchema(SchemaDefinition schemaDefinition) {
+ Class pojo = schemaDefinition.getPojo();
+ return schemaDefinition.getAlwaysAllowNull() ? ReflectData.AllowNull.get().getSchema(pojo) : ReflectData.get().getSchema(pojo);
}
- protected static org.apache.avro.Schema parseAvroSchema(String definition) {
+ protected static org.apache.avro.Schema parseAvroSchema(String jsonDef) {
Parser parser = new Parser();
- return parser.parse(definition);
+ return parser.parse(jsonDef);
}
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index 203c226..bbd753b 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -20,29 +20,34 @@ package org.apache.pulsar.client.impl.schema;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.FOO_FIELDS;
-import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON;
+import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_AVRO_NOT_ALLOW_NULL;
+import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_AVRO_ALLOW_NULL;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.fail;
import java.util.Arrays;
-import java.util.Collections;
+
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.avro.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.avro.SchemaValidationException;
import org.apache.avro.SchemaValidator;
import org.apache.avro.SchemaValidatorBuilder;
import org.apache.avro.reflect.AvroDefault;
import org.apache.avro.reflect.Nullable;
import org.apache.avro.reflect.ReflectData;
-import org.apache.pulsar.client.api.SchemaSerializationException;
+
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.Test;
+
@Slf4j
public class AvroSchemaTest {
@@ -88,7 +93,7 @@ public class AvroSchemaTest {
// expected
}
- AvroSchema<StructWithAnnotations> schema3 = AvroSchema.of(schemaDef1, Collections.emptyMap());
+ AvroSchema<StructWithAnnotations> schema3 = AvroSchema.of(SchemaDefinition.<StructWithAnnotations>builder().withJsonDef(schemaDef1).build());
String schemaDef3 = new String(schema3.getSchemaInfo().getSchema(), UTF_8);
assertEquals(schemaDef1, schemaDef3);
assertNotEquals(schemaDef2, schemaDef3);
@@ -108,12 +113,12 @@ public class AvroSchemaTest {
}
@Test
- public void testSchema() {
- AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class);
+ public void testNotAllowNullSchema() {
+ AvroSchema<Foo> avroSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
assertEquals(avroSchema.getSchemaInfo().getType(), SchemaType.AVRO);
Schema.Parser parser = new Schema.Parser();
String schemaJson = new String(avroSchema.getSchemaInfo().getSchema());
- assertEquals(schemaJson, SCHEMA_JSON);
+ assertEquals(schemaJson, SCHEMA_AVRO_NOT_ALLOW_NULL);
Schema schema = parser.parse(schemaJson);
for (String fieldName : FOO_FIELDS) {
@@ -123,12 +128,66 @@ public class AvroSchemaTest {
if (field.name().equals("field4")) {
Assert.assertNotNull(field.schema().getTypes().get(1).getField("field1"));
}
+ if (field.name().equals("fieldUnableNull")) {
+ Assert.assertNotNull(field.schema().getType());
+ }
}
}
@Test
- public void testEncodeAndDecode() {
- AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null);
+ public void testAllowNullSchema() {
+ AvroSchema<Foo> avroSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ assertEquals(avroSchema.getSchemaInfo().getType(), SchemaType.AVRO);
+ Schema.Parser parser = new Schema.Parser();
+ String schemaJson = new String(avroSchema.getSchemaInfo().getSchema());
+ assertEquals(schemaJson, SCHEMA_AVRO_ALLOW_NULL);
+ Schema schema = parser.parse(schemaJson);
+
+ for (String fieldName : FOO_FIELDS) {
+ Schema.Field field = schema.getField(fieldName);
+ Assert.assertNotNull(field);
+
+ if (field.name().equals("field4")) {
+ Assert.assertNotNull(field.schema().getTypes().get(1).getField("field1"));
+ }
+ if (field.name().equals("fieldUnableNull")) {
+ Assert.assertNotNull(field.schema().getType());
+ }
+ }
+ }
+
+ @Test
+ public void testNotAllowNullEncodeAndDecode() {
+ AvroSchema<Foo> avroSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
+
+ Foo foo1 = new Foo();
+ foo1.setField1("foo1");
+ foo1.setField2("bar1");
+ foo1.setField4(new Bar());
+ foo1.setFieldUnableNull("notNull");
+
+ Foo foo2 = new Foo();
+ foo2.setField1("foo2");
+ foo2.setField2("bar2");
+
+ byte[] bytes1 = avroSchema.encode(foo1);
+ Foo object1 = avroSchema.decode(bytes1);
+ Assert.assertTrue(bytes1.length > 0);
+ assertEquals(object1, foo1);
+
+ try {
+
+ avroSchema.encode(foo2);
+
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof SchemaSerializationException);
+ }
+
+ }
+
+ @Test
+ public void testAllowNullEncodeAndDecode() {
+ AvroSchema<Foo> avroSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
Foo foo1 = new Foo();
foo1.setField1("foo1");
@@ -150,6 +209,8 @@ public class AvroSchemaTest {
assertEquals(object1, foo1);
assertEquals(object2, foo2);
+
}
+
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
index 9184faa..5efb82b 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
@@ -24,7 +24,7 @@ import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
-import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.DerivedFoo;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
@@ -35,18 +35,20 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.FOO_FIELDS;
-import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON;
+import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON_NOT_ALLOW_NULL;
+import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON_ALLOW_NULL;
+import static org.testng.Assert.assertEquals;
@Slf4j
public class JSONSchemaTest {
@Test
- public void testSchema() {
- JSONSchema<Foo> jsonSchema = JSONSchema.of(Foo.class);
+ public void testNotAllowNullSchema() {
+ JSONSchema<Foo> jsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
Assert.assertEquals(jsonSchema.getSchemaInfo().getType(), SchemaType.JSON);
Schema.Parser parser = new Schema.Parser();
String schemaJson = new String(jsonSchema.getSchemaInfo().getSchema());
- Assert.assertEquals(schemaJson, SCHEMA_JSON);
+ Assert.assertEquals(schemaJson, SCHEMA_JSON_NOT_ALLOW_NULL);
Schema schema = parser.parse(schemaJson);
for (String fieldName : FOO_FIELDS) {
@@ -56,12 +58,37 @@ public class JSONSchemaTest {
if (field.name().equals("field4")) {
Assert.assertNotNull(field.schema().getTypes().get(1).getField("field1"));
}
+ if (field.name().equals("fieldUnableNull")) {
+ Assert.assertNotNull(field.schema().getType());
+ }
}
}
@Test
- public void testEncodeAndDecode() {
- JSONSchema<Foo> jsonSchema = JSONSchema.of(Foo.class, null);
+ public void testAllowNullSchema() {
+ JSONSchema<Foo> jsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ Assert.assertEquals(jsonSchema.getSchemaInfo().getType(), SchemaType.JSON);
+ Schema.Parser parser = new Schema.Parser();
+ String schemaJson = new String(jsonSchema.getSchemaInfo().getSchema());
+ Assert.assertEquals(schemaJson, SCHEMA_JSON_ALLOW_NULL);
+ Schema schema = parser.parse(schemaJson);
+
+ for (String fieldName : FOO_FIELDS) {
+ Schema.Field field = schema.getField(fieldName);
+ Assert.assertNotNull(field);
+
+ if (field.name().equals("field4")) {
+ Assert.assertNotNull(field.schema().getTypes().get(1).getField("field1"));
+ }
+ if (field.name().equals("fieldUnableNull")) {
+ Assert.assertNotNull(field.schema().getType());
+ }
+ }
+ }
+
+ @Test
+ public void testAllowNullEncodeAndDecode() {
+ JSONSchema<Foo> jsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
Bar bar = new Bar();
bar.setField1(true);
@@ -90,9 +117,65 @@ public class JSONSchemaTest {
}
@Test
- public void testNestedClasses() {
- JSONSchema<NestedBar> jsonSchema = JSONSchema.of(NestedBar.class, null);
- JSONSchema<NestedBarList> listJsonSchema = JSONSchema.of(NestedBarList.class, null);
+ public void testNotAllowNullEncodeAndDecode() {
+ JSONSchema<Foo> jsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
+
+ Foo foo1 = new Foo();
+ foo1.setField1("foo1");
+ foo1.setField2("bar1");
+ foo1.setField4(new Bar());
+ foo1.setFieldUnableNull("notNull");
+
+ Foo foo2 = new Foo();
+ foo2.setField1("foo2");
+ foo2.setField2("bar2");
+
+ byte[] bytes1 = jsonSchema.encode(foo1);
+ Foo object1 = jsonSchema.decode(bytes1);
+ Assert.assertTrue(bytes1.length > 0);
+ assertEquals(object1, foo1);
+
+ try {
+
+ jsonSchema.encode(foo2);
+
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof SchemaSerializationException);
+ }
+
+ }
+
+ @Test
+ public void testAllowNullNestedClasses() {
+ JSONSchema<NestedBar> jsonSchema = JSONSchema.of(SchemaDefinition.<NestedBar>builder().withPojo(NestedBar.class).build());
+ JSONSchema<NestedBarList> listJsonSchema = JSONSchema.of(SchemaDefinition.<NestedBarList>builder().withPojo(NestedBarList.class).build());
+
+ Bar bar = new Bar();
+ bar.setField1(true);
+
+ NestedBar nested = new NestedBar();
+ nested.setField1(true);
+ nested.setNested(bar);
+
+ byte[] bytes = jsonSchema.encode(nested);
+ Assert.assertTrue(bytes.length > 0);
+ Assert.assertEquals(jsonSchema.decode(bytes), nested);
+
+ List<Bar> list = Collections.singletonList(bar);
+ NestedBarList nestedList = new NestedBarList();
+ nestedList.setField1(true);
+ nestedList.setList(list);
+
+ bytes = listJsonSchema.encode(nestedList);
+ Assert.assertTrue(bytes.length > 0);
+
+ Assert.assertEquals(listJsonSchema.decode(bytes), nestedList);
+ }
+
+ @Test
+ public void testNotAllowNullNestedClasses() {
+ JSONSchema<NestedBar> jsonSchema = JSONSchema.of(SchemaDefinition.<NestedBar>builder().withPojo(NestedBar.class).withAlwaysAllowNull(false).build());
+ JSONSchema<NestedBarList> listJsonSchema = JSONSchema.of(SchemaDefinition.<NestedBarList>builder().withPojo(NestedBarList.class).withAlwaysAllowNull(false).build());
Bar bar = new Bar();
bar.setField1(true);
@@ -117,7 +200,59 @@ public class JSONSchemaTest {
}
@Test
- public void testCorrectPolymorphism() {
+ public void testNotAllowNullCorrectPolymorphism() {
+ Bar bar = new Bar();
+ bar.setField1(true);
+
+ DerivedFoo derivedFoo = new DerivedFoo();
+ derivedFoo.setField1("foo1");
+ derivedFoo.setField2("bar2");
+ derivedFoo.setField3(4);
+ derivedFoo.setField4(bar);
+ derivedFoo.setField5("derived1");
+ derivedFoo.setField6(2);
+
+ Foo foo = new Foo();
+ foo.setField1("foo1");
+ foo.setField2("bar2");
+ foo.setField3(4);
+ foo.setField4(bar);
+
+ SchemaTestUtils.DerivedDerivedFoo derivedDerivedFoo = new SchemaTestUtils.DerivedDerivedFoo();
+ derivedDerivedFoo.setField1("foo1");
+ derivedDerivedFoo.setField2("bar2");
+ derivedDerivedFoo.setField3(4);
+ derivedDerivedFoo.setField4(bar);
+ derivedDerivedFoo.setField5("derived1");
+ derivedDerivedFoo.setField6(2);
+ derivedDerivedFoo.setFoo2(foo);
+ derivedDerivedFoo.setDerivedFoo(derivedFoo);
+
+ // schema for base class
+ JSONSchema<Foo> baseJsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ Assert.assertEquals(baseJsonSchema.decode(baseJsonSchema.encode(foo)), foo);
+ Assert.assertEquals(baseJsonSchema.decode(baseJsonSchema.encode(derivedFoo)), foo);
+ Assert.assertEquals(baseJsonSchema.decode(baseJsonSchema.encode(derivedDerivedFoo)), foo);
+
+ // schema for derived class
+ JSONSchema<DerivedFoo> derivedJsonSchema = JSONSchema.of(SchemaDefinition.<DerivedFoo>builder().withPojo(DerivedFoo.class).build());
+ Assert.assertEquals(derivedJsonSchema.decode(derivedJsonSchema.encode(derivedFoo)), derivedFoo);
+ Assert.assertEquals(derivedJsonSchema.decode(derivedJsonSchema.encode(derivedDerivedFoo)), derivedFoo);
+
+ //schema for derived derived class
+ JSONSchema<SchemaTestUtils.DerivedDerivedFoo> derivedDerivedJsonSchema
+ = JSONSchema.of(SchemaDefinition.<SchemaTestUtils.DerivedDerivedFoo>builder().withPojo(SchemaTestUtils.DerivedDerivedFoo.class).build());
+ Assert.assertEquals(derivedDerivedJsonSchema.decode(derivedDerivedJsonSchema.encode(derivedDerivedFoo)), derivedDerivedFoo);
+ }
+
+ @Test(expectedExceptions = SchemaSerializationException.class)
+ public void testAllowNullDecodeWithInvalidContent() {
+ JSONSchema<Foo> jsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ jsonSchema.decode(new byte[0]);
+ }
+
+ @Test
+ public void testAllowNullCorrectPolymorphism() {
Bar bar = new Bar();
bar.setField1(true);
@@ -146,25 +281,25 @@ public class JSONSchemaTest {
derivedDerivedFoo.setDerivedFoo(derivedFoo);
// schema for base class
- JSONSchema<Foo> baseJsonSchema = JSONSchema.of(Foo.class);
+ JSONSchema<Foo> baseJsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
Assert.assertEquals(baseJsonSchema.decode(baseJsonSchema.encode(foo)), foo);
Assert.assertEquals(baseJsonSchema.decode(baseJsonSchema.encode(derivedFoo)), foo);
Assert.assertEquals(baseJsonSchema.decode(baseJsonSchema.encode(derivedDerivedFoo)), foo);
// schema for derived class
- JSONSchema<DerivedFoo> derivedJsonSchema = JSONSchema.of(DerivedFoo.class);
+ JSONSchema<DerivedFoo> derivedJsonSchema = JSONSchema.of(SchemaDefinition.<DerivedFoo>builder().withPojo(DerivedFoo.class).withAlwaysAllowNull(false).build());
Assert.assertEquals(derivedJsonSchema.decode(derivedJsonSchema.encode(derivedFoo)), derivedFoo);
Assert.assertEquals(derivedJsonSchema.decode(derivedJsonSchema.encode(derivedDerivedFoo)), derivedFoo);
//schema for derived derived class
JSONSchema<SchemaTestUtils.DerivedDerivedFoo> derivedDerivedJsonSchema
- = JSONSchema.of(SchemaTestUtils.DerivedDerivedFoo.class);
+ = JSONSchema.of(SchemaDefinition.<SchemaTestUtils.DerivedDerivedFoo>builder().withPojo(SchemaTestUtils.DerivedDerivedFoo.class).withAlwaysAllowNull(false).build());
Assert.assertEquals(derivedDerivedJsonSchema.decode(derivedDerivedJsonSchema.encode(derivedDerivedFoo)), derivedDerivedFoo);
}
@Test(expectedExceptions = SchemaSerializationException.class)
- public void testDecodeWithInvalidContent() {
- JSONSchema<Foo> jsonSchema = JSONSchema.of(Foo.class);
+ public void testNotAllowNullDecodeWithInvalidContent() {
+ JSONSchema<Foo> jsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
jsonSchema.decode(new byte[0]);
}
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
index 2963d62..a32e31d 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
@@ -22,9 +22,7 @@ import static org.testng.Assert.assertEquals;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.apache.pulsar.client.impl.schema.JSONSchema;
-import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Color;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
@@ -37,9 +35,9 @@ import org.testng.annotations.Test;
public class KeyValueSchemaTest {
@Test
- public void testAvroSchemaCreate() {
- AvroSchema<Foo> fooSchema = AvroSchema.of(Foo.class);
- AvroSchema<Bar> barSchema = AvroSchema.of(Bar.class);
+ public void testAllowNullAvroSchemaCreate() {
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
Schema<KeyValue<Foo, Bar>> keyValueSchema2 = Schema.KeyValue(Foo.class, Bar.class, SchemaType.AVRO);
@@ -47,13 +45,39 @@ public class KeyValueSchemaTest {
assertEquals(keyValueSchema1.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
assertEquals(keyValueSchema2.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
- assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
+ assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
+ SchemaType.AVRO);
+ assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
+ SchemaType.AVRO);
+ assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
+ SchemaType.AVRO);
+ assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
+ SchemaType.AVRO);
+
+ String schemaInfo1 = new String(keyValueSchema1.getSchemaInfo().getSchema());
+ String schemaInfo2 = new String(keyValueSchema2.getSchemaInfo().getSchema());
+ assertEquals(schemaInfo1, schemaInfo2);
+ }
+
+ @Test
+ public void testNotAllowNullAvroSchemaCreate() {
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build());
+
+ Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
+ Schema<KeyValue<Foo, Bar>> keyValueSchema2 = Schema.KeyValue(AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build()),
+ AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build()));
+
+ assertEquals(keyValueSchema1.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
+ assertEquals(keyValueSchema2.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
+
+ assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
SchemaType.AVRO);
- assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
+ assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
SchemaType.AVRO);
- assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
+ assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
SchemaType.AVRO);
- assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
+ assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
SchemaType.AVRO);
String schemaInfo1 = new String(keyValueSchema1.getSchemaInfo().getSchema());
@@ -62,9 +86,9 @@ public class KeyValueSchemaTest {
}
@Test
- public void testJsonSchemaCreate() {
- JSONSchema<Foo> fooSchema = JSONSchema.of(Foo.class);
- JSONSchema<Bar> barSchema = JSONSchema.of(Bar.class);
+ public void testAllowNullJsonSchemaCreate() {
+ JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
Schema<KeyValue<Foo, Bar>> keyValueSchema2 = Schema.KeyValue(Foo.class, Bar.class, SchemaType.JSON);
@@ -74,17 +98,53 @@ public class KeyValueSchemaTest {
assertEquals(keyValueSchema2.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
assertEquals(keyValueSchema3.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
- assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
+ assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
+ SchemaType.JSON);
+ assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
+ SchemaType.JSON);
+ assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
+ SchemaType.JSON);
+ assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
+ SchemaType.JSON);
+ assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema3).getKeySchema().getSchemaInfo().getType(),
+ SchemaType.JSON);
+ assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema3).getValueSchema().getSchemaInfo().getType(),
+ SchemaType.JSON);
+
+ String schemaInfo1 = new String(keyValueSchema1.getSchemaInfo().getSchema());
+ String schemaInfo2 = new String(keyValueSchema2.getSchemaInfo().getSchema());
+ String schemaInfo3 = new String(keyValueSchema3.getSchemaInfo().getSchema());
+ assertEquals(schemaInfo1, schemaInfo2);
+ assertEquals(schemaInfo1, schemaInfo3);
+ }
+
+ @Test
+ public void testNotAllowNullJsonSchemaCreate() {
+ JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
+ JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build());
+
+ Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
+ Schema<KeyValue<Foo, Bar>> keyValueSchema2 = Schema.KeyValue(JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build()),
+ JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build()));
+
+ Schema<KeyValue<Foo, Bar>> keyValueSchema3 = Schema.KeyValue(JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build()),
+ JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build()));
+
+ assertEquals(keyValueSchema1.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
+ assertEquals(keyValueSchema2.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
+ assertEquals(keyValueSchema3.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
+
+ assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
SchemaType.JSON);
- assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
+ assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
SchemaType.JSON);
- assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
+ assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
SchemaType.JSON);
- assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
+ assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
SchemaType.JSON);
- assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema3).getKeySchema().getSchemaInfo().getType(),
+ assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema3).getKeySchema().getSchemaInfo().getType(),
SchemaType.JSON);
- assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema3).getValueSchema().getSchemaInfo().getType(),
+ assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema3).getValueSchema().getSchemaInfo().getType(),
SchemaType.JSON);
String schemaInfo1 = new String(keyValueSchema1.getSchemaInfo().getSchema());
@@ -95,7 +155,7 @@ public class KeyValueSchemaTest {
}
@Test
- public void testSchemaEncodeAndDecode() {
+ public void testAllowNullSchemaEncodeAndDecode() {
Schema keyValueSchema = Schema.KeyValue(Foo.class, Bar.class);
Bar bar = new Bar();
@@ -111,7 +171,7 @@ public class KeyValueSchemaTest {
byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
Assert.assertTrue(encodeBytes.length > 0);
- KeyValue<Foo, Bar> keyValue = (KeyValue<Foo, Bar>)keyValueSchema.decode(encodeBytes);
+ KeyValue<Foo, Bar> keyValue = (KeyValue<Foo, Bar>) keyValueSchema.decode(encodeBytes);
Foo fooBack = keyValue.getKey();
Bar barBack = keyValue.getValue();
@@ -120,9 +180,64 @@ public class KeyValueSchemaTest {
}
@Test
- public void testBytesSchemaEncodeAndDecode() {
- AvroSchema<Foo> fooAvroSchema = AvroSchema.of(Foo.class);
- AvroSchema<Bar> barAvroSchema = AvroSchema.of(Bar.class);
+ public void testNotAllowNullSchemaEncodeAndDecode() {
+ Schema keyValueSchema = Schema.KeyValue(JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build()),
+ JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build()));
+
+ Bar bar = new Bar();
+ bar.setField1(true);
+
+ Foo foo = new Foo();
+ foo.setField1("field1");
+ foo.setField2("field2");
+ foo.setField3(3);
+ foo.setField4(bar);
+ foo.setColor(Color.RED);
+
+ byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+ Assert.assertTrue(encodeBytes.length > 0);
+
+ KeyValue<Foo, Bar> keyValue = (KeyValue<Foo, Bar>) keyValueSchema.decode(encodeBytes);
+ Foo fooBack = keyValue.getKey();
+ Bar barBack = keyValue.getValue();
+
+ assertEquals(foo, fooBack);
+ assertEquals(bar, barBack);
+ }
+
+ @Test
+ public void testAllowNullBytesSchemaEncodeAndDecode() {
+ AvroSchema<Foo> fooAvroSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ AvroSchema<Bar> barAvroSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+
+ Bar bar = new Bar();
+ bar.setField1(true);
+
+ Foo foo = new Foo();
+ foo.setField1("field1");
+ foo.setField2("field2");
+ foo.setField3(3);
+ foo.setField4(bar);
+ foo.setColor(Color.RED);
+ foo.setFieldUnableNull("notNull");
+
+ byte[] fooBytes = fooAvroSchema.encode(foo);
+ byte[] barBytes = barAvroSchema.encode(bar);
+
+ byte[] encodeBytes = Schema.KV_BYTES().encode(new KeyValue<>(fooBytes, barBytes));
+ KeyValue<byte[], byte[]> decodeKV = Schema.KV_BYTES().decode(encodeBytes);
+
+ Foo fooBack = fooAvroSchema.decode(decodeKV.getKey());
+ Bar barBack = barAvroSchema.decode(decodeKV.getValue());
+
+ assertEquals(foo, fooBack);
+ assertEquals(bar, barBack);
+ }
+
+ @Test
+ public void testNotAllowNullBytesSchemaEncodeAndDecode() {
+ AvroSchema<Foo> fooAvroSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
+ AvroSchema<Bar> barAvroSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build());
Bar bar = new Bar();
bar.setField1(true);
@@ -133,6 +248,7 @@ public class KeyValueSchemaTest {
foo.setField3(3);
foo.setField4(bar);
foo.setColor(Color.RED);
+ foo.setFieldUnableNull("notNull");
byte[] fooBytes = fooAvroSchema.encode(foo);
byte[] barBytes = barAvroSchema.encode(bar);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
index 4bc0c04..456416f 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl.schema;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
+import org.apache.avro.reflect.Nullable;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
@@ -36,11 +37,17 @@ import org.testng.annotations.Test;
public class SchemaBuilderTest {
private static class AllOptionalFields {
+ @Nullable
private Integer intField;
+ @Nullable
private Long longField;
+ @Nullable
private String stringField;
+ @Nullable
private Boolean boolField;
+ @Nullable
private Float floatField;
+ @Nullable
private Double doubleField;
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaTestUtils.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaTestUtils.java
index 0081f0e..98d5367 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaTestUtils.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaTestUtils.java
@@ -23,6 +23,8 @@ import java.util.List;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
+import org.apache.avro.reflect.AvroDefault;
+import org.apache.avro.reflect.Nullable;
/**
* Utils for testing avro.
@@ -33,11 +35,17 @@ public class SchemaTestUtils {
@ToString
@EqualsAndHashCode
public static class Foo {
+ @Nullable
private String field1;
+ @Nullable
private String field2;
private int field3;
+ @Nullable
private Bar field4;
+ @Nullable
private Color color;
+ @AvroDefault("\"defaultValue\"")
+ private String fieldUnableNull;
}
@Data
@@ -87,21 +95,35 @@ public class SchemaTestUtils {
private Foo foo2;
}
- public static final String SCHEMA_JSON
- = "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache.pulsar.client.impl.schema" +
- ".SchemaTestUtils$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"],\"default\":null}," +
- "{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field3\"," +
- "\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Bar\"," +
- "\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null},{\"name\":\"color\"," +
- "\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"Color\",\"symbols\":[\"RED\",\"BLUE\"]}]," +
- "\"default\":null}]}";
+ public static final String SCHEMA_AVRO_NOT_ALLOW_NULL
+ = "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"]," +
+ "\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":" +
+ "\"record\",\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null},{\"name\":\"color\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"Color\"," +
+ "\"symbols\":[\"RED\",\"BLUE\"]}],\"default\":null},{\"name\":\"fieldUnableNull\",\"type\":\"string\",\"default\":\"defaultValue\"}]}";
+
+ public static final String SCHEMA_AVRO_ALLOW_NULL = "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\"fields\":[{\"name\":\"field1\"," +
+ "\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"" +
+ "null\",{\"type\":\"record\",\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null},{\"name\":\"color\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"Color\"" +
+ ",\"symbols\":[\"RED\",\"BLUE\"]}],\"default\":null},{\"name\":\"fieldUnableNull\",\"type\":[\"null\",\"string\"],\"default\":\"defaultValue\"}]}";
+
+ public static final String SCHEMA_JSON_NOT_ALLOW_NULL
+ = "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\"" +
+ ":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Bar\",\"fields\":[{\"name\":\"" +
+ "field1\",\"type\":\"boolean\"}]}],\"default\":null},{\"name\":\"color\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"Color\",\"symbols\":[\"RED\",\"BLUE\"]}],\"default\":null},{\"name\":\"fieldUnableNull\"," +
+ "\"type\":\"string\",\"default\":\"defaultValue\"}]}";
+ public static final String SCHEMA_JSON_ALLOW_NULL
+ = "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"],\"default\":null}," +
+ "{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Bar\",\"fields\":" +
+ "[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null},{\"name\":\"color\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"Color\",\"symbols\":[\"RED\",\"BLUE\"]}],\"default\":null},{\"name\":" +
+ "\"fieldUnableNull\",\"type\":[\"null\",\"string\"],\"default\":\"defaultValue\"}]}";
public static String[] FOO_FIELDS = {
"field1",
"field2",
"field3",
"field4",
- "color"
+ "color",
+ "fieldUnableNull"
};
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
index fc65d70..ed554cc 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
@@ -77,7 +77,7 @@ public class GenericSchemaImplTest {
Bar bar = new Bar();
bar.setField1(i % 2 == 0);
foo.setField4(bar);
-
+ foo.setFieldUnableNull("fieldUnableNull-1-" + i);
byte[] data = encodeSchema.encode(foo);
log.info("Decoding : {}", new String(data, UTF_8));
@@ -93,6 +93,8 @@ public class GenericSchemaImplTest {
assertTrue(field4 instanceof GenericRecord);
GenericRecord field4Record = (GenericRecord) field4;
assertEquals(i % 2 == 0, field4Record.getField("field1"));
+ Object fieldUnableNull = record.getField("fieldUnableNull");
+ assertEquals("fieldUnableNull-1-" + i, fieldUnableNull, "fieldUnableNull 1 is " + fieldUnableNull.getClass());
}
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java
index 884a674..aedc0bd 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java
@@ -24,10 +24,12 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils;
+import org.apache.pulsar.client.tutorial.JsonPojo;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.testng.annotations.BeforeMethod;
@@ -54,7 +56,7 @@ public class MultiVersionGenericSchemaProviderTest {
@Test
public void testGetSchema() {
CompletableFuture<Optional<SchemaInfo>> completableFuture = new CompletableFuture<>();
- SchemaInfo schemaInfo = AvroSchema.of(SchemaTestUtils.Foo.class).getSchemaInfo();
+ SchemaInfo schemaInfo = AvroSchema.of(SchemaDefinition.<SchemaTestUtils>builder().withPojo(SchemaTestUtils.class).build()).getSchemaInfo();
completableFuture.complete(Optional.of(schemaInfo));
when(schemaProvider.getPulsarClient().getLookup()
.getSchema(
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducerWithSchema.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducerWithSchema.java
index 90758b6..c838244 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducerWithSchema.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducerWithSchema.java
@@ -27,15 +27,15 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.JSONSchema;
-
@Slf4j
public class SampleAsyncProducerWithSchema {
public static void main(String[] args) throws IOException {
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("http://localhost:8080").build();
- Producer<JsonPojo> producer = pulsarClient.newProducer(JSONSchema.of(JsonPojo.class)).topic("persistent://my-property/use/my-ns/my-topic")
+ Producer<JsonPojo> producer = pulsarClient.newProducer(JSONSchema.of(SchemaDefinition.<JsonPojo>builder().withPojo(JsonPojo.class).build())).topic("persistent://my-property/use/my-ns/my-topic")
.sendTimeout(3, TimeUnit.SECONDS).create();
List<CompletableFuture<MessageId>> futures = Lists.newArrayList();
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleConsumerWithSchema.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleConsumerWithSchema.java
index 3780332..1708b84 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleConsumerWithSchema.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleConsumerWithSchema.java
@@ -23,6 +23,7 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.JSONSchema;
public class SampleConsumerWithSchema {
@@ -30,7 +31,8 @@ public class SampleConsumerWithSchema {
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("http://localhost:8080").build();
- Consumer<JsonPojo> consumer = pulsarClient.newConsumer(JSONSchema.of(JsonPojo.class)) //
+ Consumer<JsonPojo> consumer = pulsarClient.newConsumer(JSONSchema.of
+ (SchemaDefinition.<JsonPojo>builder().withPojo(JsonPojo.class).build())) //
.topic("persistent://my-property/use/my-ns/my-topic") //
.subscriptionName("my-subscription-name").subscribe();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index d8abcc6..2ac42f4 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
@@ -124,10 +125,10 @@ public class TopicSchema {
return (Schema<T>) Schema.STRING;
case AVRO:
- return AvroSchema.of(clazz);
+ return AvroSchema.of(SchemaDefinition.<T>builder().withPojo(clazz).build());
case JSON:
- return JSONSchema.of(clazz);
+ return JSONSchema.of(SchemaDefinition.<T>builder().withPojo(clazz).build());
case KEY_VALUE:
return (Schema<T>)Schema.KV_BYTES();
diff --git a/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
index 0478775..984f658 100644
--- a/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
+++ b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -108,7 +109,7 @@ public class HbaseGenericRecordSinkTest {
obj.setAddress("address_value");
obj.setAge(30);
obj.setFlag(true);
- AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
+ AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
byte[] bytes = schema.encode(obj);
ByteBuf payload = Unpooled.copiedBuffer(bytes);
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index 7cbaa57..9ecc91a 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -29,6 +29,7 @@ import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -95,7 +96,7 @@ public class JdbcSinkTest {
obj.setField1("ValueOfField1");
obj.setField2("ValueOfField1");
obj.setField3(3);
- AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
+ AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
byte[] bytes = schema.encode(obj);
ByteBuf payload = Unpooled.copiedBuffer(bytes);
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index b6eade2..7a62f67 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.admin.Schemas;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.api.Commands;
@@ -219,21 +220,21 @@ public abstract class TestPulsarConnector {
partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_6.toString(), 7);
topicsToSchemas = new HashMap<>();
- topicsToSchemas.put(TOPIC_1.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
- topicsToSchemas.put(TOPIC_2.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
- topicsToSchemas.put(TOPIC_3.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
- topicsToSchemas.put(TOPIC_4.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
- topicsToSchemas.put(TOPIC_5.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
- topicsToSchemas.put(TOPIC_6.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(TOPIC_1.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
+ topicsToSchemas.put(TOPIC_2.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
+ topicsToSchemas.put(TOPIC_3.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
+ topicsToSchemas.put(TOPIC_4.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
+ topicsToSchemas.put(TOPIC_5.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
+ topicsToSchemas.put(TOPIC_6.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
- topicsToSchemas.put(PARTITIONED_TOPIC_1.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(PARTITIONED_TOPIC_1.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
- topicsToSchemas.put(PARTITIONED_TOPIC_2.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
- topicsToSchemas.put(PARTITIONED_TOPIC_3.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
- topicsToSchemas.put(PARTITIONED_TOPIC_4.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
- topicsToSchemas.put(PARTITIONED_TOPIC_5.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
- topicsToSchemas.put(PARTITIONED_TOPIC_6.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(PARTITIONED_TOPIC_2.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
+ topicsToSchemas.put(PARTITIONED_TOPIC_3.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
+ topicsToSchemas.put(PARTITIONED_TOPIC_4.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
+ topicsToSchemas.put(PARTITIONED_TOPIC_5.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
+ topicsToSchemas.put(PARTITIONED_TOPIC_6.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
fooTypes = new HashMap<>();
fooTypes.put("field1", IntegerType.INTEGER);
@@ -622,7 +623,7 @@ public abstract class TestPulsarConnector {
.setProducerName("test-producer").setSequenceId(i)
.setPublishTime(currentTimeMs + i).build();
- Schema schema = topicsToSchemas.get(topicSchemaName).getType() == SchemaType.AVRO ? AvroSchema.of(Foo.class) : JSONSchema.of(Foo.class);
+ Schema schema = topicsToSchemas.get(topicSchemaName).getType() == SchemaType.AVRO ? AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()) : JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
org.apache.pulsar.shade.io.netty.buffer.ByteBuf payload = org.apache.pulsar.shade.io.netty.buffer.Unpooled
.copiedBuffer(schema.encode(foo));
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
index 72d9b01..da4331a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
@@ -33,6 +33,7 @@ import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.containers.MySQLContainer;
@@ -60,7 +61,7 @@ public class JdbcSinkTester extends SinkTester<MySQLContainer> {
private static final String NAME = "jdbc";
private static final String MYSQL = "mysql";
- private AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
+ private AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
private String tableName = "test";
private Connection connection;