You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dk...@apache.org on 2016/06/22 19:38:01 UTC

camel git commit: [CAMEL-10069] Update to use ClassResolver to help search for the partitioner and serializers Also pull search for Partitioner out into separate try block to allow for use with 0.8 kafka client (which doesn't have partitioner)

Repository: camel
Updated Branches:
  refs/heads/master b8f5da747 -> ccef28fbc


[CAMEL-10069] Update to use ClassResolver to help search for the partitioner and serializers
Also pull search for Partitioner out into separate try block to allow for use with 0.8 kafka client (which doesn't have partitioner)


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ccef28fb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ccef28fb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ccef28fb

Branch: refs/heads/master
Commit: ccef28fbc003c541eb9ebd3b584092d0ea6c0f5d
Parents: b8f5da7
Author: Daniel Kulp <dk...@apache.org>
Authored: Wed Jun 22 15:30:14 2016 -0400
Committer: Daniel Kulp <dk...@apache.org>
Committed: Wed Jun 22 15:30:14 2016 -0400

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java    |  2 -
 .../camel/component/kafka/KafkaProducer.java    | 73 ++++++++++++--------
 .../component/kafka/KafkaProducerFullTest.java  |  2 +-
 3 files changed, 47 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ccef28fb/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index a317b54..8b995ff 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.kafka;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -30,7 +29,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/camel/blob/ccef28fb/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index ae2f2a4..a81ec15 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.lang.reflect.Field;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -30,15 +31,20 @@ import org.apache.camel.CamelException;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.spi.ClassResolver;
+import org.apache.camel.util.CastUtils;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Partitioner;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class KafkaProducer extends DefaultAsyncProducer {
-
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer.class);
+    
     private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
     private final KafkaEndpoint endpoint;
     private ExecutorService workerPool;
@@ -49,36 +55,22 @@ public class KafkaProducer extends DefaultAsyncProducer {
         this.endpoint = endpoint;
     }
 
-    
-    Class<?> loadClass(Object o, ClassLoader loader) {
+    <T> Class<T> loadClass(Object o, ClassResolver resolver, Class<T> type) {
         if (o == null || o instanceof Class) {
-            return (Class<?>)o;
+            return CastUtils.cast((Class<?>)o);
         }
         String name = o.toString();
-        Class<?> c;
-        try {
-            c = Class.forName(name, true, loader);
-        } catch (ClassNotFoundException e) {
-            c = null;
-        }
+        Class<T> c = resolver.resolveClass(name, type);
         if (c == null) {
-            try {
-                c = Class.forName(name, true, getClass().getClassLoader());
-            } catch (ClassNotFoundException e) {
-                c = null;
-            }
+            c = resolver.resolveClass(name, type, getClass().getClassLoader());
         }
         if (c == null) {
-            try {
-                c = Class.forName(name, true, org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
-            } catch (ClassNotFoundException e) {
-                c = null;
-            }
+            c = resolver.resolveClass(name, type, org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
         }
         return c;
     }
-    void replaceWithClass(Properties props, String key,  ClassLoader loader, Class<?> type) {
-        Class<?> c = loadClass(props.get(key), loader);
+    void replaceWithClass(Properties props, String key,  ClassResolver resolver, Class<?> type) {
+        Class<?> c = loadClass(props.get(key), resolver, type);
         if (c != null) {
             props.put(key, c);
         }
@@ -86,11 +78,28 @@ public class KafkaProducer extends DefaultAsyncProducer {
     
     Properties getProps() {
         Properties props = endpoint.getConfiguration().createProducerProperties();
-        if (endpoint.getCamelContext() != null) {
-            ClassLoader loader = endpoint.getCamelContext().getApplicationContextClassLoader();
-            replaceWithClass(props, "key.serializer", loader, Serializer.class);
-            replaceWithClass(props, "value.serializer", loader, Serializer.class);
-            replaceWithClass(props, "partitioner.class", loader, Partitioner.class);
+        try {
+            if (endpoint.getCamelContext() != null) {
+                ClassResolver resolver = endpoint.getCamelContext().getClassResolver();
+                replaceWithClass(props, "key.serializer", resolver, Serializer.class);
+                replaceWithClass(props, "value.serializer", resolver, Serializer.class);
+                
+                try {
+                    //doesn't exist in old version of Kafka client so detect and only call the method if
+                    //the field/config actually exists
+                    Field f = ProducerConfig.class.getDeclaredField("PARTITIONER_CLASS_CONFIG");
+                    if (f != null) {
+                        loadParitionerClass(resolver, props);
+                    }
+                } catch (NoSuchFieldException e) {
+                    //ignore
+                } catch (SecurityException e) {
+                    //ignore
+                }
+            }
+        } catch (Throwable t) {
+            //can ignore and Kafka itself might be able to handle it, if not, it will throw an exception
+            LOG.debug("Problem loading classes for Serializers", t);
         }
         if (endpoint.getBrokers() != null) {
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getBrokers());
@@ -98,6 +107,11 @@ public class KafkaProducer extends DefaultAsyncProducer {
         return props;
     }
 
+    private void loadParitionerClass(ClassResolver resolver, Properties props) {
+        replaceWithClass(props, "partitioner.class", resolver, Partitioner.class);
+    }
+
+
     public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() {
         return kafkaProducer;
     }
@@ -184,6 +198,11 @@ public class KafkaProducer extends DefaultAsyncProducer {
                     }
                     return new ProducerRecord(msgTopic, msgList.next());
                 }
+
+                @Override
+                public void remove() {
+                    msgList.remove();
+                }
             };
         }
         ProducerRecord record;

http://git-wip-us.apache.org/repos/asf/camel/blob/ccef28fb/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index d5b65fa..30f2b13 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -99,7 +99,7 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
     }
 
     @Override
-    protected RoutesBuilder createRouteBuilder() throws Exception {
+    protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {