Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2021/11/30 06:15:04 UTC

[GitHub] [camel] orpiske commented on a change in pull request #6491: CAMEL-17240: cleanup the camel-kafka producer code

orpiske commented on a change in pull request #6491:
URL: https://github.com/apache/camel/pull/6491#discussion_r758963800



##########
File path: components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
##########
@@ -143,167 +145,260 @@ protected void doStop() throws Exception {
         }
 
         if (shutdownWorkerPool && workerPool != null) {
-            int timeout = endpoint.getConfiguration().getShutdownTimeout();
+            int timeout = configuration.getShutdownTimeout();
             LOG.debug("Shutting down Kafka producer worker threads with timeout {} millis", timeout);
             endpoint.getCamelContext().getExecutorServiceManager().shutdownGraceful(workerPool, timeout);
             workerPool = null;
         }
     }
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    protected Iterator<KeyValueHolder<Object, ProducerRecord>> createRecorder(Exchange exchange) throws Exception {
-        String topic = endpoint.getConfiguration().getTopic();
-        Long timeStamp = null;
+    protected Iterator<KeyValueHolder<Object, ProducerRecord<Object, Object>>> createRecordIterable(
+            Exchange exchange, Message message) {
+        String topic = evaluateTopic(message);
 
-        // must remove header so its not propagated
-        Object overrideTopic = exchange.getIn().removeHeader(KafkaConstants.OVERRIDE_TOPIC);
-        if (overrideTopic != null) {
-            LOG.debug("Using override topic: {}", overrideTopic);
-            topic = overrideTopic.toString();
-        }
+        // extracting headers which need to be propagated
+        List<Header> propagatedHeaders = getPropagatedHeaders(exchange, message);
 
-        if (topic == null) {
-            // if topic property was not received from configuration or header
-            // parameters take it from the remaining URI
-            topic = URISupport.extractRemainderPath(new URI(endpoint.getEndpointUri()), true);
-        }
+        Object body = message.getBody();
+
+        Iterator<Object> iterator = getObjectIterator(body);
+
+        return new KeyValueHolderIterator(iterator, exchange, configuration, topic, propagatedHeaders);
+    }
+
+    protected ProducerRecord<Object, Object> createRecord(Exchange exchange, Message message) {
+        String topic = evaluateTopic(message);
 
-        Object overrideTimeStamp = exchange.getIn().removeHeader(KafkaConstants.OVERRIDE_TIMESTAMP);
+        Long timeStamp = null;
+        Object overrideTimeStamp = message.removeHeader(KafkaConstants.OVERRIDE_TIMESTAMP);
         if (overrideTimeStamp instanceof Long) {
             LOG.debug("Using override TimeStamp: {}", overrideTimeStamp);
             timeStamp = (Long) overrideTimeStamp;
         }
 
         // extracting headers which need to be propagated
-        List<Header> propagatedHeaders = getPropagatedHeaders(exchange, endpoint.getConfiguration());
-
-        Object msg = exchange.getIn().getBody();
-
-        // is the message body a list or something that contains multiple values
-        Iterator<Object> iterator = null;
-        if (msg instanceof Iterable) {
-            iterator = ((Iterable<Object>) msg).iterator();
-        } else if (msg instanceof Iterator) {
-            iterator = (Iterator<Object>) msg;
-        }
-        if (iterator != null) {
-            final Iterator<Object> msgList = iterator;
-            final String msgTopic = topic;
-
-            return new KeyValueHolderIterator(msgList, exchange, endpoint.getConfiguration(), msgTopic, propagatedHeaders);
-        }
+        List<Header> propagatedHeaders = getPropagatedHeaders(exchange, message);
 
         // endpoint takes precedence over header configuration
-        final Integer partitionKey = ObjectHelper.supplyIfEmpty(endpoint.getConfiguration().getPartitionKey(),
-                () -> exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class));
+        final Integer partitionKey = ObjectHelper.supplyIfEmpty(configuration.getPartitionKey(),
+                () -> message.getHeader(KafkaConstants.PARTITION_KEY, Integer.class));
 
         // endpoint takes precedence over header configuration
-        Object key = ObjectHelper.supplyIfEmpty(endpoint.getConfiguration().getKey(),
-                () -> exchange.getIn().getHeader(KafkaConstants.KEY));
+        Object key = ObjectHelper.supplyIfEmpty(configuration.getKey(),
+                () -> message.getHeader(KafkaConstants.KEY));
 
         if (key != null) {
-            key = tryConvertToSerializedType(exchange, key, endpoint.getConfiguration().getKeySerializer());
+            key = tryConvertToSerializedType(exchange, key, configuration.getKeySerializer());
         }
 
         // must convert each entry of the iterator into the value according to
         // the serializer
-        Object value = tryConvertToSerializedType(exchange, msg, endpoint.getConfiguration().getValueSerializer());
+        Object value = tryConvertToSerializedType(exchange, message.getBody(), configuration.getValueSerializer());
+
+        return new ProducerRecord<>(topic, partitionKey, timeStamp, key, value, propagatedHeaders);
+    }
 
-        ProducerRecord record = new ProducerRecord(topic, partitionKey, timeStamp, key, value, propagatedHeaders);
-        return Collections.singletonList(new KeyValueHolder<Object, ProducerRecord>((Object) exchange, record)).iterator();
+    protected KeyValueHolder<Object, ProducerRecord<Object, Object>> createKeyValueHolder(Exchange exchange, Message message) {
+        ProducerRecord<Object, Object> record = createRecord(exchange, message);
+
+        return new KeyValueHolder<>(exchange, record);
     }
 
-    private List<Header> getPropagatedHeaders(Exchange exchange, KafkaConfiguration getConfiguration) {
-        HeaderFilterStrategy headerFilterStrategy = getConfiguration.getHeaderFilterStrategy();
-        KafkaHeaderSerializer headerSerializer = getConfiguration.getHeaderSerializer();
-        return exchange.getIn().getHeaders().entrySet().stream()
-                .filter(entry -> shouldBeFiltered(entry, exchange, headerFilterStrategy))
-                .map(entry -> getRecordHeader(entry, headerSerializer)).filter(Objects::nonNull).collect(Collectors.toList());
+    private String evaluateTopic(Message message) {
+        String topic = configuration.getTopic();
+
+        // must remove header so it's not propagated
+        Object overrideTopic = message.removeHeader(KafkaConstants.OVERRIDE_TOPIC);
+        if (overrideTopic != null) {
+            LOG.debug("Using override topic: {}", overrideTopic);
+            topic = overrideTopic.toString();
+        }
+
+        if (topic == null) {
+            // if topic property was not received from configuration or header
+            // parameters take it from the remaining URI
+            topic = URISupport.extractRemainderPath(URI.create(endpoint.getEndpointUri()), true);

Review comment:
       +1, great catch!
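
       For context on why the switch from new URI(...) to URI.create(...) is a nice cleanup:
       the java.net.URI constructor throws the checked URISyntaxException (one reason the old
       createRecorder() had to declare "throws Exception"), while URI.create() performs the same
       parsing and rethrows the failure as an unchecked IllegalArgumentException, so the new
       evaluateTopic() needs no throws clause. A rough, self-contained sketch of the difference
       (the class name and endpoint URI below are made up for illustration and are not part of
       this patch):

           import java.net.URI;
           import java.net.URISyntaxException;

           public class UriCreateSketch {
               public static void main(String[] args) throws URISyntaxException {
                   // hypothetical endpoint URI, just for illustration
                   String endpointUri = "kafka:my-topic?brokers=localhost:9092";

                   // Constructor: callers must catch or declare the checked URISyntaxException
                   URI viaConstructor = new URI(endpointUri);

                   // Factory: same parsing, but a malformed URI surfaces as an unchecked
                   // IllegalArgumentException, so no throws clause is needed at the call site
                   URI viaFactory = URI.create(endpointUri);

                   System.out.println(viaConstructor.equals(viaFactory)); // prints: true
               }
           }

       The parsing result is identical either way; the change simply keeps the extracted helper
       methods free of checked exceptions.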




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org