You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/24 08:19:24 UTC

[camel-kafka-connector] 02/18: fix #692 : Cache already computed Schemas in PojoToSchemaAndStructTransform.

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 09491aa383912569ddd5f9d75d903b3e1b536132
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Mon Nov 16 23:38:23 2020 +0100

    fix #692 : Cache already computed Schemas in PojoToSchemaAndStructTransform.
---
 .../transforms/PojoToSchemaAndStructTransform.java | 90 ++++++++++++++++------
 .../PojoToSchemaAndStructTransformTest.java        | 41 +++++++++-
 2 files changed, 104 insertions(+), 27 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java
index d458636..1ecf48f 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransform.java
@@ -18,9 +18,13 @@ package org.apache.camel.kafkaconnector.transforms;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
 
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
 import com.fasterxml.jackson.dataformat.avro.AvroFactory;
 import com.fasterxml.jackson.dataformat.avro.AvroSchema;
 import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaGenerator;
@@ -44,39 +48,54 @@ public class PojoToSchemaAndStructTransform <R extends ConnectRecord<R>> impleme
     private static final ObjectMapper MAPPER = new ObjectMapper(new AvroFactory());
 
     private AvroData avroData;
+    private ConcurrentMap<String, CacheEntry> avroSchemaWrapperCache;
 
     @Override
     public R apply(R r) {
-        LOG.debug("Incoming record: " + r);
+        LOG.debug("Incoming record: {}", r);
 
-        AvroSchemaGenerator gen = new AvroSchemaGenerator();
+        if (r.value() != null) {
+            String recordClassCanonicalName = r.value().getClass().getCanonicalName();
+            CacheEntry cacheEntry = avroSchemaWrapperCache.computeIfAbsent(recordClassCanonicalName, new Function<String, CacheEntry>() {
+                @Override
+                public CacheEntry apply(String s) {
+                    //cache miss
+                    AvroSchemaGenerator gen = new AvroSchemaGenerator();
 
-        try {
-            MAPPER.acceptJsonFormatVisitor(r.value().getClass(), gen);
-        } catch (JsonMappingException e) {
-            throw new ConnectException("Error in generating POJO schema.", e);
-        }
+                    try {
+                        MAPPER.acceptJsonFormatVisitor(r.value().getClass(), gen);
+                    } catch (JsonMappingException e) {
+                        throw new ConnectException("Error in generating POJO schema.", e);
+                    }
 
-        AvroSchema schemaWrapper = gen.getGeneratedSchema();
-        org.apache.avro.Schema avroSchema = schemaWrapper.getAvroSchema();
-        LOG.debug("Generated avro schema: " + avroSchema.toString(true));
+                    AvroSchema schemaWrapper = gen.getGeneratedSchema();
+                    LOG.debug("Generated and cached avro schema: {}", schemaWrapper.getAvroSchema().toString(true));
 
-        SchemaAndValue connectSchemaAndData = null;
-        try {
-            byte[] avroDataByte = MAPPER.writer(schemaWrapper).writeValueAsBytes(r.value());
-            Decoder decoder = DecoderFactory.get().binaryDecoder(avroDataByte, null);
-            DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
-            GenericRecord genericAvroData = datumReader.read(null, decoder);
+                    return new CacheEntry(schemaWrapper, MAPPER.writer(schemaWrapper));
+                }
+            });
 
-            connectSchemaAndData = this.avroData.toConnectData(avroSchema, genericAvroData);
-        } catch (IOException e) {
-            throw new ConnectException("Error in generating POJO Struct.", e);
-        }
+            SchemaAndValue connectSchemaAndData = null;
+            try {
+                byte[] avroDataByte = cacheEntry.getObjectWriter().writeValueAsBytes(r.value());
+                Decoder decoder = DecoderFactory.get().binaryDecoder(avroDataByte, null);
+                org.apache.avro.Schema avroSchema = cacheEntry.getAvroSchemaWrapper().getAvroSchema();
+                DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
+                GenericRecord genericAvroData = datumReader.read(null, decoder);
 
-        LOG.debug("Generate kafka connect schema: " + connectSchemaAndData.schema());
-        LOG.debug("Generate kafka connect value (as Struct): " + connectSchemaAndData.value());
-        return r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(),
-                connectSchemaAndData.schema(), connectSchemaAndData.value(), r.timestamp());
+                connectSchemaAndData = this.avroData.toConnectData(avroSchema, genericAvroData);
+            } catch (IOException e) {
+                throw new ConnectException("Error in generating POJO Struct.", e);
+            }
+
+            LOG.debug("Generate kafka connect schema: {}", connectSchemaAndData.schema());
+            LOG.debug("Generate kafka connect value (as Struct): {}", connectSchemaAndData.value());
+            return r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(),
+                    connectSchemaAndData.schema(), connectSchemaAndData.value(), r.timestamp());
+        } else {
+            LOG.debug("Incoming record with a null value, nothing to be done.");
+            return r;
+        }
     }
 
     @Override
@@ -91,6 +110,29 @@ public class PojoToSchemaAndStructTransform <R extends ConnectRecord<R>> impleme
 
     @Override
     public void configure(Map<String, ?> configs) {
+        this.avroSchemaWrapperCache = new ConcurrentHashMap<>();
         this.avroData = new AvroData(new AvroDataConfig(configs));
     }
+
+    public Map<String, CacheEntry> getCache() {
+        return this.avroSchemaWrapperCache;
+    }
+
+    public class CacheEntry {
+        private AvroSchema avroSchemaWrapper;
+        private ObjectWriter objectWriter;
+
+        public CacheEntry(AvroSchema avroSchemaWrapper, ObjectWriter objectWriter) {
+            this.avroSchemaWrapper = avroSchemaWrapper;
+            this.objectWriter = objectWriter;
+        }
+
+        public AvroSchema getAvroSchemaWrapper() {
+            return avroSchemaWrapper;
+        }
+
+        public ObjectWriter getObjectWriter() {
+            return objectWriter;
+        }
+    }
 }
\ No newline at end of file
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java
index 9bc617c..e378eab 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/transforms/PojoToSchemaAndStructTransformTest.java
@@ -122,14 +122,49 @@ public class PojoToSchemaAndStructTransformTest {
                 Schema.STRING_SCHEMA, "testKeyValue",
                 Schema.BYTES_SCHEMA, map);
 
-        assertThrows(ConnectException.class, () -> {pojoToSchemaAndStructTransform.apply(cr);});
+        assertThrows(ConnectException.class, () -> {
+            pojoToSchemaAndStructTransform.apply(cr);
+        });
+    }
+
+    @Test()
+    public void testNullValueConversion() {
+        PojoToSchemaAndStructTransform pojoToSchemaAndStructTransform = new PojoToSchemaAndStructTransform();
+        pojoToSchemaAndStructTransform.configure(Collections.emptyMap());
+
+        ConnectRecord cr = new SourceRecord(null, null, "testTopic",
+                Schema.STRING_SCHEMA, "testKeyValue",
+                Schema.BYTES_SCHEMA, null);
+
+        ConnectRecord transformedCr = pojoToSchemaAndStructTransform.apply(cr);
+        assertEquals(cr, transformedCr);
+    }
+
+    @Test()
+    public void testConversionCache() {
+        PojoToSchemaAndStructTransform pojoToSchemaAndStructTransform = new PojoToSchemaAndStructTransform();
+        pojoToSchemaAndStructTransform.configure(Collections.emptyMap());
+
+        PojoWithMap pwm = new PojoWithMap();
+        pwm.addToMap("ciao", 9);
+
+        ConnectRecord cr = new SourceRecord(null, null, "testTopic",
+                Schema.STRING_SCHEMA, "testKeyValue",
+                Schema.BYTES_SCHEMA, pwm);
+
+        assertEquals(0, pojoToSchemaAndStructTransform.getCache().keySet().size());
+        pojoToSchemaAndStructTransform.apply(cr);
+        assertEquals(1, pojoToSchemaAndStructTransform.getCache().keySet().size());
+        ConnectRecord transformedCr = pojoToSchemaAndStructTransform.apply(cr);
+        assertEquals(1, pojoToSchemaAndStructTransform.getCache().keySet().size());
+        assertTrue(pojoToSchemaAndStructTransform.getCache().keySet().contains(PojoWithMap.class.getCanonicalName()));
     }
 
     private void atLeastOneFieldWithGivenValueExists(List structs, String fieldName, String fieldExpectedValue) {
         structs.stream().filter(
-                struct -> ((Struct) struct).getString(fieldName) == null ? false : true
+            struct -> ((Struct) struct).getString(fieldName) == null ? false : true
         ).forEach(
-                struct -> assertEquals(fieldExpectedValue, ((Struct) struct).getString(fieldName))
+            struct -> assertEquals(fieldExpectedValue, ((Struct) struct).getString(fieldName))
         );
     }