You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/24 06:48:21 UTC
[camel-kafka-connector] 02/17: fix #692 : Cache already computed Schemas in PojoToSchemaAndStructTransform.
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch camel-master-align-and-rebase
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit e8ba4a372101b80ab7cb3c9218aef4f9bf57d9b8
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Mon Nov 16 23:38:23 2020 +0100
fix #692 : Cache already computed Schemas in PojoToSchemaAndStructTransform.
---
.../transforms/PojoToSchemaAndStructTransform.java | 90 ++++++++++++++++------
.../PojoToSchemaAndStructTransformTest.java | 41 +++++++++-
2 files changed, 104 insertions(+), 27 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java
index d458636..1ecf48f 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java
@@ -18,9 +18,13 @@ package org.apache.camel.kafkaconnector.transforms;
import java.io.IOException;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.dataformat.avro.AvroFactory;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;
import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaGenerator;
@@ -44,39 +48,54 @@ public class PojoToSchemaAndStructTransform <R extends ConnectRecord<R>> impleme
private static final ObjectMapper MAPPER = new ObjectMapper(new AvroFactory());
private AvroData avroData;
+ private ConcurrentMap<String, CacheEntry> avroSchemaWrapperCache;
@Override
public R apply(R r) {
- LOG.debug("Incoming record: " + r);
+ LOG.debug("Incoming record: {}", r);
- AvroSchemaGenerator gen = new AvroSchemaGenerator();
+ if (r.value() != null) {
+ String recordClassCanonicalName = r.value().getClass().getCanonicalName();
+ CacheEntry cacheEntry = avroSchemaWrapperCache.computeIfAbsent(recordClassCanonicalName, new Function<String, CacheEntry>() {
+ @Override
+ public CacheEntry apply(String s) {
+ //cache miss
+ AvroSchemaGenerator gen = new AvroSchemaGenerator();
- try {
- MAPPER.acceptJsonFormatVisitor(r.value().getClass(), gen);
- } catch (JsonMappingException e) {
- throw new ConnectException("Error in generating POJO schema.", e);
- }
+ try {
+ MAPPER.acceptJsonFormatVisitor(r.value().getClass(), gen);
+ } catch (JsonMappingException e) {
+ throw new ConnectException("Error in generating POJO schema.", e);
+ }
- AvroSchema schemaWrapper = gen.getGeneratedSchema();
- org.apache.avro.Schema avroSchema = schemaWrapper.getAvroSchema();
- LOG.debug("Generated avro schema: " + avroSchema.toString(true));
+ AvroSchema schemaWrapper = gen.getGeneratedSchema();
+ LOG.debug("Generated and cached avro schema: {}", schemaWrapper.getAvroSchema().toString(true));
- SchemaAndValue connectSchemaAndData = null;
- try {
- byte[] avroDataByte = MAPPER.writer(schemaWrapper).writeValueAsBytes(r.value());
- Decoder decoder = DecoderFactory.get().binaryDecoder(avroDataByte, null);
- DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
- GenericRecord genericAvroData = datumReader.read(null, decoder);
+ return new CacheEntry(schemaWrapper, MAPPER.writer(schemaWrapper));
+ }
+ });
- connectSchemaAndData = this.avroData.toConnectData(avroSchema, genericAvroData);
- } catch (IOException e) {
- throw new ConnectException("Error in generating POJO Struct.", e);
- }
+ SchemaAndValue connectSchemaAndData = null;
+ try {
+ byte[] avroDataByte = cacheEntry.getObjectWriter().writeValueAsBytes(r.value());
+ Decoder decoder = DecoderFactory.get().binaryDecoder(avroDataByte, null);
+ org.apache.avro.Schema avroSchema = cacheEntry.getAvroSchemaWrapper().getAvroSchema();
+ DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
+ GenericRecord genericAvroData = datumReader.read(null, decoder);
- LOG.debug("Generate kafka connect schema: " + connectSchemaAndData.schema());
- LOG.debug("Generate kafka connect value (as Struct): " + connectSchemaAndData.value());
- return r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(),
- connectSchemaAndData.schema(), connectSchemaAndData.value(), r.timestamp());
+ connectSchemaAndData = this.avroData.toConnectData(avroSchema, genericAvroData);
+ } catch (IOException e) {
+ throw new ConnectException("Error in generating POJO Struct.", e);
+ }
+
+ LOG.debug("Generate kafka connect schema: {}", connectSchemaAndData.schema());
+ LOG.debug("Generate kafka connect value (as Struct): {}", connectSchemaAndData.value());
+ return r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(),
+ connectSchemaAndData.schema(), connectSchemaAndData.value(), r.timestamp());
+ } else {
+ LOG.debug("Incoming record with a null value, nothing to be done.");
+ return r;
+ }
}
@Override
@@ -91,6 +110,29 @@ public class PojoToSchemaAndStructTransform <R extends ConnectRecord<R>> impleme
@Override
public void configure(Map<String, ?> configs) {
+ this.avroSchemaWrapperCache = new ConcurrentHashMap<>();
this.avroData = new AvroData(new AvroDataConfig(configs));
}
+
+ public Map<String, CacheEntry> getCache() {
+ return this.avroSchemaWrapperCache;
+ }
+
+ public class CacheEntry {
+ private AvroSchema avroSchemaWrapper;
+ private ObjectWriter objectWriter;
+
+ public CacheEntry(AvroSchema avroSchemaWrapper, ObjectWriter objectWriter) {
+ this.avroSchemaWrapper = avroSchemaWrapper;
+ this.objectWriter = objectWriter;
+ }
+
+ public AvroSchema getAvroSchemaWrapper() {
+ return avroSchemaWrapper;
+ }
+
+ public ObjectWriter getObjectWriter() {
+ return objectWriter;
+ }
+ }
}
\ No newline at end of file
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java
index 9bc617c..e378eab 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java
@@ -122,14 +122,49 @@ public class PojoToSchemaAndStructTransformTest {
Schema.STRING_SCHEMA, "testKeyValue",
Schema.BYTES_SCHEMA, map);
- assertThrows(ConnectException.class, () -> {pojoToSchemaAndStructTransform.apply(cr);});
+ assertThrows(ConnectException.class, () -> {
+ pojoToSchemaAndStructTransform.apply(cr);
+ });
+ }
+
+ @Test()
+ public void testNullValueConversion() {
+ PojoToSchemaAndStructTransform pojoToSchemaAndStructTransform = new PojoToSchemaAndStructTransform();
+ pojoToSchemaAndStructTransform.configure(Collections.emptyMap());
+
+ ConnectRecord cr = new SourceRecord(null, null, "testTopic",
+ Schema.STRING_SCHEMA, "testKeyValue",
+ Schema.BYTES_SCHEMA, null);
+
+ ConnectRecord transformedCr = pojoToSchemaAndStructTransform.apply(cr);
+ assertEquals(cr, transformedCr);
+ }
+
+ @Test()
+ public void testConversionCache() {
+ PojoToSchemaAndStructTransform pojoToSchemaAndStructTransform = new PojoToSchemaAndStructTransform();
+ pojoToSchemaAndStructTransform.configure(Collections.emptyMap());
+
+ PojoWithMap pwm = new PojoWithMap();
+ pwm.addToMap("ciao", 9);
+
+ ConnectRecord cr = new SourceRecord(null, null, "testTopic",
+ Schema.STRING_SCHEMA, "testKeyValue",
+ Schema.BYTES_SCHEMA, pwm);
+
+ assertEquals(0, pojoToSchemaAndStructTransform.getCache().keySet().size());
+ pojoToSchemaAndStructTransform.apply(cr);
+ assertEquals(1, pojoToSchemaAndStructTransform.getCache().keySet().size());
+ ConnectRecord transformedCr = pojoToSchemaAndStructTransform.apply(cr);
+ assertEquals(1, pojoToSchemaAndStructTransform.getCache().keySet().size());
+ assertTrue(pojoToSchemaAndStructTransform.getCache().keySet().contains(PojoWithMap.class.getCanonicalName()));
}
private void atLeastOneFieldWithGivenValueExists(List structs, String fieldName, String fieldExpectedValue) {
structs.stream().filter(
- struct -> ((Struct) struct).getString(fieldName) == null ? false : true
+ struct -> ((Struct) struct).getString(fieldName) == null ? false : true
).forEach(
- struct -> assertEquals(fieldExpectedValue, ((Struct) struct).getString(fieldName))
+ struct -> assertEquals(fieldExpectedValue, ((Struct) struct).getString(fieldName))
);
}