Posted to dev@activemq.apache.org by michaelandrepearce <gi...@git.apache.org> on 2017/10/24 07:56:10 UTC

[GitHub] activemq-artemis pull request #1607: Artemis Kafka Integration Bridge

GitHub user michaelandrepearce opened a pull request:

    https://github.com/apache/activemq-artemis/pull/1607

    Artemis Kafka Integration Bridge

    ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridge
    
    Initial Kafka Bridge for Apache ActiveMQ Artemis to Apache Kafka, using the ConnectorService interface
    Supports carrying Core or AMQP based protocols over Kafka.
    Ensured a Core TextMessage can map to StringSerializer for a consumer
    Ensured a Core ByteMessage can map to ByteArraySerializer for a consumer
    Kafka Serdes to allow Kafka consumers to consume Core or AMQP back into CoreMessage, ProtonMessage, or JMSMessage
    Added documentation
    Added integration tests

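    For illustration only, a minimal sketch of consuming a bridged Core TextMessage from Kafka with the stock StringDeserializer; the broker address, consumer group and topic name below are assumptions, not values defined by this PR:

        import java.util.Collections;
        import java.util.Properties;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.consumer.KafkaConsumer;

        public class BridgedTextMessageConsumer {
           public static void main(String[] args) {
              Properties props = new Properties();
              props.put("bootstrap.servers", "kafka-1.domain.local:9092"); // assumed broker address
              props.put("group.id", "bridge-demo"); // assumed consumer group
              props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                 consumer.subscribe(Collections.singletonList("my_kafka_topic")); // assumed topic
                 while (true) {
                    for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                       // A bridged Core TextMessage body arrives as a plain String value
                       System.out.println(record.value());
                    }
                 }
              }
           }
        }
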
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/michaelandrepearce/activemq-artemis ARTEMIS-1478

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/activemq-artemis/pull/1607.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1607
    
----
commit 77937f23864425d544a7842e13e71b4cd39c0608
Author: Michael André Pearce <mi...@me.com>
Date:   2017-10-24T07:51:36Z

    Artemis Kafka Integration Bridge
    
    
    ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridge
    
    Initial Kafka Bridge for Apache ActiveMQ Artemis to Apache Kafka, using the ConnectorService interface
    Supports carrying Core or AMQP based protocols over Kafka.
    Ensured a Core TextMessage can map to StringSerializer for a consumer
    Ensured a Core ByteMessage can map to ByteArraySerializer for a consumer
    Kafka Serdes to allow Kafka consumers to consume Core or AMQP back into CoreMessage, ProtonMessage, or JMSMessage
    Added documentation
    Added integration tests

----


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146663570
  
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java ---
    @@ -0,0 +1,484 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.bridge;
    +
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.core.filter.Filter;
    +import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
    +import org.apache.activemq.artemis.core.persistence.StorageManager;
    +import org.apache.activemq.artemis.core.postoffice.Binding;
    +import org.apache.activemq.artemis.core.postoffice.PostOffice;
    +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
    +import org.apache.activemq.artemis.core.server.ConnectorService;
    +import org.apache.activemq.artemis.core.server.Consumer;
    +import org.apache.activemq.artemis.core.server.HandleStatus;
    +import org.apache.activemq.artemis.core.server.MessageReference;
    +import org.apache.activemq.artemis.core.server.Queue;
    +import org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer;
    +import org.apache.activemq.artemis.utils.ConfigurationHelper;
    +import org.apache.activemq.artemis.utils.ReusableLatch;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.kafka.common.utils.Utils;
    +
    +class KafkaProducerBridge implements Consumer, ConnectorService {
    +
    +   private final String connectorName;
    +
    +   private final String queueName;
    +
    +   private final String topicName;
    +
    +   private final PostOffice postOffice;
    +
    +   private Queue queue = null;
    +
    +   private Filter filter = null;
    +
    +   private String filterString;
    +
    +   private AtomicBoolean isStarted = new AtomicBoolean();
    +
    +   private boolean isConnected = false;
    +
    +   private Producer<String, Message> kafkaProducer;
    +
    +   private Map<String, Object> configuration;
    +
    +   private long sequentialID;
    +
    +   private final ReusableLatch pendingAcks = new ReusableLatch(0);
    +
    +   private final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
    +
    +   private final ScheduledExecutorService scheduledExecutorService;
    +
    +   private final int retryAttempts;
    +
    +   private final long retryInterval;
    +
    +   private final double retryMultiplier;
    +
    +   private final long retryMaxInterval;
    +
    +   private final AtomicInteger retryCount = new AtomicInteger();
    +
    +   private final AtomicBoolean awaitingReconnect = new AtomicBoolean();
    +
    +   private final KafkaProducerFactory<String, Message> kafkaProducerFactory;
    +
    +
    +   KafkaProducerBridge(String connectorName, KafkaProducerFactory<String, Message> kafkaProducerFactory, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledExecutorService) {
    +      this.sequentialID = storageManager.generateID();
    +
    +      this.kafkaProducerFactory = kafkaProducerFactory;
    +
    +      this.connectorName = connectorName;
    +      this.queueName = ConfigurationHelper.getStringProperty(KafkaConstants.QUEUE_NAME, null, configuration);
    +      this.topicName = ConfigurationHelper.getStringProperty(KafkaConstants.TOPIC_NAME, null, configuration);
    +
    +      this.retryAttempts = ConfigurationHelper.getIntProperty(KafkaConstants.RETRY_ATTEMPTS_NAME, -1, configuration);
    +      this.retryInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_INTERVAL_NAME, 2000, configuration);
    +      this.retryMultiplier = ConfigurationHelper.getDoubleProperty(KafkaConstants.RETRY_MULTIPLIER_NAME, 2, configuration);
    +      this.retryMaxInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_MAX_INTERVAL_NAME, 30000, configuration);
    +
    +      this.filterString = ConfigurationHelper.getStringProperty(KafkaConstants.FILTER_STRING, null, configuration);
    +
    +      configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    +      if (!configuration.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
    +         configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoreMessageSerializer.class.getName());
    +      }
    +
    +      this.postOffice = postOffice;
    +      this.configuration = configuration;
    +      this.scheduledExecutorService = scheduledExecutorService;
    +   }
    +
    +   private Map<String, Object> kafkaConfig(Map<String, Object> configuration) {
    +      Map<String, Object> filteredConfig = new HashMap<>(configuration);
    +      KafkaConstants.OPTIONAL_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      KafkaConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      return filteredConfig;
    +   }
    +
    +   @Override
    +   public void start() throws Exception {
    +      synchronized (this) {
    +         if (this.isStarted.get()) {
    +            return;
    +         }
    +         if (this.connectorName == null || this.connectorName.trim().equals("")) {
    +            throw new Exception("invalid connector name: " + this.connectorName);
    +         }
    +
    +         if (this.topicName == null || this.topicName.trim().equals("")) {
    +            throw new Exception("invalid topic name: " + topicName);
    +         }
    +
    +         if (this.queueName == null || this.queueName.trim().equals("")) {
    +            throw new Exception("invalid queue name: " + queueName);
    +         }
    +
    +         this.filter = FilterImpl.createFilter(filterString);
    +
    +         SimpleString name = new SimpleString(this.queueName);
    +         Binding b = this.postOffice.getBinding(name);
    +         if (b == null) {
    +            throw new Exception(connectorName + ": queue " + queueName + " not found");
    +         }
    +         this.queue = (Queue) b.getBindable();
    +
    +         this.kafkaProducer = kafkaProducerFactory.create(kafkaConfig(configuration));
    +
    +         List<PartitionInfo> topicPartitions = kafkaProducer.partitionsFor(topicName);
    +         if (topicPartitions == null || topicPartitions.size() == 0) {
    +            throw new Exception(connectorName + ": topic " + topicName + " not found");
    +         }
    +
    +         this.retryCount.set(0);
    +         this.isStarted.set(true);
    +         connect();
    +         ActiveMQKafkaLogger.LOGGER.bridgeStarted(connectorName);
    +      }
    +   }
    +
    +   public void connect() throws Exception {
    +      synchronized (this) {
    +         if (!isConnected && isStarted.get()) {
    +            isConnected = true;
    +            this.queue.addConsumer(this);
    +            this.queue.deliverAsync();
    +            ActiveMQKafkaLogger.LOGGER.bridgeConnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void disconnect() {
    +      synchronized (this) {
    +         if (isConnected) {
    +            if (queue != null) {
    +               this.queue.removeConsumer(this);
    +            }
    +
    +            cancelRefs();
    +            if (queue != null) {
    +               queue.deliverAsync();
    +            }
    +            isConnected = false;
    +            ActiveMQKafkaLogger.LOGGER.bridgeDisconnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void stop() {
    +      synchronized (this) {
    +         if (!this.isStarted.get()) {
    +            return;
    +         }
    +         ActiveMQKafkaLogger.LOGGER.bridgeReceivedStopRequest(connectorName);
    +
    +         disconnect();
    +
    +         kafkaProducer.close();
    +         kafkaProducer = null;
    +         this.isStarted.set(false);
    +         ActiveMQKafkaLogger.LOGGER.bridgeStopped(connectorName);
    +      }
    +   }
    +
    +   @Override
    +   public boolean isStarted() {
    +      return this.isStarted.get();
    +   }
    +
    +   @Override
    +   public String getName() {
    +      return this.connectorName;
    +   }
    +
    +   @Override
    +   public boolean supportsDirectDelivery() {
    +      return false;
    +   }
    +
    +   @Override
    +   public HandleStatus handle(MessageReference ref) throws Exception {
    +      if (filter != null && !filter.match(ref.getMessage())) {
    +         return HandleStatus.NO_MATCH;
    +      }
    +
    +      synchronized (this) {
    +         ref.handled();
    +
    +         Message message = ref.getMessage();
    +
    +         synchronized (refs) {
    +            refs.put(ref.getMessage().getMessageID(), ref);
    +         }
    +
    +         Integer partition = null;
    +         SimpleString groupdID = message.getGroupID();
    +         if (groupdID != null) {
    +            List partitions = kafkaProducer.partitionsFor(topicName);
    +            int numPartitions = partitions.size();
    +            partition = Utils.toPositive(Utils.murmur2(groupdID.getData())) % numPartitions;
    --- End diff --
    
    Updated so this logic is now done with a partitioner (though it is still more or less similar), as we need to partition by the groupID when present, else fall back to the default of using the key. But the key must be the LVQ value, for compacted-topic reasons.
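
    For illustration, a minimal sketch of such a partitioner, assuming the un-serialized Artemis Message is what the partitioner sees as the record value; the class name and the null-key fallback are assumptions, not the PR's actual code:

        import java.util.Map;
        import org.apache.activemq.artemis.api.core.Message;
        import org.apache.activemq.artemis.api.core.SimpleString;
        import org.apache.kafka.clients.producer.Partitioner;
        import org.apache.kafka.common.Cluster;
        import org.apache.kafka.common.utils.Utils;

        public class GroupIdAwarePartitioner implements Partitioner {

           @Override
           public int partition(String topic, Object key, byte[] keyBytes,
                                Object value, byte[] valueBytes, Cluster cluster) {
              int numPartitions = cluster.partitionsForTopic(topic).size();
              byte[] bytes = keyBytes; // default: partition by key, as the default partitioner would
              if (value instanceof Message) {
                 SimpleString groupId = ((Message) value).getGroupID();
                 if (groupId != null) {
                    bytes = groupId.getData(); // prefer the groupID when the message carries one
                 }
              }
              if (bytes == null) {
                 return 0; // simplistic fallback; the bridge is assumed to always supply a key
              }
              return Utils.toPositive(Utils.murmur2(bytes)) % numPartitions;
           }

           @Override
           public void configure(Map<String, ?> configs) {
           }

           @Override
           public void close() {
           }
        }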


---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
  
    Skimming the code quickly, I had to do a double take on a couple of occasions, and had some related observations.
    
    There looked to be extensive use of implementation detail classes from the Qpid JMS client here. This seems inherently fragile as these classes are not intended for direct application usage in this fashion, and are entirely subject to change from release to release. Related, there also appeared to be some new class definitions within the org.apache.qpid namespace, which doesn't seem nice either.
    
    Separately, I don't really understand why they would need/want to be used this way anyway. Presumably the Kafka message content in that case is an encoded AMQP message, so anything that can decode an AMQP message would work, e.g. the Proton-based bits also included. It's not clear to me why it would be desirable to involve JMS at all. Regardless, if you actually wanted a JMS message, why not consume it from the broker using the JMS client to begin with?
    
    The logo feels like something only a standalone component would have.


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146868346
  
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/ActiveMQKafkaLogger.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.bridge;
    +
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.core.server.MessageReference;
    +import org.jboss.logging.BasicLogger;
    +import org.jboss.logging.Logger;
    +import org.jboss.logging.annotations.Cause;
    +import org.jboss.logging.annotations.LogMessage;
    +import org.jboss.logging.annotations.Message;
    +import org.jboss.logging.annotations.MessageLogger;
    +
    +/**
    + * Logger Code 36
    + *
    + * each message id must be 6 digits long starting with 36; the 3rd digit denotes the level, so
    + *
    + * INFO  1
    + * WARN  2
    + * DEBUG 3
    + * ERROR 4
    + * TRACE 5
    + * FATAL 6
    + *
    + * so an INFO message would be 361000 to 361999
    --- End diff --
    
    agreed.
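
    For reference, under the corrected scheme an INFO entry for logger code 36 would look something like the following sketch; the method mirrors the bridgeStarted call in the quoted bridge code, but the exact id and message text in the PR may differ:

        import org.jboss.logging.BasicLogger;
        import org.jboss.logging.Logger;
        import org.jboss.logging.annotations.LogMessage;
        import org.jboss.logging.annotations.Message;
        import org.jboss.logging.annotations.MessageLogger;

        @MessageLogger(projectCode = "AMQ")
        public interface ActiveMQKafkaLogger extends BasicLogger {

           ActiveMQKafkaLogger LOGGER = Logger.getMessageLogger(ActiveMQKafkaLogger.class,
                 ActiveMQKafkaLogger.class.getPackage().getName());

           // 36 = logger code, 1 = INFO level, 001 = sequence within that level
           @LogMessage(level = Logger.Level.INFO)
           @Message(id = 361001, value = "Kafka Bridge {0} started", format = Message.Format.MESSAGE_FORMAT)
           void bridgeStarted(String connectorName);
        }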


---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
  
    Thanks guys for the time. I have kicked off a thread back on the dev mailing list to discuss the idea of having a sub-project for service connectors and other integrations, like the Spring one. Let's discuss there.


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146532318
  
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,179 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +![ActiveMQ Artemis Kafka Bridge Logo](images/activemq-kafka-bridge.png)
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis, 
    +and forward them to a target topic, on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you could have a hybrid broker setup, 
    +having a data flow with CORE, AMQP, MQTT clients, as well as now Kafka clients also. 
    +Taking and giving the best features of both broker technologies when and where needed for a flow of data.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    +
    +The intent is this will be a two way bridge, but currently the flow is a single direction 
    +from Apache ActiveMQ Artemis to Apache Kafka
    +
    +
    +The source and target servers are remote making bridging suitable
    +for reliably sending messages from one artemis cluster to kafka, 
    +for instance across a WAN, to the cloud, or internet and where the connection may be unreliable.
    +
    +The bridge has built in resilience to failure so if the target server
    +connection is lost, e.g. due to network failure, the bridge will retry
    +connecting to the target until it comes back online. When it comes back
    +online it will resume operation as normal.
    +
    +In summary, Apache ActiveMQ Kafka Bridge is a way to reliably connect separate 
    +Apache ActiveMQ Artemis server and Apache Kafka server together.
    +
    +## Configuring Kafka Bridges
    +
    +Bridges are configured in `broker.xml`.  
    +Let's kick off
    +with an example (this is actually from the kafka bridge test example):
    +
    +
    +    <connector-services>
    +         <connector-service name="my-kafka-bridge">
    +            <factory-class>org.apache.activemq.artemis.integration.kafka.bridge.KafkaProducerBridgeFactory</factory-class>
    +            <param key="bootstrap.servers" value="kafka-1.domain.local:9092,kafka-2.domain.local:9092,kafka-3.domain.local:9092" />
    +            <param key="queue-name" value="my.artemis.queue" />
    +            <param key="kafka-topic" value="my_kafka_topic" />
    +         </connector-service>
    +    </connector-services>
    +
    +In the above example we have shown the required parameters to
    +configure for a kafka bridge. See below for a complete list of available configuration options.
    +
    +### Serialization
    +By default the CoreMessageSerializer is used.
    +
    +#### CoreMessageSerializer
    +Default but can be explicitly set using
    +           
    +    <param key="value.serializer" value="org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer" />
    +
    +This maps the Message properties to Record headers.
    +And then maps the payload binary as the Record value, encoding TextMessage
    +
    +This makes it easy to consume from Kafka using default deserializers
    +TextMessage using StringDeserializer and
    +ByteMessage using BytesDeserializer.
    +
    +Also to note, if you serialized an Apache Avro object into a byte array for the ByteMessage you could deserialize using an e
    +
    --- End diff --
    
    Pushed. It turns out I did actually just stop mid-sentence while writing the doc; I have completed the section now.
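
    As a hedged illustration of the completed point: if a producer put Avro-serialized bytes into a Core ByteMessage, the record value is preserved as-is and can be decoded straight back; the schema source, broker address and topic below are assumptions:

        import java.util.Collections;
        import java.util.Properties;
        import org.apache.avro.Schema;
        import org.apache.avro.generic.GenericDatumReader;
        import org.apache.avro.generic.GenericRecord;
        import org.apache.avro.io.BinaryDecoder;
        import org.apache.avro.io.DecoderFactory;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.consumer.KafkaConsumer;

        public class BridgedAvroConsumer {
           public static void main(String[] args) throws Exception {
              Properties props = new Properties();
              props.put("bootstrap.servers", "kafka-1.domain.local:9092"); // assumed broker address
              props.put("group.id", "avro-demo"); // assumed consumer group
              props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

              Schema schema = new Schema.Parser().parse(args[0]); // assumed: schema JSON passed in
              GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);

              try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
                 consumer.subscribe(Collections.singletonList("my_kafka_topic")); // assumed topic
                 for (ConsumerRecord<String, byte[]> record : consumer.poll(1000)) {
                    // The ByteMessage body was preserved byte-for-byte, so it decodes directly.
                    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
                    GenericRecord decoded = reader.read(null, decoder);
                    System.out.println(decoded);
                 }
              }
           }
        }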


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146526503
  
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,179 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +![ActiveMQ Artemis Kafka Bridge Logo](images/activemq-kafka-bridge.png)
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis, 
    +and forward them to a target topic, on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you could have a hybrid broker setup, 
    +having a data flow with CORE, AMQP, MQTT clients, as well as now Kafka clients also. 
    +Taking and giving the best features of both broker technologies when and where needed for a flow of data.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    +
    +The intent is this will be a two way bridge, but currently the flow is a single direction 
    +from Apache ActiveMQ Artemis to Apache Kafka
    +
    +
    +The source and target servers are remote making bridging suitable
    +for reliably sending messages from one artemis cluster to kafka, 
    +for instance across a WAN, to the cloud, or internet and where the connection may be unreliable.
    +
    +The bridge has built in resilience to failure so if the target server
    +connection is lost, e.g. due to network failure, the bridge will retry
    +connecting to the target until it comes back online. When it comes back
    +online it will resume operation as normal.
    +
    +In summary, Apache ActiveMQ Kafka Bridge is a way to reliably connect separate 
    +Apache ActiveMQ Artemis server and Apache Kafka server together.
    +
    +## Configuring Kafka Bridges
    +
    +Bridges are configured in `broker.xml`.  
    +Let's kick off
    +with an example (this is actually from the kafka bridge test example):
    +
    +
    +    <connector-services>
    +         <connector-service name="my-kafka-bridge">
    +            <factory-class>org.apache.activemq.artemis.integration.kafka.bridge.KafkaProducerBridgeFactory</factory-class>
    +            <param key="bootstrap.servers" value="kafka-1.domain.local:9092,kafka-2.domain.local:9092,kafka-3.domain.local:9092" />
    +            <param key="queue-name" value="my.artemis.queue" />
    +            <param key="kafka-topic" value="my_kafka_topic" />
    +         </connector-service>
    +    </connector-services>
    +
    +In the above example we have shown the required parameters to
    +configure for a kafka bridge. See below for a complete list of available configuration options.
    +
    +### Serialization
    +By default the CoreMessageSerializer is used.
    +
    +#### CoreMessageSerializer
    +Default but can be explicitly set using
    +           
    +    <param key="value.serializer" value="org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer" />
    +
    +This maps the Message properties to Record headers.
    +And then maps the payload binary as the Record value, encoding TextMessage
    +
    +This makes it easy to consume from Kafka using default deserializers
    +TextMessage using StringDeserializer and
    +ByteMessage using BytesDeserializer.
    +
    +Also to note, if you serialized an Apache Avro object into a byte array for the ByteMessage you could deserialize using an e
    +
    --- End diff --
    
    Good spot. Looks like something didn't commit properly from my local copy; I'll correct it.


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146586855
  
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java ---
    @@ -0,0 +1,484 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.bridge;
    +
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.core.filter.Filter;
    +import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
    +import org.apache.activemq.artemis.core.persistence.StorageManager;
    +import org.apache.activemq.artemis.core.postoffice.Binding;
    +import org.apache.activemq.artemis.core.postoffice.PostOffice;
    +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
    +import org.apache.activemq.artemis.core.server.ConnectorService;
    +import org.apache.activemq.artemis.core.server.Consumer;
    +import org.apache.activemq.artemis.core.server.HandleStatus;
    +import org.apache.activemq.artemis.core.server.MessageReference;
    +import org.apache.activemq.artemis.core.server.Queue;
    +import org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer;
    +import org.apache.activemq.artemis.utils.ConfigurationHelper;
    +import org.apache.activemq.artemis.utils.ReusableLatch;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.kafka.common.utils.Utils;
    +
    +class KafkaProducerBridge implements Consumer, ConnectorService {
    +
    +   private final String connectorName;
    +
    +   private final String queueName;
    +
    +   private final String topicName;
    +
    +   private final PostOffice postOffice;
    +
    +   private Queue queue = null;
    +
    +   private Filter filter = null;
    +
    +   private String filterString;
    +
    +   private AtomicBoolean isStarted = new AtomicBoolean();
    +
    +   private boolean isConnected = false;
    +
    +   private Producer<String, Message> kafkaProducer;
    +
    +   private Map<String, Object> configuration;
    +
    +   private long sequentialID;
    +
    +   private final ReusableLatch pendingAcks = new ReusableLatch(0);
    +
    +   private final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
    +
    +   private final ScheduledExecutorService scheduledExecutorService;
    +
    +   private final int retryAttempts;
    +
    +   private final long retryInterval;
    +
    +   private final double retryMultiplier;
    +
    +   private final long retryMaxInterval;
    +
    +   private final AtomicInteger retryCount = new AtomicInteger();
    +
    +   private final AtomicBoolean awaitingReconnect = new AtomicBoolean();
    +
    +   private final KafkaProducerFactory<String, Message> kafkaProducerFactory;
    +
    +
    +   KafkaProducerBridge(String connectorName, KafkaProducerFactory<String, Message> kafkaProducerFactory, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledExecutorService) {
    +      this.sequentialID = storageManager.generateID();
    +
    +      this.kafkaProducerFactory = kafkaProducerFactory;
    +
    +      this.connectorName = connectorName;
    +      this.queueName = ConfigurationHelper.getStringProperty(KafkaConstants.QUEUE_NAME, null, configuration);
    +      this.topicName = ConfigurationHelper.getStringProperty(KafkaConstants.TOPIC_NAME, null, configuration);
    +
    +      this.retryAttempts = ConfigurationHelper.getIntProperty(KafkaConstants.RETRY_ATTEMPTS_NAME, -1, configuration);
    +      this.retryInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_INTERVAL_NAME, 2000, configuration);
    +      this.retryMultiplier = ConfigurationHelper.getDoubleProperty(KafkaConstants.RETRY_MULTIPLIER_NAME, 2, configuration);
    +      this.retryMaxInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_MAX_INTERVAL_NAME, 30000, configuration);
    +
    +      this.filterString = ConfigurationHelper.getStringProperty(KafkaConstants.FILTER_STRING, null, configuration);
    +
    +      configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    +      if (!configuration.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
    +         configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoreMessageSerializer.class.getName());
    +      }
    +
    +      this.postOffice = postOffice;
    +      this.configuration = configuration;
    +      this.scheduledExecutorService = scheduledExecutorService;
    +   }
    +
    +   private Map<String, Object> kafkaConfig(Map<String, Object> configuration) {
    +      Map<String, Object> filteredConfig = new HashMap<>(configuration);
    +      KafkaConstants.OPTIONAL_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      KafkaConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      return filteredConfig;
    +   }
    +
    +   @Override
    +   public void start() throws Exception {
    +      synchronized (this) {
    +         if (this.isStarted.get()) {
    +            return;
    +         }
    +         if (this.connectorName == null || this.connectorName.trim().equals("")) {
    +            throw new Exception("invalid connector name: " + this.connectorName);
    +         }
    +
    +         if (this.topicName == null || this.topicName.trim().equals("")) {
    +            throw new Exception("invalid topic name: " + topicName);
    +         }
    +
    +         if (this.queueName == null || this.queueName.trim().equals("")) {
    +            throw new Exception("invalid queue name: " + queueName);
    +         }
    +
    +         this.filter = FilterImpl.createFilter(filterString);
    +
    +         SimpleString name = new SimpleString(this.queueName);
    +         Binding b = this.postOffice.getBinding(name);
    +         if (b == null) {
    +            throw new Exception(connectorName + ": queue " + queueName + " not found");
    +         }
    +         this.queue = (Queue) b.getBindable();
    +
    +         this.kafkaProducer = kafkaProducerFactory.create(kafkaConfig(configuration));
    +
    +         List<PartitionInfo> topicPartitions = kafkaProducer.partitionsFor(topicName);
    +         if (topicPartitions == null || topicPartitions.size() == 0) {
    +            throw new Exception(connectorName + ": topic " + topicName + " not found");
    +         }
    +
    +         this.retryCount.set(0);
    +         this.isStarted.set(true);
    +         connect();
    +         ActiveMQKafkaLogger.LOGGER.bridgeStarted(connectorName);
    +      }
    +   }
    +
    +   public void connect() throws Exception {
    +      synchronized (this) {
    +         if (!isConnected && isStarted.get()) {
    +            isConnected = true;
    +            this.queue.addConsumer(this);
    +            this.queue.deliverAsync();
    +            ActiveMQKafkaLogger.LOGGER.bridgeConnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void disconnect() {
    +      synchronized (this) {
    +         if (isConnected) {
    +            if (queue != null) {
    +               this.queue.removeConsumer(this);
    +            }
    +
    +            cancelRefs();
    +            if (queue != null) {
    +               queue.deliverAsync();
    +            }
    +            isConnected = false;
    +            ActiveMQKafkaLogger.LOGGER.bridgeDisconnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void stop() {
    +      synchronized (this) {
    +         if (!this.isStarted.get()) {
    +            return;
    +         }
    +         ActiveMQKafkaLogger.LOGGER.bridgeReceivedStopRequest(connectorName);
    +
    +         disconnect();
    +
    +         kafkaProducer.close();
    +         kafkaProducer = null;
    +         this.isStarted.set(false);
    +         ActiveMQKafkaLogger.LOGGER.bridgeStopped(connectorName);
    +      }
    +   }
    +
    +   @Override
    +   public boolean isStarted() {
    +      return this.isStarted.get();
    +   }
    +
    +   @Override
    +   public String getName() {
    +      return this.connectorName;
    +   }
    +
    +   @Override
    +   public boolean supportsDirectDelivery() {
    +      return false;
    +   }
    +
    +   @Override
    +   public HandleStatus handle(MessageReference ref) throws Exception {
    +      if (filter != null && !filter.match(ref.getMessage())) {
    +         return HandleStatus.NO_MATCH;
    +      }
    +
    +      synchronized (this) {
    +         ref.handled();
    +
    +         Message message = ref.getMessage();
    +
    +         synchronized (refs) {
    +            refs.put(ref.getMessage().getMessageID(), ref);
    +         }
    +
    +         Integer partition = null;
    +         SimpleString groupdID = message.getGroupID();
    +         if (groupdID != null) {
    +            List partitions = kafkaProducer.partitionsFor(topicName);
    +            int numPartitions = partitions.size();
    +            partition = Utils.toPositive(Utils.murmur2(groupdID.getData())) % numPartitions;
    --- End diff --
    
    You don't have access to native headers in the partitioner. As such, where serializers encode the payload into the value and map message headers (groupID) to Kafka native headers, the groupID is not available within the Kafka partitioner class.
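
    Which is presumably why the bridge resolves the partition up front and hands it to the record explicitly, along the lines of this fragment; the key and callback variables are assumptions standing in for the quoted handle() logic:

        // Fragment: partition derived from the groupID in the bridge itself, then passed
        // to the ProducerRecord so no partitioner ever needs access to native headers.
        ProducerRecord<String, Message> record =
              new ProducerRecord<>(topicName, partition, key, message);
        kafkaProducer.send(record, callback);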


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146868998
  
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,180 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis, 
    +and forward them to a target topic, on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you could have a hybrid broker setup, 
    +having a data flow with CORE, AMQP, MQTT clients, as well as now Kafka clients also. 
    +Taking and giving the best features of both broker technologies when and where needed for a flow of data.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    +
    +The intent is this will be a two way bridge, but currently the flow is a single direction 
    +from Apache ActiveMQ Artemis to Apache Kafka
    +
    +
    +The source and target servers are remote making bridging suitable
    +for reliably sending messages from one artemis cluster to kafka, 
    +for instance across a WAN, to the cloud, or internet and where the connection may be unreliable.
    +
    +The bridge has built in resilience to failure so if the target server
    +connection is lost, e.g. due to network failure, the bridge will retry
    +connecting to the target until it comes back online. When it comes back
    +online it will resume operation as normal.
    +
    +![ActiveMQ Artemis Kafka Bridge Diagram](images/activemq-kafka-bridge-diagram.png)
    +
    +In summary, Apache ActiveMQ Kafka Bridge is a way to reliably connect separate 
    +Apache ActiveMQ Artemis server and Apache Kafka server together.
    +
    +## Configuring Kafka Bridges
    +
    +Bridges are configured in `broker.xml`.  
    +Let's kick off
    +with an example (this is actually from the kafka bridge test example):
    +
    +
    +    <connector-services>
    +         <connector-service name="my-kafka-bridge">
    +            <factory-class>org.apache.activemq.artemis.integration.kafka.bridge.KafkaProducerBridgeFactory</factory-class>
    +            <param key="bootstrap.servers" value="kafka-1.domain.local:9092,kafka-2.domain.local:9092,kafka-3.domain.local:9092" />
    +            <param key="queue-name" value="my.artemis.queue" />
    +            <param key="kafka-topic" value="my_kafka_topic" />
    +         </connector-service>
    +    </connector-services>
    +
    +In the above example we have shown the required parameters to
    +configure for a kafka bridge. See below for a complete list of available configuration options.
    +
    +### Serialization
    +By default the CoreMessageSerializer is used.
    +
    +#### CoreMessageSerializer
    +Default but can be explicitly set using
    +           
    +    <param key="value.serializer" value="org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer" />
    +
    +This maps the Message properties to Record headers.
    +And then maps the payload binary as the Record value, encoding TextMessage
    +
    +This makes it easy to consume from Kafka using default deserializers
    +TextMessage using StringDeserializer and
    +ByteMessage using BytesDeserializer.
    +
    +Also to note, if you serialized your objects using a binary serialization like Apache Avro, Apache Thrift etc. into a byte array for the ByteMessage,
    +as that byte array is preserved as-is in the record value, you could deserialize with an equivalent deserializer
    +directly back into an Avro Record / Thrift Struct.
    +
    +Also supplied are some Apache Kafka deserializers allowing you to consume from Apache Kafka 
    +and get a more familiar CoreMessage or JMSMessage, that your consumers can use:
    +
    +`org.apache.activemq.artemis.integration.kafka.protocol.core.jms.CoreJmsMessageDeserializer`
    +`org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageDeserializer`
    +
    +You can get these Apache Kafka Serializers/Deserializers via Maven using the following GAV coordinates:
    +
    +    <dependency>
    +       <groupId>org.apache.activemq</groupId>
    +       <artifactId>artemis-kafka-core-protocol</artifactId>
    +       <version>2.4.0-SNAPSHOT</version>
    +    </dependency>
    +
    +
    +#### AMQPMessageSerializer
    +Can be set by using:
    +    
    +    <param key="value.serializer" value="org.apache.activemq.artemis.integration.kafka.protocol.amqp.AMQPMessageSerializer" />
    +
    +
    +This encodes the whole message into amqp binary protocol into the Record value.
    --- End diff --
    
    will capitalise 


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by ppatierno <gi...@git.apache.org>.
Github user ppatierno commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146578570
  
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java ---
    @@ -0,0 +1,484 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.bridge;
    +
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.core.filter.Filter;
    +import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
    +import org.apache.activemq.artemis.core.persistence.StorageManager;
    +import org.apache.activemq.artemis.core.postoffice.Binding;
    +import org.apache.activemq.artemis.core.postoffice.PostOffice;
    +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
    +import org.apache.activemq.artemis.core.server.ConnectorService;
    +import org.apache.activemq.artemis.core.server.Consumer;
    +import org.apache.activemq.artemis.core.server.HandleStatus;
    +import org.apache.activemq.artemis.core.server.MessageReference;
    +import org.apache.activemq.artemis.core.server.Queue;
    +import org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer;
    +import org.apache.activemq.artemis.utils.ConfigurationHelper;
    +import org.apache.activemq.artemis.utils.ReusableLatch;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.kafka.common.utils.Utils;
    +
    +class KafkaProducerBridge implements Consumer, ConnectorService {
    +
    +   private final String connectorName;
    +
    +   private final String queueName;
    +
    +   private final String topicName;
    +
    +   private final PostOffice postOffice;
    +
    +   private Queue queue = null;
    +
    +   private Filter filter = null;
    +
    +   private String filterString;
    +
    +   private AtomicBoolean isStarted = new AtomicBoolean();
    +
    +   private boolean isConnected = false;
    +
    +   private Producer<String, Message> kafkaProducer;
    +
    +   private Map<String, Object> configuration;
    +
    +   private long sequentialID;
    +
    +   private final ReusableLatch pendingAcks = new ReusableLatch(0);
    +
    +   private final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
    +
    +   private final ScheduledExecutorService scheduledExecutorService;
    +
    +   private final int retryAttempts;
    +
    +   private final long retryInterval;
    +
    +   private final double retryMultiplier;
    +
    +   private final long retryMaxInterval;
    +
    +   private final AtomicInteger retryCount = new AtomicInteger();
    +
    +   private final AtomicBoolean awaitingReconnect = new AtomicBoolean();
    +
    +   private final KafkaProducerFactory<String, Message> kafkaProducerFactory;
    +
    +
    +   KafkaProducerBridge(String connectorName, KafkaProducerFactory<String, Message> kafkaProducerFactory, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledExecutorService) {
    +      this.sequentialID = storageManager.generateID();
    +
    +      this.kafkaProducerFactory = kafkaProducerFactory;
    +
    +      this.connectorName = connectorName;
    +      this.queueName = ConfigurationHelper.getStringProperty(KafkaConstants.QUEUE_NAME, null, configuration);
    +      this.topicName = ConfigurationHelper.getStringProperty(KafkaConstants.TOPIC_NAME, null, configuration);
    +
    +      this.retryAttempts = ConfigurationHelper.getIntProperty(KafkaConstants.RETRY_ATTEMPTS_NAME, -1, configuration);
    +      this.retryInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_INTERVAL_NAME, 2000, configuration);
    +      this.retryMultiplier = ConfigurationHelper.getDoubleProperty(KafkaConstants.RETRY_MULTIPLIER_NAME, 2, configuration);
    +      this.retryMaxInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_MAX_INTERVAL_NAME, 30000, configuration);
    +
    +      this.filterString = ConfigurationHelper.getStringProperty(KafkaConstants.FILTER_STRING, null, configuration);
    +
    +      configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    +      if (!configuration.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
    +         configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoreMessageSerializer.class.getName());
    +      }
    +
    +      this.postOffice = postOffice;
    +      this.configuration = configuration;
    +      this.scheduledExecutorService = scheduledExecutorService;
    +   }
    +
    +   private Map<String, Object> kafkaConfig(Map<String, Object> configuration) {
    +      Map<String, Object> filteredConfig = new HashMap<>(configuration);
    +      KafkaConstants.OPTIONAL_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      KafkaConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      return filteredConfig;
    +   }
    +
    +   @Override
    +   public void start() throws Exception {
    +      synchronized (this) {
    +         if (this.isStarted.get()) {
    +            return;
    +         }
    +         if (this.connectorName == null || this.connectorName.trim().equals("")) {
    +            throw new Exception("invalid connector name: " + this.connectorName);
    +         }
    +
    +         if (this.topicName == null || this.topicName.trim().equals("")) {
    +            throw new Exception("invalid topic name: " + topicName);
    +         }
    +
    +         if (this.queueName == null || this.queueName.trim().equals("")) {
    +            throw new Exception("invalid queue name: " + queueName);
    +         }
    +
    +         this.filter = FilterImpl.createFilter(filterString);
    +
    +         SimpleString name = new SimpleString(this.queueName);
    +         Binding b = this.postOffice.getBinding(name);
    +         if (b == null) {
    +            throw new Exception(connectorName + ": queue " + queueName + " not found");
    +         }
    +         this.queue = (Queue) b.getBindable();
    +
    +         this.kafkaProducer = kafkaProducerFactory.create(kafkaConfig(configuration));
    +
    +         List<PartitionInfo> topicPartitions = kafkaProducer.partitionsFor(topicName);
    +         if (topicPartitions == null || topicPartitions.size() == 0) {
    +            throw new Exception(connectorName + ": topic " + topicName + " not found");
    +         }
    +
    +         this.retryCount.set(0);
    +         this.isStarted.set(true);
    +         connect();
    +         ActiveMQKafkaLogger.LOGGER.bridgeStarted(connectorName);
    +      }
    +   }
    +
    +   public void connect() throws Exception {
    +      synchronized (this) {
    +         if (!isConnected && isStarted.get()) {
    +            isConnected = true;
    +            this.queue.addConsumer(this);
    +            this.queue.deliverAsync();
    +            ActiveMQKafkaLogger.LOGGER.bridgeConnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void disconnect() {
    +      synchronized (this) {
    +         if (isConnected) {
    +            if (queue != null) {
    +               this.queue.removeConsumer(this);
    +            }
    +
    +            cancelRefs();
    +            if (queue != null) {
    +               queue.deliverAsync();
    +            }
    +            isConnected = false;
    +            ActiveMQKafkaLogger.LOGGER.bridgeDisconnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void stop() {
    +      synchronized (this) {
    +         if (!this.isStarted.get()) {
    +            return;
    +         }
    +         ActiveMQKafkaLogger.LOGGER.bridgeReceivedStopRequest(connectorName);
    +
    +         disconnect();
    +
    +         kafkaProducer.close();
    +         kafkaProducer = null;
    +         this.isStarted.set(false);
    +         ActiveMQKafkaLogger.LOGGER.bridgeStopped(connectorName);
    +      }
    +   }
    +
    +   @Override
    +   public boolean isStarted() {
    +      return this.isStarted.get();
    +   }
    +
    +   @Override
    +   public String getName() {
    +      return this.connectorName;
    +   }
    +
    +   @Override
    +   public boolean supportsDirectDelivery() {
    +      return false;
    +   }
    +
    +   @Override
    +   public HandleStatus handle(MessageReference ref) throws Exception {
    +      if (filter != null && !filter.match(ref.getMessage())) {
    +         return HandleStatus.NO_MATCH;
    +      }
    +
    +      synchronized (this) {
    +         ref.handled();
    +
    +         Message message = ref.getMessage();
    +
    +         synchronized (refs) {
    +            refs.put(ref.getMessage().getMessageID(), ref);
    +         }
    +
    +         Integer partition = null;
    +         SimpleString groupdID = message.getGroupID();
    +         if (groupdID != null) {
    +            List partitions = kafkaProducer.partitionsFor(topicName);
    +            int numPartitions = partitions.size();
    +            partition = Utils.toPositive(Utils.murmur2(groupdID.getData())) % numPartitions;
    --- End diff --
    
    It's the same as the way the DefaultPartitioner works in Kafka; why did you re-use the logic here?
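
    For context, the non-null-key branch of Kafka's DefaultPartitioner does essentially the same hashing; a simplified sketch (the real implementation also handles null keys with round-robin and checks available partitions):

        // Simplified from Kafka's DefaultPartitioner: hash the serialized key onto a partition.
        int numPartitions = cluster.partitionsForTopic(topic).size();
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;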


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146821818
  
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,180 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis, 
    +and forward them to a target topic, on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you could have a hybrid broker setup, 
    +having a data flow with CORE, AMQP, MQTT clients, as well as now Kafka clients also. 
    +Taking and giving the best features of both broker technologies when and where needed for a flow of data.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    +
    +The intent is this will be a two way bridge, but currently the flow is a single direction 
    +from Apache ActiveMQ Artemis to Apache Kafka
    +
    +
    +The source and target servers are remote making bridging suitable
    +for reliably sending messages from one artemis cluster to kafka, 
    +for instance across a WAN, to the cloud, or internet and where the connection may be unreliable.
    +
    +The bridge has built in resilience to failure so if the target server
    +connection is lost, e.g. due to network failure, the bridge will retry
    +connecting to the target until it comes back online. When it comes back
    +online it will resume operation as normal.
    +
    +![ActiveMQ Artemis Kafka Bridge Diagram](images/activemq-kafka-bridge-diagram.png)
    +
    +In summary, Apache ActiveMQ Kafka Bridge is a way to reliably connect separate 
    +Apache ActiveMQ Artemis server and Apache Kafka server together.
    +
    +## Configuring Kafka Bridges
    +
    +Bridges are configured in `broker.xml`.  
    +Let's kick off
    +with an example (this is actually from the kafka bridge test example):
    +
    +
    +    <connector-services>
    +         <connector-service name="my-kafka-bridge">
    +            <factory-class>org.apache.activemq.artemis.integration.kafka.bridge.KafkaProducerBridgeFactory</factory-class>
    +            <param key="bootstrap.servers" value="kafka-1.domain.local:9092,kafka-2.domain.local:9092,kafka-3.domain.local:9092" />
    +            <param key="queue-name" value="my.artemis.queue" />
    +            <param key="kafka-topic" value="my_kafka_topic" />
    +         </connector-service>
    +    </connector-services>
    +
    +In the above example we have shown the required parameters to
    +configure for a kafka bridge. See below for a complete list of available configuration options.
    +
    +### Serialization
    +By default the CoreMessageSerializer is used.
    +
    +#### CoreMessageSerializer
    +Default but can be explicitly set using
    +           
    +    <param key="value.serializer" value="org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer" />
    +
    +This maps the Message properties to Record headers.
    +And then maps the payload binary as the Record value, encoding TextMessage
    +
    +This makes it easy to consume from Kafka using default deserializers
    +TextMessage using StringDeserializer and
    +ByteMessage using BytesDeserializer.
    +
    +Also to note, if you serialized your objects using a binary serialization like Apache Avro, Apache Thrift etc. into a byte array for the ByteMessage,
    +as that byte array is preserved as-is in the record value, you could deserialize with an equivalent deserializer
    +directly back into an Avro Record / Thrift Struct.
    +
    +Also supplied are some Apache Kafka deserializers allowing you to consume from Apache Kafka 
    +and get a more familiar CoreMessage or JMSMessage, that your consumers can use:
    +
    +`org.apache.activemq.artemis.integration.kafka.protocol.core.jms.CoreJmsMessageDeserializer`
    +`org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageDeserializer`
    +
    +You can get these Apache Kafka Serializers/Deserializers via Maven using the following GAV coordinates:
    +
    +    <dependency>
    +       <groupId>org.apache.activemq</groupId>
    +       <artifactId>artemis-kafka-core-protocol</artifactId>
    +       <version>2.4.0-SNAPSHOT</version>
    +    </dependency>
    +
    +
    +#### AMQPMessageSerializer
    +Can be set by using:
    +    
    +    <param key="value.serializer" value="org.apache.activemq.artemis.integration.kafka.protocol.amqp.AMQPMessageSerializer" />
    +
    +
    +This encodes the whole message, in the AMQP binary protocol, into the Record value.
    +
    +This is useful if you have something like
    +
    +   https://github.com/EnMasseProject/amqp-kafka-bridge
    +
    +As it allows messages to be pumped into Kafka as AMQP binary, which can then be consumed from Kafka by clients using the AMQP binary protocol.
    +
    +Also provided are Apache Qpid Proton based Apache Kafka deserializers allowing you to consume from 
    +Apache Kafka and get a ProtonMessage.
    +
    +`org.apache.activemq.artemis.integration.kafka.protocol.amqp.proton.ProtonMessageDeserializer`
    +
    +You can get these Apache Kafka Serializers/Deserializers via Maven using the following GAV coordinates:
    +
    +    <dependency>
    +       <groupId>org.apache.activemq</groupId>
    +       <artifactId>artemis-kafka-amqp-protocol</artifactId>
    +       <version>2.4.0-SNAPSHOT</version>
    --- End diff --
    
    As per the earlier version comment.


---
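
For reference, a minimal consumer sketch for the Core-serialized records described in the diff above. The topic and bootstrap server names are taken from the broker.xml example; the exact header naming the bridge uses is assumed here for illustration, not confirmed by the PR.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.header.Header;

    public class BridgedTextMessageConsumer {

       public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "kafka-1.domain.local:9092");
          props.put("group.id", "bridge-example");
          props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          // TextMessage payloads arrive as plain text, so the stock
          // StringDeserializer is enough on the value side; swap in the
          // CoreMessageDeserializer from the quoted docs to get a CoreMessage back.
          props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

          try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
             consumer.subscribe(Collections.singletonList("my_kafka_topic"));
             ConsumerRecords<String, String> records = consumer.poll(5000);
             for (ConsumerRecord<String, String> record : records) {
                // Per the docs above, the Message properties travel as Record headers.
                for (Header header : record.headers()) {
                   System.out.println(header.key() + " = " + new String(header.value()));
                }
                System.out.println("body: " + record.value());
             }
          }
       }
    }


---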

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146870893
  
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,180 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis, 
    +and forward them to a target topic on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you could have a hybrid broker setup, 
    +with a data flow spanning CORE, AMQP and MQTT clients as well as Kafka clients, 
    +taking and giving the best features of both broker technologies when and where needed for a flow of data.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    +
    +The intent is for this to be a two-way bridge, but currently the flow is in a single direction, 
    +from Apache ActiveMQ Artemis to Apache Kafka.
    +
    +
    +The source and target servers are remote making bridging suitable
    +for reliably sending messages from one artemis cluster to kafka, 
    +for instance across a WAN, to the cloud, or internet and where the connection may be unreliable.
    +
    +The bridge has built in resilience to failure so if the target server
    +connection is lost, e.g. due to network failure, the bridge will retry
    +connecting to the target until it comes back online. When it comes back
    +online it will resume operation as normal.
    +
    +![ActiveMQ Artemis Kafka Bridge Diagram](images/activemq-kafka-bridge-diagram.png)
    --- End diff --
    
    will update image


---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

Posted by tabish121 <gi...@git.apache.org>.
Github user tabish121 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
  
    @michaelandrepearce apart from the fact that those are all removed now?  


---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
  
    Closing this for now


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146868313
  
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/GroupIdPartitioner.java ---
    @@ -0,0 +1,57 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.bridge;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.kafka.clients.producer.Partitioner;
    +import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
    +import org.apache.kafka.common.Cluster;
    +import org.apache.kafka.common.utils.Utils;
    +
    +public class GroupIdPartitioner implements Partitioner {
    +
    +   DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
    +
    +   @Override
    +   public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    +      Message message = (Message) value;
    +      final SimpleString groupdID = message.getGroupID();
    +      //If GroupID (a.k.a JMSXMessageGroup) then to honour this we partition based on it.
    --- End diff --
    
    good point.


---
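
A small sketch of the partition selection the quoted GroupIdPartitioner performs when a group id is present: hash the group id bytes with Kafka's murmur2 and mod by the partition count, mirroring what Kafka's DefaultPartitioner does with a record key. The helper and values below are illustrative only, not part of the PR.

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.common.utils.Utils;

    public class GroupIdPartitionSketch {

       // Hypothetical helper, not part of the PR.
       static int partitionFor(String groupId, int numPartitions) {
          byte[] groupIdBytes = groupId.getBytes(StandardCharsets.UTF_8);
          // Same expression as the quoted partitioner: murmur2 hash, made
          // positive, modulo the number of partitions on the topic.
          return Utils.toPositive(Utils.murmur2(groupIdBytes)) % numPartitions;
       }

       public static void main(String[] args) {
          // Records with the same group id always land on the same partition,
          // which preserves per-group ordering on the Kafka side.
          System.out.println(partitionFor("orders-group-A", 6));
          System.out.println(partitionFor("orders-group-A", 6)); // same value again
       }
    }


---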

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by ppatierno <gi...@git.apache.org>.
Github user ppatierno commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146579371
  
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java ---
    @@ -0,0 +1,484 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.bridge;
    +
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.core.filter.Filter;
    +import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
    +import org.apache.activemq.artemis.core.persistence.StorageManager;
    +import org.apache.activemq.artemis.core.postoffice.Binding;
    +import org.apache.activemq.artemis.core.postoffice.PostOffice;
    +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
    +import org.apache.activemq.artemis.core.server.ConnectorService;
    +import org.apache.activemq.artemis.core.server.Consumer;
    +import org.apache.activemq.artemis.core.server.HandleStatus;
    +import org.apache.activemq.artemis.core.server.MessageReference;
    +import org.apache.activemq.artemis.core.server.Queue;
    +import org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer;
    +import org.apache.activemq.artemis.utils.ConfigurationHelper;
    +import org.apache.activemq.artemis.utils.ReusableLatch;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.kafka.common.utils.Utils;
    +
    +class KafkaProducerBridge implements Consumer, ConnectorService {
    +
    +   private final String connectorName;
    +
    +   private final String queueName;
    +
    +   private final String topicName;
    +
    +   private final PostOffice postOffice;
    +
    +   private Queue queue = null;
    +
    +   private Filter filter = null;
    +
    +   private String filterString;
    +
    +   private AtomicBoolean isStarted = new AtomicBoolean();
    +
    +   private boolean isConnected = false;
    +
    +   private Producer<String, Message> kafkaProducer;
    +
    +   private Map<String, Object> configuration;
    +
    +   private long sequentialID;
    +
    +   private final ReusableLatch pendingAcks = new ReusableLatch(0);
    +
    +   private final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
    +
    +   private final ScheduledExecutorService scheduledExecutorService;
    +
    +   private final int retryAttempts;
    +
    +   private final long retryInterval;
    +
    +   private final double retryMultiplier;
    +
    +   private final long retryMaxInterval;
    +
    +   private final AtomicInteger retryCount = new AtomicInteger();
    +
    +   private final AtomicBoolean awaitingReconnect = new AtomicBoolean();
    +
    +   private final KafkaProducerFactory<String, Message> kafkaProducerFactory;
    +
    +
    +   KafkaProducerBridge(String connectorName, KafkaProducerFactory<String, Message> kafkaProducerFactory, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledExecutorService) {
    +      this.sequentialID = storageManager.generateID();
    +
    +      this.kafkaProducerFactory = kafkaProducerFactory;
    +
    +      this.connectorName = connectorName;
    +      this.queueName = ConfigurationHelper.getStringProperty(KafkaConstants.QUEUE_NAME, null, configuration);
    +      this.topicName = ConfigurationHelper.getStringProperty(KafkaConstants.TOPIC_NAME, null, configuration);
    +
    +      this.retryAttempts = ConfigurationHelper.getIntProperty(KafkaConstants.RETRY_ATTEMPTS_NAME, -1, configuration);
    +      this.retryInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_INTERVAL_NAME, 2000, configuration);
    +      this.retryMultiplier = ConfigurationHelper.getDoubleProperty(KafkaConstants.RETRY_MULTIPLIER_NAME, 2, configuration);
    +      this.retryMaxInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_MAX_INTERVAL_NAME, 30000, configuration);
    +
    +      this.filterString = ConfigurationHelper.getStringProperty(KafkaConstants.FILTER_STRING, null, configuration);
    +
    +      configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    +      if (!configuration.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
    +         configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoreMessageSerializer.class.getName());
    +      }
    +
    +      this.postOffice = postOffice;
    +      this.configuration = configuration;
    +      this.scheduledExecutorService = scheduledExecutorService;
    +   }
    +
    +   private Map<String, Object> kafkaConfig(Map<String, Object> configuration) {
    +      Map<String, Object> filteredConfig = new HashMap<>(configuration);
    +      KafkaConstants.OPTIONAL_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      KafkaConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      return filteredConfig;
    +   }
    +
    +   @Override
    +   public void start() throws Exception {
    +      synchronized (this) {
    +         if (this.isStarted.get()) {
    +            return;
    +         }
    +         if (this.connectorName == null || this.connectorName.trim().equals("")) {
    +            throw new Exception("invalid connector name: " + this.connectorName);
    +         }
    +
    +         if (this.topicName == null || this.topicName.trim().equals("")) {
    +            throw new Exception("invalid topic name: " + topicName);
    +         }
    +
    +         if (this.queueName == null || this.queueName.trim().equals("")) {
    +            throw new Exception("invalid queue name: " + queueName);
    +         }
    +
    +         this.filter = FilterImpl.createFilter(filterString);
    +
    +         SimpleString name = new SimpleString(this.queueName);
    +         Binding b = this.postOffice.getBinding(name);
    +         if (b == null) {
    +            throw new Exception(connectorName + ": queue " + queueName + " not found");
    +         }
    +         this.queue = (Queue) b.getBindable();
    +
    +         this.kafkaProducer = kafkaProducerFactory.create(kafkaConfig(configuration));
    +
    +         List<PartitionInfo> topicPartitions = kafkaProducer.partitionsFor(topicName);
    +         if (topicPartitions == null || topicPartitions.size() == 0) {
    +            throw new Exception(connectorName + ": topic " + topicName + " not found");
    +         }
    +
    +         this.retryCount.set(0);
    +         this.isStarted.set(true);
    +         connect();
    +         ActiveMQKafkaLogger.LOGGER.bridgeStarted(connectorName);
    +      }
    +   }
    +
    +   public void connect() throws Exception {
    +      synchronized (this) {
    +         if (!isConnected && isStarted.get()) {
    +            isConnected = true;
    +            this.queue.addConsumer(this);
    +            this.queue.deliverAsync();
    +            ActiveMQKafkaLogger.LOGGER.bridgeConnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void disconnect() {
    +      synchronized (this) {
    +         if (isConnected) {
    +            if (queue != null) {
    +               this.queue.removeConsumer(this);
    +            }
    +
    +            cancelRefs();
    +            if (queue != null) {
    +               queue.deliverAsync();
    +            }
    +            isConnected = false;
    +            ActiveMQKafkaLogger.LOGGER.bridgeDisconnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void stop() {
    +      synchronized (this) {
    +         if (!this.isStarted.get()) {
    +            return;
    +         }
    +         ActiveMQKafkaLogger.LOGGER.bridgeReceivedStopRequest(connectorName);
    +
    +         disconnect();
    +
    +         kafkaProducer.close();
    +         kafkaProducer = null;
    +         this.isStarted.set(false);
    +         ActiveMQKafkaLogger.LOGGER.bridgeStopped(connectorName);
    +      }
    +   }
    +
    +   @Override
    +   public boolean isStarted() {
    +      return this.isStarted.get();
    +   }
    +
    +   @Override
    +   public String getName() {
    +      return this.connectorName;
    +   }
    +
    +   @Override
    +   public boolean supportsDirectDelivery() {
    +      return false;
    +   }
    +
    +   @Override
    +   public HandleStatus handle(MessageReference ref) throws Exception {
    +      if (filter != null && !filter.match(ref.getMessage())) {
    +         return HandleStatus.NO_MATCH;
    +      }
    +
    +      synchronized (this) {
    +         ref.handled();
    +
    +         Message message = ref.getMessage();
    +
    +         synchronized (refs) {
    +            refs.put(ref.getMessage().getMessageID(), ref);
    +         }
    +
    +         Integer partition = null;
    +         SimpleString groupdID = message.getGroupID();
    +         if (groupdID != null) {
    +            List partitions = kafkaProducer.partitionsFor(topicName);
    +            int numPartitions = partitions.size();
    +            partition = Utils.toPositive(Utils.murmur2(groupdID.getData())) % numPartitions;
    +         }
    +
    +         //Compaction/Record Key (equivalent to Artemis LVQ)
    +         SimpleString key = message.getLastValueProperty();
    +
    +         ProducerRecord<String, Message> producerRecord =
    +             new ProducerRecord<>(
    +                   topicName,
    +                   partition,
    +                   message.getTimestamp(),
    +                   key == null ? null : key.toString(),
    +                   message);
    --- End diff --
    
    Providing both partition and key (if it's not null), the Kafka producer will always use the partition, which takes precedence over the key. It means that if groupId is specified, the LVQ (as key) won't be considered for choosing the partition. Is that exactly what you had in mind?


---
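
Illustrating the point above with a short sketch: when a ProducerRecord is constructed with an explicit partition, the configured partitioner is never consulted, so the key is only carried with the record (e.g. for compaction) and does not influence placement. Topic and values here are illustrative.

    import org.apache.kafka.clients.producer.ProducerRecord;

    public class PartitionVsKeySketch {

       public static void main(String[] args) {
          // Partition 3 derived from the group id: the key "order-42" is
          // stored with the record (useful for compacted topics) but is
          // ignored for partitioning because the partition is explicit.
          ProducerRecord<String, String> pinned =
                new ProducerRecord<>("my_kafka_topic", 3, "order-42", "payload");

          // No explicit partition: the partitioner hashes the key at send
          // time, so here the LVQ key would drive partition placement.
          ProducerRecord<String, String> keyed =
                new ProducerRecord<>("my_kafka_topic", "order-42", "payload");

          System.out.println(pinned.partition()); // 3
          System.out.println(keyed.partition());  // null; decided at send time
       }
    }


---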

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146817389
  
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,180 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis, 
    +and forward them to a target topic on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you could have a hybrid broker setup, 
    +with a data flow spanning CORE, AMQP and MQTT clients as well as Kafka clients, 
    +taking and giving the best features of both broker technologies when and where needed for a flow of data.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    --- End diff --
    
    The image should probably use a single directional arrow in the middle like the other image does, given the text below.


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146869205
  
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,180 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis, 
    +and forward them to a target topic on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you could have a hybrid broker setup, 
    +with a data flow spanning CORE, AMQP and MQTT clients as well as Kafka clients, 
    +taking and giving the best features of both broker technologies when and where needed for a flow of data.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    +
    +The intent is for this to be a two-way bridge, but currently the flow is in a single direction, 
    +from Apache ActiveMQ Artemis to Apache Kafka.
    +
    +
    +The source and target servers are remote making bridging suitable
    +for reliably sending messages from one artemis cluster to kafka, 
    --- End diff --
    
    will capitalise


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146869433
  
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,180 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis, 
    +and forward them to a target topic on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you could have a hybrid broker setup, 
    +with a data flow spanning CORE, AMQP and MQTT clients as well as Kafka clients, 
    +taking and giving the best features of both broker technologies when and where needed for a flow of data.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    --- End diff --
    
    will change image


---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
  
    @michaelandrepearce Ah, I missed the note while skimming. The methods will presumably be at package visibility in keeping with the fact they are not intended to be used outside the client itself, so I don't see that changing. I think removing the JMS bits makes sense.


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146868275
  
    --- Diff: integration/activemq-kafka/activemq-kafka-protocols/activemq-kafka-amqp-protocol/src/main/java/org/apache/activemq/artemis/integration/kafka/protocol/amqp/AmqpMessageSerializer.java ---
    @@ -0,0 +1,52 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.protocol.amqp;
    +
    +import java.util.Map;
    +
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.integration.kafka.protocol.amqp.proton.ProtonMessageSerializer;
    +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
    +import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter;
    +import org.apache.kafka.common.errors.SerializationException;
    +import org.apache.kafka.common.serialization.Serializer;
    +
    +public class AmqpMessageSerializer implements Serializer<Message> {
    +
    +   ProtonMessageSerializer protonMessageSerializer = new ProtonMessageSerializer();
    +
    +   @Override
    +   public byte[] serialize(String topic, Message message) {
    +      if (message == null) return null;
    +      try {
    +         AMQPMessage amqpMessage = CoreAmqpConverter.checkAMQP(message);
    +         return protonMessageSerializer.serialize(topic, amqpMessage.getProtonMessage());
    --- End diff --
    
    noted.


---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
  
    @gemmellr Thanks for the comments on the docs, code and images, have addressed these


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146624344
  
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java ---
    @@ -0,0 +1,484 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.bridge;
    +
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.core.filter.Filter;
    +import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
    +import org.apache.activemq.artemis.core.persistence.StorageManager;
    +import org.apache.activemq.artemis.core.postoffice.Binding;
    +import org.apache.activemq.artemis.core.postoffice.PostOffice;
    +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
    +import org.apache.activemq.artemis.core.server.ConnectorService;
    +import org.apache.activemq.artemis.core.server.Consumer;
    +import org.apache.activemq.artemis.core.server.HandleStatus;
    +import org.apache.activemq.artemis.core.server.MessageReference;
    +import org.apache.activemq.artemis.core.server.Queue;
    +import org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer;
    +import org.apache.activemq.artemis.utils.ConfigurationHelper;
    +import org.apache.activemq.artemis.utils.ReusableLatch;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.kafka.common.utils.Utils;
    +
    +class KafkaProducerBridge implements Consumer, ConnectorService {
    +
    +   private final String connectorName;
    +
    +   private final String queueName;
    +
    +   private final String topicName;
    +
    +   private final PostOffice postOffice;
    +
    +   private Queue queue = null;
    +
    +   private Filter filter = null;
    +
    +   private String filterString;
    +
    +   private AtomicBoolean isStarted = new AtomicBoolean();
    +
    +   private boolean isConnected = false;
    +
    +   private Producer<String, Message> kafkaProducer;
    +
    +   private Map<String, Object> configuration;
    +
    +   private long sequentialID;
    +
    +   private final ReusableLatch pendingAcks = new ReusableLatch(0);
    +
    +   private final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
    +
    +   private final ScheduledExecutorService scheduledExecutorService;
    +
    +   private final int retryAttempts;
    +
    +   private final long retryInterval;
    +
    +   private final double retryMultiplier;
    +
    +   private final long retryMaxInterval;
    +
    +   private final AtomicInteger retryCount = new AtomicInteger();
    +
    +   private final AtomicBoolean awaitingReconnect = new AtomicBoolean();
    +
    +   private final KafkaProducerFactory<String, Message> kafkaProducerFactory;
    +
    +
    +   KafkaProducerBridge(String connectorName, KafkaProducerFactory<String, Message> kafkaProducerFactory, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledExecutorService) {
    +      this.sequentialID = storageManager.generateID();
    +
    +      this.kafkaProducerFactory = kafkaProducerFactory;
    +
    +      this.connectorName = connectorName;
    +      this.queueName = ConfigurationHelper.getStringProperty(KafkaConstants.QUEUE_NAME, null, configuration);
    +      this.topicName = ConfigurationHelper.getStringProperty(KafkaConstants.TOPIC_NAME, null, configuration);
    +
    +      this.retryAttempts = ConfigurationHelper.getIntProperty(KafkaConstants.RETRY_ATTEMPTS_NAME, -1, configuration);
    +      this.retryInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_INTERVAL_NAME, 2000, configuration);
    +      this.retryMultiplier = ConfigurationHelper.getDoubleProperty(KafkaConstants.RETRY_MULTIPLIER_NAME, 2, configuration);
    +      this.retryMaxInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_MAX_INTERVAL_NAME, 30000, configuration);
    +
    +      this.filterString = ConfigurationHelper.getStringProperty(KafkaConstants.FILTER_STRING, null, configuration);
    +
    +      configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    +      if (!configuration.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
    +         configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoreMessageSerializer.class.getName());
    +      }
    +
    +      this.postOffice = postOffice;
    +      this.configuration = configuration;
    +      this.scheduledExecutorService = scheduledExecutorService;
    +   }
    +
    +   private Map<String, Object> kafkaConfig(Map<String, Object> configuration) {
    +      Map<String, Object> filteredConfig = new HashMap<>(configuration);
    +      KafkaConstants.OPTIONAL_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      KafkaConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      return filteredConfig;
    +   }
    +
    +   @Override
    +   public void start() throws Exception {
    +      synchronized (this) {
    +         if (this.isStarted.get()) {
    +            return;
    +         }
    +         if (this.connectorName == null || this.connectorName.trim().equals("")) {
    +            throw new Exception("invalid connector name: " + this.connectorName);
    +         }
    +
    +         if (this.topicName == null || this.topicName.trim().equals("")) {
    +            throw new Exception("invalid topic name: " + topicName);
    +         }
    +
    +         if (this.queueName == null || this.queueName.trim().equals("")) {
    +            throw new Exception("invalid queue name: " + queueName);
    +         }
    +
    +         this.filter = FilterImpl.createFilter(filterString);
    +
    +         SimpleString name = new SimpleString(this.queueName);
    +         Binding b = this.postOffice.getBinding(name);
    +         if (b == null) {
    +            throw new Exception(connectorName + ": queue " + queueName + " not found");
    +         }
    +         this.queue = (Queue) b.getBindable();
    +
    +         this.kafkaProducer = kafkaProducerFactory.create(kafkaConfig(configuration));
    +
    +         List<PartitionInfo> topicPartitions = kafkaProducer.partitionsFor(topicName);
    +         if (topicPartitions == null || topicPartitions.size() == 0) {
    +            throw new Exception(connectorName + ": topic " + topicName + " not found");
    +         }
    +
    +         this.retryCount.set(0);
    +         this.isStarted.set(true);
    +         connect();
    +         ActiveMQKafkaLogger.LOGGER.bridgeStarted(connectorName);
    +      }
    +   }
    +
    +   public void connect() throws Exception {
    +      synchronized (this) {
    +         if (!isConnected && isStarted.get()) {
    +            isConnected = true;
    +            this.queue.addConsumer(this);
    +            this.queue.deliverAsync();
    +            ActiveMQKafkaLogger.LOGGER.bridgeConnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void disconnect() {
    +      synchronized (this) {
    +         if (isConnected) {
    +            if (queue != null) {
    +               this.queue.removeConsumer(this);
    +            }
    +
    +            cancelRefs();
    +            if (queue != null) {
    +               queue.deliverAsync();
    +            }
    +            isConnected = false;
    +            ActiveMQKafkaLogger.LOGGER.bridgeDisconnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void stop() {
    +      synchronized (this) {
    +         if (!this.isStarted.get()) {
    +            return;
    +         }
    +         ActiveMQKafkaLogger.LOGGER.bridgeReceivedStopRequest(connectorName);
    +
    +         disconnect();
    +
    +         kafkaProducer.close();
    +         kafkaProducer = null;
    +         this.isStarted.set(false);
    +         ActiveMQKafkaLogger.LOGGER.bridgeStopped(connectorName);
    +      }
    +   }
    +
    +   @Override
    +   public boolean isStarted() {
    +      return this.isStarted.get();
    +   }
    +
    +   @Override
    +   public String getName() {
    +      return this.connectorName;
    +   }
    +
    +   @Override
    +   public boolean supportsDirectDelivery() {
    +      return false;
    +   }
    +
    +   @Override
    +   public HandleStatus handle(MessageReference ref) throws Exception {
    +      if (filter != null && !filter.match(ref.getMessage())) {
    +         return HandleStatus.NO_MATCH;
    +      }
    +
    +      synchronized (this) {
    +         ref.handled();
    +
    +         Message message = ref.getMessage();
    +
    +         synchronized (refs) {
    +            refs.put(ref.getMessage().getMessageID(), ref);
    +         }
    +
    +         Integer partition = null;
    +         SimpleString groupdID = message.getGroupID();
    +         if (groupdID != null) {
    +            List partitions = kafkaProducer.partitionsFor(topicName);
    +            int numPartitions = partitions.size();
    +            partition = Utils.toPositive(Utils.murmur2(groupdID.getData())) % numPartitions;
    --- End diff --
    
    No, the key bytes are the LVQ property if set, as the key is used for compacted topics.


---
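
The retry settings read in the quoted constructor (retry-interval 2000ms, retry-multiplier 2, retry-max-interval 30000ms, retry-attempts -1 for unlimited) suggest an exponential backoff on reconnect. The scheduling code itself is not quoted in this thread, so the following is only a sketch of how such a delay could be derived from those values.

    public class BackoffSketch {

       // Hypothetical helper: exponential backoff capped at the max interval.
       static long delayForAttempt(int attempt, long retryInterval,
                                   double retryMultiplier, long retryMaxInterval) {
          double delay = retryInterval * Math.pow(retryMultiplier, attempt);
          return (long) Math.min(delay, retryMaxInterval);
       }

       public static void main(String[] args) {
          // With the quoted defaults this prints 2000, 4000, 8000, 16000,
          // then stays capped at 30000.
          for (int attempt = 0; attempt < 6; attempt++) {
             System.out.println(delayForAttempt(attempt, 2000, 2, 30000));
          }
       }
    }


---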

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
  
    So if we looked to take this route, how easy would it be to get a sub-project of ActiveMQ Artemis (e.g. a bit like NMS) to hold the ecosystem tools/extensions such as this? e.g. a named project, ActiveMQ Artemis Service Connectors, to hold these.


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146821726
  
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,180 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis, 
    +and forward them to a target topic on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you could have a hybrid broker setup, 
    +with a data flow spanning CORE, AMQP and MQTT clients as well as Kafka clients, 
    +taking and giving the best features of both broker technologies when and where needed for a flow of data.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    +
    +The intent is for this to be a two-way bridge, but currently the flow is in a single direction, 
    +from Apache ActiveMQ Artemis to Apache Kafka.
    +
    +
    +The source and target servers are remote making bridging suitable
    +for reliably sending messages from one artemis cluster to kafka, 
    +for instance across a WAN, to the cloud, or internet and where the connection may be unreliable.
    +
    +The bridge has built in resilience to failure so if the target server
    +connection is lost, e.g. due to network failure, the bridge will retry
    +connecting to the target until it comes back online. When it comes back
    +online it will resume operation as normal.
    +
    +![ActiveMQ Artemis Kafka Bridge Diagram](images/activemq-kafka-bridge-diagram.png)
    +
    +In summary, the Apache ActiveMQ Kafka Bridge is a way to reliably connect separate 
    +Apache ActiveMQ Artemis and Apache Kafka servers together.
    +
    +## Configuring Kafka Bridges
    +
    +Bridges are configured in `broker.xml`.  
    +Let's kick off
    +with an example (this is actually from the kafka bridge test example):
    +
    +
    +    <connector-services>
    +         <connector-service name="my-kafka-bridge">
    +            <factory-class>org.apache.activemq.artemis.integration.kafka.bridge.KafkaProducerBridgeFactory</factory-class>
    +            <param key="bootstrap.servers" value="kafka-1.domain.local:9092,kafka-2.domain.local:9092,kafka-3.domain.local:9092" />
    +            <param key="queue-name" value="my.artemis.queue" />
    +            <param key="kafka-topic" value="my_kafka_topic" />
    +         </connector-service>
    +    </connector-services>
    +
    +In the above example we have shown the required parameters to
    +configure for a kafka bridge. See below for a complete list of available configuration options. 
    +
    +### Serialization
    +By default the CoreMessageSerializer is used.
    +
    +#### CoreMessageSerializer
    +This is the default, but it can be set explicitly using
    +           
    +    <param key="value.serializer" value="org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer" />
    +
    +This maps the Message properties to Record headers, 
    +and then maps the payload binary as the Record value, encoding a TextMessage 
    +as text and a ByteMessage as raw bytes.
    +
    +This makes it easy to consume from Kafka using the default deserializers: 
    +a TextMessage using StringDeserializer, and 
    +a ByteMessage using BytesDeserializer.
    +
    +Also note, if you serialized your objects using binary serialization such as Apache Avro, Apache Thrift etc. into a byte array for the ByteMessage, 
    +that byte array is preserved as-is in the record value, so you could deserialize it with an equivalent deserializer 
    +directly back into an Avro Record / Thrift Struct. 
    +
    +Also supplied are some Apache Kafka deserializers allowing you to consume from Apache Kafka 
    +and get a more familiar CoreMessage or JMSMessage that your consumers can use:
    +
    +`org.apache.activemq.artemis.integration.kafka.protocol.core.jms.CoreJmsMessageDeserializer`
    +`org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageDeserializer`
    +
    +You can get these Apache Kafka Serializers/Deserializers via Maven using the following GAV coordinates:
    +
    +    <dependency>
    +       <groupId>org.apache.activemq</groupId>
    +       <artifactId>artemis-kafka-core-protocol</artifactId>
    +       <version>2.4.0-SNAPSHOT</version>
    --- End diff --
    
    I'd guess the version is just going to go stale, and be wrong for any given release's docs since it's a SNAPSHOT. Might be best to use a placeholder value.


---
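
Following the suggestion above, the dependency snippet in the docs could carry a placeholder rather than a hard-coded SNAPSHOT, e.g. (property name illustrative):

    <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>artemis-kafka-core-protocol</artifactId>
       <!-- placeholder; substitute the Artemis release in use -->
       <version>${artemis.version}</version>
    </dependency>


---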

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
  
    There wasn't a JIRA to remove them; they just didn't get handled in the move to 2.x, and no one has done the work to update them to re-add them. It's not that we don't want connectors.


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146868437
  
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,180 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis, 
    +and forward them to a target topic on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you could have a hybrid broker setup, 
    +with a data flow spanning CORE, AMQP and MQTT clients as well as Kafka clients, 
    +taking and giving the best features of both broker technologies when and where needed for a flow of data.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    +
    +The intent is for this to be a two-way bridge, but currently the flow is in a single direction, 
    +from Apache ActiveMQ Artemis to Apache Kafka.
    +
    +
    +The source and target servers are remote making bridging suitable
    +for reliably sending messages from one artemis cluster to kafka, 
    +for instance across a WAN, to the cloud, or internet and where the connection may be unreliable.
    +
    +The bridge has built in resilience to failure so if the target server
    +connection is lost, e.g. due to network failure, the bridge will retry
    +connecting to the target until it comes back online. When it comes back
    +online it will resume operation as normal.
    +
    +![ActiveMQ Artemis Kafka Bridge Diagram](images/activemq-kafka-bridge-diagram.png)
    +
    +In summary, the Apache ActiveMQ Kafka Bridge is a way to reliably connect separate 
    +Apache ActiveMQ Artemis and Apache Kafka servers together.
    +
    +## Configuring Kafka Bridges
    +
    +Bridges are configured in `broker.xml`.  
    +Let's kick off
    +with an example (this is actually from the kafka bridge test example):
    +
    +
    +    <connector-services>
    +         <connector-service name="my-kafka-bridge">
    +            <factory-class>org.apache.activemq.artemis.integration.kafka.bridge.KafkaProducerBridgeFactory</factory-class>
    +            <param key="bootstrap.servers" value="kafka-1.domain.local:9092,kafka-2.domain.local:9092,kafka-3.domain.local:9092" />
    +            <param key="queue-name" value="my.artemis.queue" />
    +            <param key="kafka-topic" value="my_kafka_topic" />
    +         </connector-service>
    +    </connector-services>
    +
    +In the above example we have shown the required parameters to
    +configure for a kafka bridge. See below for a complete list of available configuration options. 
    +
    +### Serialization
    +By default the CoreMessageSerializer is used.
    +
    +#### CoreMessageSerializer
    +This is the default, but it can be set explicitly using
    +           
    +    <param key="value.serializer" value="org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer" />
    +
    +This maps the Message properties to Record headers, 
    +and then maps the payload binary as the Record value, encoding a TextMessage 
    +as text and a ByteMessage as raw bytes.
    +
    +This makes it easy to consume from Kafka using the default deserializers: 
    +a TextMessage using StringDeserializer, and 
    +a ByteMessage using BytesDeserializer.
    +
    +Also note, if you serialized your objects using binary serialization such as Apache Avro, Apache Thrift etc. into a byte array for the ByteMessage, 
    +that byte array is preserved as-is in the record value, so you could deserialize it with an equivalent deserializer 
    +directly back into an Avro Record / Thrift Struct. 
    +
    +Also supplied are some Apache Kafka deserializers allowing you to consume from Apache Kafka 
    +and get a more familiar CoreMessage or JMSMessage that your consumers can use:
    +
    +`org.apache.activemq.artemis.integration.kafka.protocol.core.jms.CoreJmsMessageDeserializer`
    +`org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageDeserializer`
    +
    +You can get these Apache Kafka Serializers/Deserializers via Maven using the following GAV coordinates:
    +
    +    <dependency>
    +       <groupId>org.apache.activemq</groupId>
    +       <artifactId>artemis-kafka-core-protocol</artifactId>
    +       <version>2.4.0-SNAPSHOT</version>
    +    </dependency>
    +
    +
    +#### AMQPMessageSerializer
    +Can be set by using:
    +    
    +    <param key="value.serializer" value="org.apache.activemq.artemis.integration.kafka.protocol.amqp.AMQPMessageSerializer" />
    +
    +
    +This encodes the whole message, in the AMQP binary protocol, into the Record value.
    +
    +This is useful if you have something like
    +
    +   https://github.com/EnMasseProject/amqp-kafka-bridge
    +
    +As it allows messages to be pumped into Kafka as AMQP binary, which can then be consumed from Kafka by clients using the AMQP binary protocol.
    +
    +Also provided are Apache Qpid Proton based Apache Kafka deserializers allowing you to consume from 
    +Apache Kafka and get a ProtonMessage.
    --- End diff --
    
    will do.


---
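
A minimal consumer sketch for the AMQP-serialized path described in the quoted docs, using the ProtonMessageDeserializer named above to turn each record value back into a Proton Message. The topic and bootstrap server names mirror the earlier broker.xml example and are assumptions here.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.qpid.proton.message.Message;

    public class BridgedAmqpConsumer {

       public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "kafka-1.domain.local:9092");
          props.put("group.id", "amqp-bridge-example");
          props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          // Deserializer from the quoted docs; yields a Proton Message value.
          props.put("value.deserializer",
                "org.apache.activemq.artemis.integration.kafka.protocol.amqp.proton.ProtonMessageDeserializer");

          try (KafkaConsumer<String, Message> consumer = new KafkaConsumer<>(props)) {
             consumer.subscribe(Collections.singletonList("my_kafka_topic"));
             ConsumerRecords<String, Message> records = consumer.poll(5000);
             for (ConsumerRecord<String, Message> record : records) {
                System.out.println("amqp body: " + record.value().getBody());
             }
          }
       }
    }


---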

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
  
    I don't think that makes sense personally. If the end view is that it should be maintained outwith the broker repo/distribution, that is where it should go from the outset. In the interim folks can build it themselves.
    
    In terms of establishing a location outside the broker repo, or getting more views on whether that is what should happen, perhaps you should start a discussion on the dev list about where it goes. 


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce closed the pull request at:

    https://github.com/apache/activemq-artemis/pull/1607


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146830052
  
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/ActiveMQKafkaLogger.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.bridge;
    +
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.core.server.MessageReference;
    +import org.jboss.logging.BasicLogger;
    +import org.jboss.logging.Logger;
    +import org.jboss.logging.annotations.Cause;
    +import org.jboss.logging.annotations.LogMessage;
    +import org.jboss.logging.annotations.Message;
    +import org.jboss.logging.annotations.MessageLogger;
    +
    +/**
    + * Logger Code 36
    + *
    + * each message id must be 6 digits long starting with 36, the 3rd digit denotes the level so
    + *
    + * INFO  1
    + * WARN  2
    + * DEBUG 3
    + * ERROR 4
    + * TRACE 5
    + * FATAL 6
    + *
    + * so an INFO message would be 191000 to 191999
    --- End diff --
    
    Example doesn't match the rest of the comment/code (with logger code 36, an INFO message would be 361000 to 361999).


---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
  
    @gemmellr I've removed the qpid JMS bit.
    
    @tabish121 I see this as no different to the vertx connector service etc. that was/is in the 1.5.x range.


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146822624
  
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,180 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis, 
    +and forward them to a target topic on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you could have a hybrid broker setup, 
    +with a data flow spanning CORE, AMQP and MQTT clients as well as Kafka clients, 
    +taking and giving the best features of both broker technologies when and where needed for a flow of data.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    +
    +The intent is for this to be a two-way bridge, but currently the flow is in a single direction, 
    +from Apache ActiveMQ Artemis to Apache Kafka.
    +
    +
    +The source and target servers are remote making bridging suitable
    +for reliably sending messages from one artemis cluster to kafka, 
    +for instance across a WAN, to the cloud, or internet and where the connection may be unreliable.
    +
    +The bridge has built in resilience to failure so if the target server
    +connection is lost, e.g. due to network failure, the bridge will retry
    +connecting to the target until it comes back online. When it comes back
    +online it will resume operation as normal.
    +
    +![ActiveMQ Artemis Kafka Bridge Diagram](images/activemq-kafka-bridge-diagram.png)
    +
    +In summary, the Apache ActiveMQ Kafka Bridge is a way to reliably connect separate 
    +Apache ActiveMQ Artemis and Apache Kafka servers together.
    +
    +## Configuring Kafka Bridges
    +
    +Bridges are configured in `broker.xml`.  
    +Let's kick off
    +with an example (this is actually from the kafka bridge test example):
    +
    +
    +    <connector-services>
    +         <connector-service name="my-kafka-bridge">
    +            <factory-class>org.apache.activemq.artemis.integration.kafka.bridge.KafkaProducerBridgeFactory</factory-class>
    +            <param key="bootstrap.servers" value="kafka-1.domain.local:9092,kafka-2.domain.local:9092,kafka-3.domain.local:9092" />
    +            <param key="queue-name" value="my.artemis.queue" />
    +            <param key="kafka-topic" value="my_kafka_topic" />
    +         </connector-service>
    +    </connector-services>
    +
    +In the above example we have shown the required parameters to
    +configure for a kafka bridge. See below for a complete list of available configuration options. 
    +
    +### Serialization
    +By default the CoreMessageSerializer is used.
    +
    +#### CoreMessageSerializer
    +This is the default, but it can be set explicitly using
    +           
    +    <param key="value.serializer" value="org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer" />
    +
    +This maps the Message properties to Record headers, 
    +and then maps the payload binary as the Record value, encoding a TextMessage 
    +as text and a ByteMessage as raw bytes.
    +
    +This makes it easy to consume from Kafka using the default deserializers: 
    +a TextMessage using StringDeserializer, and 
    +a ByteMessage using BytesDeserializer.
    +
    +Also note, if you serialized your objects using binary serialization such as Apache Avro, Apache Thrift etc. into a byte array for the ByteMessage, 
    +that byte array is preserved as-is in the record value, so you could deserialize it with an equivalent deserializer 
    +directly back into an Avro Record / Thrift Struct. 
    +
    +Also supplied are some Apache Kafka deserializers allowing you to consume from Apache Kafka 
    +and get a more familiar CoreMessage or JMSMessage that your consumers can use:
    +
    +`org.apache.activemq.artemis.integration.kafka.protocol.core.jms.CoreJmsMessageDeserializer`
    +`org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageDeserializer`
    +
    +You can get these Apache Kafka Serializers/Deserializers via Maven using the following GAV coordinates:
    +
    +    <dependency>
    +       <groupId>org.apache.activemq</groupId>
    +       <artifactId>artemis-kafka-core-protocol</artifactId>
    +       <version>2.4.0-SNAPSHOT</version>
    +    </dependency>
    +
    +
    +#### AMQPMessageSerializer
    +Can be set by using:
    +    
    +    <param key="value.serializer" value="org.apache.activemq.artemis.integration.kafka.protocol.amqp.AMQPMessageSerializer" />
    +
    +
    +This encodes the whole message, in the AMQP binary protocol, into the Record value.
    +
    +This is useful if you have something like
    +
    +   https://github.com/EnMasseProject/amqp-kafka-bridge
    +
    +As it allows messages to be pumped into Kafka as AMQP binary, which can then be consumed from Kafka by clients using the AMQP binary protocol.
    +
    +Also provided are Apache Qpid Proton based Apache Kafka deserializers allowing you to consume from 
    +Apache Kafka and get a ProtonMessage.
    --- End diff --
    
    Needs a space between Proton and Message. 


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146872811
  
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,180 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis 
    +and forward them to a target topic on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you could have a hybrid broker setup, 
    +with a data flow involving CORE, AMQP and MQTT clients, as well as now Kafka clients, 
    +taking and giving the best features of both broker technologies when and where needed for a flow of data.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    --- End diff --
    
    have updated image.


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146872793
  
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,180 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis 
    +and forward them to a target topic on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you could have a hybrid broker setup, 
    +with a data flow involving CORE, AMQP and MQTT clients, as well as now Kafka clients, 
    +taking and giving the best features of both broker technologies when and where needed for a flow of data.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    +
    +The intent is for this to be a two-way bridge, but currently the flow is in a single direction, 
    +from Apache ActiveMQ Artemis to Apache Kafka.
    +
    +
    +The source and target servers are remote making bridging suitable
    +for reliably sending messages from one artemis cluster to kafka, 
    +for instance across a WAN, to the cloud, or internet and where the connection may be unreliable.
    +
    +The bridge has built in resilience to failure so if the target server
    +connection is lost, e.g. due to network failure, the bridge will retry
    +connecting to the target until it comes back online. When it comes back
    +online it will resume operation as normal.
    +
    +![ActiveMQ Artemis Kafka Bridge Diagram](images/activemq-kafka-bridge-diagram.png)
    --- End diff --
    
    have updated image.


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146818341
  
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,180 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis 
    +and forward them to a target topic on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you could have a hybrid broker setup, 
    +with a data flow involving CORE, AMQP and MQTT clients, as well as now Kafka clients, 
    +taking and giving the best features of both broker technologies when and where needed for a flow of data.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    +
    +The intent is for this to be a two-way bridge, but currently the flow is in a single direction, 
    +from Apache ActiveMQ Artemis to Apache Kafka.
    +
    +
    +The source and target servers are remote making bridging suitable
    +for reliably sending messages from one artemis cluster to kafka, 
    +for instance across a WAN, to the cloud, or internet and where the connection may be unreliable.
    +
    +The bridge has built in resilience to failure so if the target server
    +connection is lost, e.g. due to network failure, the bridge will retry
    +connecting to the target until it comes back online. When it comes back
    +online it will resume operation as normal.
    +
    +![ActiveMQ Artemis Kafka Bridge Diagram](images/activemq-kafka-bridge-diagram.png)
    --- End diff --
    
    The (JMS / Proton) qualifier should be removed from the AMQP Client box. Other clients are available, etc.
    
    I'd say the same for JMS in the JMS Core Client box, if it also works for the non-JMS client.


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146868386
  
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,180 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis 
    +and forward them to a target topic on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you could have a hybrid broker setup, 
    +with a data flow involving CORE, AMQP and MQTT clients, as well as now Kafka clients, 
    +taking and giving the best features of both broker technologies when and where needed for a flow of data.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    +
    +The intent is for this to be a two-way bridge, but currently the flow is in a single direction, 
    +from Apache ActiveMQ Artemis to Apache Kafka.
    +
    +
    +The source and target servers are remote making bridging suitable
    +for reliably sending messages from one artemis cluster to kafka, 
    +for instance across a WAN, to the cloud, or internet and where the connection may be unreliable.
    +
    +The bridge has built in resilience to failure so if the target server
    +connection is lost, e.g. due to network failure, the bridge will retry
    +connecting to the target until it comes back online. When it comes back
    +online it will resume operation as normal.
    +
    +![ActiveMQ Artemis Kafka Bridge Diagram](images/activemq-kafka-bridge-diagram.png)
    +
    +In summary, the Apache ActiveMQ Kafka Bridge is a way to reliably connect a separate 
    +Apache ActiveMQ Artemis server and an Apache Kafka server together.
    +
    +## Configuring Kafka Bridges
    +
    +Bridges are configured in `broker.xml`. Let's kick off 
    +with an example (this is actually from the Kafka bridge test example):
    +
    +
    +    <connector-services>
    +         <connector-service name="my-kafka-bridge">
    +            <factory-class>org.apache.activemq.artemis.integration.kafka.bridge.KafkaProducerBridgeFactory</factory-class>
    +            <param key="bootstrap.servers" value="kafka-1.domain.local:9092,kafka-2.domain.local:9092,kafka-3.domain.local:9092" />
    +            <param key="queue-name" value="my.artemis.queue" />
    +            <param key="kafka-topic" value="my_kafka_topic" />
    +         </connector-service>
    +    </connector-services>
    +
    +In the above example we have shown the required parameters to
    +configure for a Kafka bridge. See below for a complete list of available configuration options. 
    +
    +### Serialization
    +By default the CoreMessageSerializer is used.
    +
    +#### CoreMessageSerializer
    +This is the default, but it can be set explicitly using:
    +           
    +    <param key="value.serializer" value="org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer" />
    +
    +This maps the Message properties to Record headers,
    +and then maps the payload binary as the Record value, encoding a TextMessage body as a string and a ByteMessage body as raw bytes.
    +
    +This makes it easy to consume from Kafka using the default deserializers:
    +a TextMessage using StringDeserializer, and
    +a ByteMessage using BytesDeserializer.
    +
    +Also to note: if you serialized your objects using binary serialization such as Apache Avro or Apache Thrift into a byte array for the ByteMessage, 
    +that byte array is preserved as-is in the record value, so you could deserialize it with an equivalent deserializer 
    +directly back into an Avro Record / Thrift Struct. 
    +
    +Also supplied are some Apache Kafka deserializers allowing you to consume from Apache Kafka 
    +and get a more familiar CoreMessage or JMSMessage that your consumers can use:
    +
    +`org.apache.activemq.artemis.integration.kafka.protocol.core.jms.CoreJmsMessageDeserializer`
    +`org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageDeserializer`
    +
    +You can get these Apache Kafka serializers/deserializers via Maven using the following GAV coordinates:
    +
    +    <dependency>
    +       <groupId>org.apache.activemq</groupId>
    +       <artifactId>artemis-kafka-core-protocol</artifactId>
    +       <version>2.4.0-SNAPSHOT</version>
    +    </dependency>
    +
    +
    +#### AMQPMessageSerializer
    +Can be set by using:
    +    
    +    <param key="value.serializer" value="org.apache.activemq.artemis.integration.kafka.protocol.amqp.AMQPMessageSerializer" />
    +
    +
    +This encodes the whole message, in the AMQP binary protocol, into the Record value.
    +
    +This is useful if you have something like
    +
   https://github.com/EnMasseProject/amqp-kafka-bridge
    +
    +as it allows messages to be pumped into Kafka in AMQP binary form, which can then be consumed from Kafka by clients using the AMQP binary protocol.
    +
    +Also provided are Apache Qpid Proton based Apache Kafka deserializers allowing you to consume from 
    +Apache Kafka and get a ProtonMessage.
    +
    +`org.apache.activemq.artemis.integration.kafka.protocol.amqp.proton.ProtonMessageDeserializer`
    +
    +You can get these Apache Kafka serializers/deserializers via Maven using the following GAV coordinates:
    +
    +    <dependency>
    +       <groupId>org.apache.activemq</groupId>
    +       <artifactId>artemis-kafka-amqp-protocol</artifactId>
    +       <version>2.4.0-SNAPSHOT</version>
    --- End diff --
    
    agreed.
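    
    For reference, wiring the supplied Proton deserializer into a consumer is then just configuration. A minimal sketch (the host and group names here are made up, and I'm assuming the deserialized value type is the Proton Message):
    
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-1.domain.local:9092"); // made-up host
        props.put("group.id", "amqp-consumers");                     // made-up group
        props.put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class.getName());
        props.put("value.deserializer", org.apache.activemq.artemis.integration.kafka.protocol.amqp.proton.ProtonMessageDeserializer.class.getName());
        
        // each record value comes back as an AMQP message decoded by Qpid Proton
        KafkaConsumer<String, org.apache.qpid.proton.message.Message> consumer = new KafkaConsumer<>(props);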


---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
  
    @gemmellr agreed on the class definition in that namespace; this was because we couldn't create the object directly, as the methods are package-scoped. As the note says, it would be good if we could get Qpid to make them public so they are re-usable.
    
    Yes, anything can work; we are simply providing an implementation to make things easy out of the box.
    
    I'm happy to remove the Qpid JMS one; it is just for end users, giving, as I said, an out-of-the-box solution on the consumer side, but it's not actually needed by the bridge.
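    
    For example, on the consumer side it is then just standard Kafka consumer code. A minimal sketch (the topic, host and group names here are made up, and I'm assuming the deserializer yields a javax.jms.Message):
    
        import java.util.Collections;
        import java.util.Properties;
        
        import javax.jms.Message;
        
        import org.apache.activemq.artemis.integration.kafka.protocol.core.jms.CoreJmsMessageDeserializer;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.serialization.StringDeserializer;
        
        public class BridgeTopicConsumer {
           public static void main(String[] args) {
              Properties props = new Properties();
              props.put("bootstrap.servers", "kafka-1.domain.local:9092"); // made-up host
              props.put("group.id", "bridge-consumers");                   // made-up group
              props.put("key.deserializer", StringDeserializer.class.getName());
              props.put("value.deserializer", CoreJmsMessageDeserializer.class.getName());
        
              try (KafkaConsumer<String, Message> consumer = new KafkaConsumer<>(props)) {
                 consumer.subscribe(Collections.singletonList("my_kafka_topic"));
                 while (true) {
                    ConsumerRecords<String, Message> records = consumer.poll(100);
                    for (ConsumerRecord<String, Message> record : records) {
                       // record.value() is a JMS Message rebuilt from the Core encoding
                       System.out.println(record.value());
                    }
                 }
              }
           }
        }
    
    Nothing bridge-specific is needed beyond having the artemis-kafka-core-protocol jar on the consumer's classpath.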
    
    



---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

Posted by mtaylor <gi...@git.apache.org>.
Github user mtaylor commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
  
    My thoughts on this.  
    
    I love the idea of having a Kafka to/from Artemis connector.  However, we were burnt in the past with carrying integration code, e.g. the aerogear and vertx connectors.  The code was contributed but wasn't maintained; it required expertise/knowledge of the project it was integrating with, and in the end what happened was that those projects moved forward and the integration bits were left to rot in the broker.  Users weren't quite sure where we stood.  
    
    We ended up solving the problem by removing the service connector implementations.  My view going forward from this was that the broker should provide the pieces to enable integration, but that the integration plugins themselves be kept maintained outside of the broker.  



---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146869042
  
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,180 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis 
    +and forward them to a target topic on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you could have a hybrid broker setup, 
    +with a data flow involving CORE, AMQP and MQTT clients, as well as now Kafka clients, 
    +taking and giving the best features of both broker technologies when and where needed for a flow of data.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    +
    +The intent is for this to be a two-way bridge, but currently the flow is in a single direction, 
    +from Apache ActiveMQ Artemis to Apache Kafka.
    +
    +
    +The source and target servers are remote making bridging suitable
    +for reliably sending messages from one artemis cluster to kafka, 
    +for instance across a WAN, to the cloud, or internet and where the connection may be unreliable.
    +
    +The bridge has built in resilience to failure so if the target server
    +connection is lost, e.g. due to network failure, the bridge will retry
    +connecting to the target until it comes back online. When it comes back
    +online it will resume operation as normal.
    +
    +![ActiveMQ Artemis Kafka Bridge Diagram](images/activemq-kafka-bridge-diagram.png)
    +
    +In summary, the Apache ActiveMQ Kafka Bridge is a way to reliably connect a separate 
    +Apache ActiveMQ Artemis server and an Apache Kafka server together.
    +
    +## Configuring Kafka Bridges
    +
    +Bridges are configured in `broker.xml`. Let's kick off 
    +with an example (this is actually from the Kafka bridge test example):
    +
    +
    +    <connector-services>
    +         <connector-service name="my-kafka-bridge">
    +            <factory-class>org.apache.activemq.artemis.integration.kafka.bridge.KafkaProducerBridgeFactory</factory-class>
    +            <param key="bootstrap.servers" value="kafka-1.domain.local:9092,kafka-2.domain.local:9092,kafka-3.domain.local:9092" />
    +            <param key="queue-name" value="my.artemis.queue" />
    +            <param key="kafka-topic" value="my_kafka_topic" />
    +         </connector-service>
    +    </connector-services>
    +
    +In the above example we have shown the required parameters to
    +configure for a Kafka bridge. See below for a complete list of available configuration options. 
    +
    +### Serialization
    +By default the CoreMessageSerializer is used.
    +
    +#### CoreMessageSerializer
    +This is the default, but it can be set explicitly using:
    +           
    +    <param key="value.serializer" value="org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer" />
    +
    +This maps the Message properties to Record headers,
    +and then maps the payload binary as the Record value, encoding a TextMessage body as a string and a ByteMessage body as raw bytes.
    +
    +This makes it easy to consume from Kafka using the default deserializers:
    +a TextMessage using StringDeserializer, and
    +a ByteMessage using BytesDeserializer.
    +
    +Also to note: if you serialized your objects using binary serialization such as Apache Avro or Apache Thrift into a byte array for the ByteMessage, 
    +that byte array is preserved as-is in the record value, so you could deserialize it with an equivalent deserializer 
    +directly back into an Avro Record / Thrift Struct. 
    +
    +Also supplied are some Apache Kafka deserializers allowing you to consume from Apache Kafka 
    +and get a more familiar CoreMessage or JMSMessage that your consumers can use:
    +
    +`org.apache.activemq.artemis.integration.kafka.protocol.core.jms.CoreJmsMessageDeserializer`
    +`org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageDeserializer`
    +
    +You can get these Apache Kafka serializers/deserializers via Maven using the following GAV coordinates:
    +
    +    <dependency>
    +       <groupId>org.apache.activemq</groupId>
    +       <artifactId>artemis-kafka-core-protocol</artifactId>
    +       <version>2.4.0-SNAPSHOT</version>
    --- End diff --
    
    agreed, will replace with 2.x.x


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146824362
  
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,180 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis 
    +and forward them to a target topic on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you could have a hybrid broker setup, 
    +with a data flow involving CORE, AMQP and MQTT clients, as well as now Kafka clients, 
    +taking and giving the best features of both broker technologies when and where needed for a flow of data.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    +
    +The intent is for this to be a two-way bridge, but currently the flow is in a single direction, 
    +from Apache ActiveMQ Artemis to Apache Kafka.
    +
    +
    +The source and target servers are remote making bridging suitable
    +for reliably sending messages from one artemis cluster to kafka, 
    --- End diff --
    
    Various instances of 'kafka' and 'artemis', such as those here, should probably be Kafka and Artemis instead.


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146628048
  
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java ---
    @@ -0,0 +1,484 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.bridge;
    +
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.core.filter.Filter;
    +import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
    +import org.apache.activemq.artemis.core.persistence.StorageManager;
    +import org.apache.activemq.artemis.core.postoffice.Binding;
    +import org.apache.activemq.artemis.core.postoffice.PostOffice;
    +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
    +import org.apache.activemq.artemis.core.server.ConnectorService;
    +import org.apache.activemq.artemis.core.server.Consumer;
    +import org.apache.activemq.artemis.core.server.HandleStatus;
    +import org.apache.activemq.artemis.core.server.MessageReference;
    +import org.apache.activemq.artemis.core.server.Queue;
    +import org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer;
    +import org.apache.activemq.artemis.utils.ConfigurationHelper;
    +import org.apache.activemq.artemis.utils.ReusableLatch;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.kafka.common.utils.Utils;
    +
    +class KafkaProducerBridge implements Consumer, ConnectorService {
    +
    +   private final String connectorName;
    +
    +   private final String queueName;
    +
    +   private final String topicName;
    +
    +   private final PostOffice postOffice;
    +
    +   private Queue queue = null;
    +
    +   private Filter filter = null;
    +
    +   private String filterString;
    +
    +   private AtomicBoolean isStarted = new AtomicBoolean();
    +
    +   private boolean isConnected = false;
    +
    +   private Producer<String, Message> kafkaProducer;
    +
    +   private Map<String, Object> configuration;
    +
    +   private long sequentialID;
    +
    +   private final ReusableLatch pendingAcks = new ReusableLatch(0);
    +
    +   private final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
    +
    +   private final ScheduledExecutorService scheduledExecutorService;
    +
    +   private final int retryAttempts;
    +
    +   private final long retryInterval;
    +
    +   private final double retryMultiplier;
    +
    +   private final long retryMaxInterval;
    +
    +   private final AtomicInteger retryCount = new AtomicInteger();
    +
    +   private final AtomicBoolean awaitingReconnect = new AtomicBoolean();
    +
    +   private final KafkaProducerFactory<String, Message> kafkaProducerFactory;
    +
    +
    +   KafkaProducerBridge(String connectorName, KafkaProducerFactory<String, Message> kafkaProducerFactory, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledExecutorService) {
    +      this.sequentialID = storageManager.generateID();
    +
    +      this.kafkaProducerFactory = kafkaProducerFactory;
    +
    +      this.connectorName = connectorName;
    +      this.queueName = ConfigurationHelper.getStringProperty(KafkaConstants.QUEUE_NAME, null, configuration);
    +      this.topicName = ConfigurationHelper.getStringProperty(KafkaConstants.TOPIC_NAME, null, configuration);
    +
    +      this.retryAttempts = ConfigurationHelper.getIntProperty(KafkaConstants.RETRY_ATTEMPTS_NAME, -1, configuration);
    +      this.retryInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_INTERVAL_NAME, 2000, configuration);
    +      this.retryMultiplier = ConfigurationHelper.getDoubleProperty(KafkaConstants.RETRY_MULTIPLIER_NAME, 2, configuration);
    +      this.retryMaxInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_MAX_INTERVAL_NAME, 30000, configuration);
    +
    +      this.filterString = ConfigurationHelper.getStringProperty(KafkaConstants.FILTER_STRING, null, configuration);
    +
    +      configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    +      if (!configuration.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
    +         configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoreMessageSerializer.class.getName());
    +      }
    +
    +      this.postOffice = postOffice;
    +      this.configuration = configuration;
    +      this.scheduledExecutorService = scheduledExecutorService;
    +   }
    +
    +   private Map<String, Object> kafkaConfig(Map<String, Object> configuration) {
    +      Map<String, Object> filteredConfig = new HashMap<>(configuration);
    +      KafkaConstants.OPTIONAL_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      KafkaConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      return filteredConfig;
    +   }
    +
    +   @Override
    +   public void start() throws Exception {
    +      synchronized (this) {
    +         if (this.isStarted.get()) {
    +            return;
    +         }
    +         if (this.connectorName == null || this.connectorName.trim().equals("")) {
    +            throw new Exception("invalid connector name: " + this.connectorName);
    +         }
    +
    +         if (this.topicName == null || this.topicName.trim().equals("")) {
    +            throw new Exception("invalid topic name: " + topicName);
    +         }
    +
    +         if (this.queueName == null || this.queueName.trim().equals("")) {
    +            throw new Exception("invalid queue name: " + queueName);
    +         }
    +
    +         this.filter = FilterImpl.createFilter(filterString);
    +
    +         SimpleString name = new SimpleString(this.queueName);
    +         Binding b = this.postOffice.getBinding(name);
    +         if (b == null) {
    +            throw new Exception(connectorName + ": queue " + queueName + " not found");
    +         }
    +         this.queue = (Queue) b.getBindable();
    +
    +         this.kafkaProducer = kafkaProducerFactory.create(kafkaConfig(configuration));
    +
    +         List<PartitionInfo> topicPartitions = kafkaProducer.partitionsFor(topicName);
    +         if (topicPartitions == null || topicPartitions.size() == 0) {
    +            throw new Exception(connectorName + ": topic " + topicName + " not found");
    +         }
    +
    +         this.retryCount.set(0);
    +         this.isStarted.set(true);
    +         connect();
    +         ActiveMQKafkaLogger.LOGGER.bridgeStarted(connectorName);
    +      }
    +   }
    +
    +   public void connect() throws Exception {
    +      synchronized (this) {
    +         if (!isConnected && isStarted.get()) {
    +            isConnected = true;
    +            this.queue.addConsumer(this);
    +            this.queue.deliverAsync();
    +            ActiveMQKafkaLogger.LOGGER.bridgeConnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void disconnect() {
    +      synchronized (this) {
    +         if (isConnected) {
    +            if (queue != null) {
    +               this.queue.removeConsumer(this);
    +            }
    +
    +            cancelRefs();
    +            if (queue != null) {
    +               queue.deliverAsync();
    +            }
    +            isConnected = false;
    +            ActiveMQKafkaLogger.LOGGER.bridgeDisconnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void stop() {
    +      synchronized (this) {
    +         if (!this.isStarted.get()) {
    +            return;
    +         }
    +         ActiveMQKafkaLogger.LOGGER.bridgeReceivedStopRequest(connectorName);
    +
    +         disconnect();
    +
    +         kafkaProducer.close();
    +         kafkaProducer = null;
    +         this.isStarted.set(false);
    +         ActiveMQKafkaLogger.LOGGER.bridgeStopped(connectorName);
    +      }
    +   }
    +
    +   @Override
    +   public boolean isStarted() {
    +      return this.isStarted.get();
    +   }
    +
    +   @Override
    +   public String getName() {
    +      return this.connectorName;
    +   }
    +
    +   @Override
    +   public boolean supportsDirectDelivery() {
    +      return false;
    +   }
    +
    +   @Override
    +   public HandleStatus handle(MessageReference ref) throws Exception {
    +      if (filter != null && !filter.match(ref.getMessage())) {
    +         return HandleStatus.NO_MATCH;
    +      }
    +
    +      synchronized (this) {
    +         ref.handled();
    +
    +         Message message = ref.getMessage();
    +
    +         synchronized (refs) {
    +            refs.put(ref.getMessage().getMessageID(), ref);
    +         }
    +
    +         Integer partition = null;
    +         SimpleString groupID = message.getGroupID();
    +         if (groupID != null) {
    +            List<PartitionInfo> partitions = kafkaProducer.partitionsFor(topicName);
    +            int numPartitions = partitions.size();
    +            partition = Utils.toPositive(Utils.murmur2(groupID.getData())) % numPartitions;
    --- End diff --
    
    I have an idea of handling this with a custom partitioner and custom serialiser as an alternative.
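    
    Roughly the sketch I have in mind (illustrative only, not final code; the class name is made up):
    
        import java.util.List;
        import java.util.Map;
        
        import org.apache.activemq.artemis.api.core.Message;
        import org.apache.activemq.artemis.api.core.SimpleString;
        import org.apache.kafka.clients.producer.Partitioner;
        import org.apache.kafka.common.Cluster;
        import org.apache.kafka.common.PartitionInfo;
        import org.apache.kafka.common.utils.Utils;
        
        public class GroupIdPartitioner implements Partitioner {
        
           @Override
           public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
              List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
              int numPartitions = partitions.size();
              if (value instanceof Message) {
                 SimpleString groupId = ((Message) value).getGroupID();
                 if (groupId != null) {
                    // same murmur2 hash Kafka's DefaultPartitioner applies to keyed records
                    return Utils.toPositive(Utils.murmur2(groupId.getData())) % numPartitions;
                 }
              }
              if (keyBytes != null) {
                 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
              }
              // no group id and no key: fall back to hashing an arbitrary value
              return Utils.toPositive(Utils.murmur2(Long.toString(System.nanoTime()).getBytes())) % numPartitions;
           }
        
           @Override
           public void close() {
           }
        
           @Override
           public void configure(Map<String, ?> configs) {
           }
        }
    
    That would be registered via the producer's partitioner.class config, and the handle() code would then no longer need to compute the partition itself.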


---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
  
    Vertx and aerogear were also removed for lack of users.  No one complained; if we had complainers we could put them back. 


---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
  
    @gemmellr have removed it entirely to avoid the issue.


---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

Posted by cshannon <gi...@git.apache.org>.
Github user cshannon commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
  
    I agree with @mtaylor that this should be maintained outside the broker, for the reasons he already mentioned.


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by ppatierno <gi...@git.apache.org>.
Github user ppatierno commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146578236
  
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java ---
    @@ -0,0 +1,484 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.bridge;
    +
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.core.filter.Filter;
    +import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
    +import org.apache.activemq.artemis.core.persistence.StorageManager;
    +import org.apache.activemq.artemis.core.postoffice.Binding;
    +import org.apache.activemq.artemis.core.postoffice.PostOffice;
    +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
    +import org.apache.activemq.artemis.core.server.ConnectorService;
    +import org.apache.activemq.artemis.core.server.Consumer;
    +import org.apache.activemq.artemis.core.server.HandleStatus;
    +import org.apache.activemq.artemis.core.server.MessageReference;
    +import org.apache.activemq.artemis.core.server.Queue;
    +import org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer;
    +import org.apache.activemq.artemis.utils.ConfigurationHelper;
    +import org.apache.activemq.artemis.utils.ReusableLatch;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.kafka.common.utils.Utils;
    +
    +class KafkaProducerBridge implements Consumer, ConnectorService {
    +
    +   private final String connectorName;
    +
    +   private final String queueName;
    +
    +   private final String topicName;
    +
    +   private final PostOffice postOffice;
    +
    +   private Queue queue = null;
    +
    +   private Filter filter = null;
    +
    +   private String filterString;
    +
    +   private AtomicBoolean isStarted = new AtomicBoolean();
    +
    +   private boolean isConnected = false;
    +
    +   private Producer<String, Message> kafkaProducer;
    +
    +   private Map<String, Object> configuration;
    +
    +   private long sequentialID;
    +
    +   private final ReusableLatch pendingAcks = new ReusableLatch(0);
    +
    +   private final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
    +
    +   private final ScheduledExecutorService scheduledExecutorService;
    +
    +   private final int retryAttempts;
    +
    +   private final long retryInterval;
    +
    +   private final double retryMultiplier;
    +
    +   private final long retryMaxInterval;
    +
    +   private final AtomicInteger retryCount = new AtomicInteger();
    +
    +   private final AtomicBoolean awaitingReconnect = new AtomicBoolean();
    +
    +   private final KafkaProducerFactory<String, Message> kafkaProducerFactory;
    +
    +
    +   KafkaProducerBridge(String connectorName, KafkaProducerFactory<String, Message> kafkaProducerFactory, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledExecutorService) {
    +      this.sequentialID = storageManager.generateID();
    +
    +      this.kafkaProducerFactory = kafkaProducerFactory;
    +
    +      this.connectorName = connectorName;
    +      this.queueName = ConfigurationHelper.getStringProperty(KafkaConstants.QUEUE_NAME, null, configuration);
    +      this.topicName = ConfigurationHelper.getStringProperty(KafkaConstants.TOPIC_NAME, null, configuration);
    +
    +      this.retryAttempts = ConfigurationHelper.getIntProperty(KafkaConstants.RETRY_ATTEMPTS_NAME, -1, configuration);
    +      this.retryInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_INTERVAL_NAME, 2000, configuration);
    +      this.retryMultiplier = ConfigurationHelper.getDoubleProperty(KafkaConstants.RETRY_MULTIPLIER_NAME, 2, configuration);
    +      this.retryMaxInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_MAX_INTERVAL_NAME, 30000, configuration);
    +
    +      this.filterString = ConfigurationHelper.getStringProperty(KafkaConstants.FILTER_STRING, null, configuration);
    +
    +      configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    +      if (!configuration.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
    +         configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoreMessageSerializer.class.getName());
    +      }
    +
    +      this.postOffice = postOffice;
    +      this.configuration = configuration;
    +      this.scheduledExecutorService = scheduledExecutorService;
    +   }
    +
    +   private Map<String, Object> kafkaConfig(Map<String, Object> configuration) {
    +      Map<String, Object> filteredConfig = new HashMap<>(configuration);
    +      KafkaConstants.OPTIONAL_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      KafkaConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      return filteredConfig;
    +   }
    +
    +   @Override
    +   public void start() throws Exception {
    +      synchronized (this) {
    +         if (this.isStarted.get()) {
    +            return;
    +         }
    +         if (this.connectorName == null || this.connectorName.trim().equals("")) {
    +            throw new Exception("invalid connector name: " + this.connectorName);
    +         }
    +
    +         if (this.topicName == null || this.topicName.trim().equals("")) {
    +            throw new Exception("invalid topic name: " + topicName);
    +         }
    +
    +         if (this.queueName == null || this.queueName.trim().equals("")) {
    +            throw new Exception("invalid queue name: " + queueName);
    +         }
    +
    +         this.filter = FilterImpl.createFilter(filterString);
    +
    +         SimpleString name = new SimpleString(this.queueName);
    +         Binding b = this.postOffice.getBinding(name);
    +         if (b == null) {
    +            throw new Exception(connectorName + ": queue " + queueName + " not found");
    +         }
    +         this.queue = (Queue) b.getBindable();
    +
    +         this.kafkaProducer = kafkaProducerFactory.create(kafkaConfig(configuration));
    +
    +         List<PartitionInfo> topicPartitions = kafkaProducer.partitionsFor(topicName);
    +         if (topicPartitions == null || topicPartitions.size() == 0) {
    +            throw new Exception(connectorName + ": topic " + topicName + " not found");
    +         }
    +
    +         this.retryCount.set(0);
    +         this.isStarted.set(true);
    +         connect();
    +         ActiveMQKafkaLogger.LOGGER.bridgeStarted(connectorName);
    +      }
    +   }
    +
    +   public void connect() throws Exception {
    +      synchronized (this) {
    +         if (!isConnected && isStarted.get()) {
    +            isConnected = true;
    +            this.queue.addConsumer(this);
    +            this.queue.deliverAsync();
    +            ActiveMQKafkaLogger.LOGGER.bridgeConnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void disconnect() {
    +      synchronized (this) {
    +         if (isConnected) {
    +            if (queue != null) {
    +               this.queue.removeConsumer(this);
    +            }
    +
    +            cancelRefs();
    +            if (queue != null) {
    +               queue.deliverAsync();
    +            }
    +            isConnected = false;
    +            ActiveMQKafkaLogger.LOGGER.bridgeDisconnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void stop() {
    +      synchronized (this) {
    +         if (!this.isStarted.get()) {
    +            return;
    +         }
    +         ActiveMQKafkaLogger.LOGGER.bridgeReceivedStopRequest(connectorName);
    +
    +         disconnect();
    +
    +         kafkaProducer.close();
    +         kafkaProducer = null;
    +         this.isStarted.set(false);
    +         ActiveMQKafkaLogger.LOGGER.bridgeStopped(connectorName);
    +      }
    +   }
    +
    +   @Override
    +   public boolean isStarted() {
    +      return this.isStarted.get();
    +   }
    +
    +   @Override
    +   public String getName() {
    +      return this.connectorName;
    +   }
    +
    +   @Override
    +   public boolean supportsDirectDelivery() {
    +      return false;
    +   }
    +
    +   @Override
    +   public HandleStatus handle(MessageReference ref) throws Exception {
    +      if (filter != null && !filter.match(ref.getMessage())) {
    +         return HandleStatus.NO_MATCH;
    +      }
    +
    +      synchronized (this) {
    +         ref.handled();
    +
    +         Message message = ref.getMessage();
    +
    +         synchronized (refs) {
    +            refs.put(ref.getMessage().getMessageID(), ref);
    +         }
    +
    +         Integer partition = null;
    +         SimpleString groupID = message.getGroupID();
    +         if (groupID != null) {
    +            List<PartitionInfo> partitions = kafkaProducer.partitionsFor(topicName);
    +            int numPartitions = partitions.size();
    +            partition = Utils.toPositive(Utils.murmur2(groupID.getData())) % numPartitions;
    +         }
    +
    +         //Compaction/Record Key (equivalent to Artemis LVQ)
    +         SimpleString key = message.getLastValueProperty();
    --- End diff --
    
    I'm not an expert on Artemis, but can you explain to me why the LVQ property is used as the key for partitions?


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146586304
  
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java ---
    @@ -0,0 +1,484 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.bridge;
    +
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.core.filter.Filter;
    +import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
    +import org.apache.activemq.artemis.core.persistence.StorageManager;
    +import org.apache.activemq.artemis.core.postoffice.Binding;
    +import org.apache.activemq.artemis.core.postoffice.PostOffice;
    +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
    +import org.apache.activemq.artemis.core.server.ConnectorService;
    +import org.apache.activemq.artemis.core.server.Consumer;
    +import org.apache.activemq.artemis.core.server.HandleStatus;
    +import org.apache.activemq.artemis.core.server.MessageReference;
    +import org.apache.activemq.artemis.core.server.Queue;
    +import org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer;
    +import org.apache.activemq.artemis.utils.ConfigurationHelper;
    +import org.apache.activemq.artemis.utils.ReusableLatch;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.kafka.common.utils.Utils;
    +
    +class KafkaProducerBridge implements Consumer, ConnectorService {
    +
    +   private final String connectorName;
    +
    +   private final String queueName;
    +
    +   private final String topicName;
    +
    +   private final PostOffice postOffice;
    +
    +   private Queue queue = null;
    +
    +   private Filter filter = null;
    +
    +   private String filterString;
    +
    +   private AtomicBoolean isStarted = new AtomicBoolean();
    +
    +   private boolean isConnected = false;
    +
    +   private Producer<String, Message> kafkaProducer;
    +
    +   private Map<String, Object> configuration;
    +
    +   private long sequentialID;
    +
    +   private final ReusableLatch pendingAcks = new ReusableLatch(0);
    +
    +   private final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
    +
    +   private final ScheduledExecutorService scheduledExecutorService;
    +
    +   private final int retryAttempts;
    +
    +   private final long retryInterval;
    +
    +   private final double retryMultiplier;
    +
    +   private final long retryMaxInterval;
    +
    +   private final AtomicInteger retryCount = new AtomicInteger();
    +
    +   private final AtomicBoolean awaitingReconnect = new AtomicBoolean();
    +
    +   private final KafkaProducerFactory<String, Message> kafkaProducerFactory;
    +
    +
    +   KafkaProducerBridge(String connectorName, KafkaProducerFactory<String, Message> kafkaProducerFactory, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledExecutorService) {
    +      this.sequentialID = storageManager.generateID();
    +
    +      this.kafkaProducerFactory = kafkaProducerFactory;
    +
    +      this.connectorName = connectorName;
    +      this.queueName = ConfigurationHelper.getStringProperty(KafkaConstants.QUEUE_NAME, null, configuration);
    +      this.topicName = ConfigurationHelper.getStringProperty(KafkaConstants.TOPIC_NAME, null, configuration);
    +
    +      this.retryAttempts = ConfigurationHelper.getIntProperty(KafkaConstants.RETRY_ATTEMPTS_NAME, -1, configuration);
    +      this.retryInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_INTERVAL_NAME, 2000, configuration);
    +      this.retryMultiplier = ConfigurationHelper.getDoubleProperty(KafkaConstants.RETRY_MULTIPLIER_NAME, 2, configuration);
    +      this.retryMaxInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_MAX_INTERVAL_NAME, 30000, configuration);
    +
    +      this.filterString = ConfigurationHelper.getStringProperty(KafkaConstants.FILTER_STRING, null, configuration);
    +
    +      configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    +      if (!configuration.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
    +         configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoreMessageSerializer.class.getName());
    +      }
    +
    +      this.postOffice = postOffice;
    +      this.configuration = configuration;
    +      this.scheduledExecutorService = scheduledExecutorService;
    +   }
    +
    +   private Map<String, Object> kafkaConfig(Map<String, Object> configuration) {
    +      Map<String, Object> filteredConfig = new HashMap<>(configuration);
    +      KafkaConstants.OPTIONAL_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      KafkaConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      return filteredConfig;
    +   }
    +
    +   @Override
    +   public void start() throws Exception {
    +      synchronized (this) {
    +         if (this.isStarted.get()) {
    +            return;
    +         }
    +         if (this.connectorName == null || this.connectorName.trim().equals("")) {
    +            throw new Exception("invalid connector name: " + this.connectorName);
    +         }
    +
    +         if (this.topicName == null || this.topicName.trim().equals("")) {
    +            throw new Exception("invalid topic name: " + topicName);
    +         }
    +
    +         if (this.queueName == null || this.queueName.trim().equals("")) {
    +            throw new Exception("invalid queue name: " + queueName);
    +         }
    +
    +         this.filter = FilterImpl.createFilter(filterString);
    +
    +         SimpleString name = new SimpleString(this.queueName);
    +         Binding b = this.postOffice.getBinding(name);
    +         if (b == null) {
    +            throw new Exception(connectorName + ": queue " + queueName + " not found");
    +         }
    +         this.queue = (Queue) b.getBindable();
    +
    +         this.kafkaProducer = kafkaProducerFactory.create(kafkaConfig(configuration));
    +
    +         List<PartitionInfo> topicPartitions = kafkaProducer.partitionsFor(topicName);
    +         if (topicPartitions == null || topicPartitions.size() == 0) {
    +            throw new Exception(connectorName + ": topic " + topicName + " not found");
    +         }
    +
    +         this.retryCount.set(0);
    +         this.isStarted.set(true);
    +         connect();
    +         ActiveMQKafkaLogger.LOGGER.bridgeStarted(connectorName);
    +      }
    +   }
    +
    +   public void connect() throws Exception {
    +      synchronized (this) {
    +         if (!isConnected && isStarted.get()) {
    +            isConnected = true;
    +            this.queue.addConsumer(this);
    +            this.queue.deliverAsync();
    +            ActiveMQKafkaLogger.LOGGER.bridgeConnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void disconnect() {
    +      synchronized (this) {
    +         if (isConnected) {
    +            if (queue != null) {
    +               this.queue.removeConsumer(this);
    +            }
    +
    +            cancelRefs();
    +            if (queue != null) {
    +               queue.deliverAsync();
    +            }
    +            isConnected = false;
    +            ActiveMQKafkaLogger.LOGGER.bridgeDisconnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void stop() {
    +      synchronized (this) {
    +         if (!this.isStarted.get()) {
    +            return;
    +         }
    +         ActiveMQKafkaLogger.LOGGER.bridgeReceivedStopRequest(connectorName);
    +
    +         disconnect();
    +
    +         kafkaProducer.close();
    +         kafkaProducer = null;
    +         this.isStarted.set(false);
    +         ActiveMQKafkaLogger.LOGGER.bridgeStopped(connectorName);
    +      }
    +   }
    +
    +   @Override
    +   public boolean isStarted() {
    +      return this.isStarted.get();
    +   }
    +
    +   @Override
    +   public String getName() {
    +      return this.connectorName;
    +   }
    +
    +   @Override
    +   public boolean supportsDirectDelivery() {
    +      return false;
    +   }
    +
    +   @Override
    +   public HandleStatus handle(MessageReference ref) throws Exception {
    +      if (filter != null && !filter.match(ref.getMessage())) {
    +         return HandleStatus.NO_MATCH;
    +      }
    +
    +      synchronized (this) {
    +         ref.handled();
    +
    +         Message message = ref.getMessage();
    +
    +         synchronized (refs) {
    +            refs.put(ref.getMessage().getMessageID(), ref);
    +         }
    +
    +         Integer partition = null;
    +         SimpleString groupID = message.getGroupID();
    +         if (groupID != null) {
    +            List<PartitionInfo> partitions = kafkaProducer.partitionsFor(topicName);
    +            int numPartitions = partitions.size();
    +            partition = Utils.toPositive(Utils.murmur2(groupID.getData())) % numPartitions;
    +         }
    +
    +         //Compaction/Record Key (equivalent to Artemis LVQ)
    +         SimpleString key = message.getLastValueProperty();
    +
    +         ProducerRecord<String, Message> producerRecord =
    +             new ProducerRecord<>(
    +                   topicName,
    +                   partition,
    +                   message.getTimestamp(),
    +                   key == null ? null : key.toString(),
    +                   message);
    --- End diff --
    
    Yes, the key is used primarily for compaction in Kafka. We use the groupId, if present, primarily to partition by, keeping similar semantics: since consumers in Kafka consume by partition, the group is therefore honoured.
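    
    To illustrate the compaction side (hypothetical key and messages; assumes the topic is configured with cleanup.policy=compact, and reuses the Producer/Message imports already in this class):
    
        // With a compacted topic, Kafka eventually retains only the newest record
        // per key, mirroring how an Artemis LVQ keeps only the latest message per
        // last-value property.
        void sendQuotes(Producer<String, Message> producer, Message older, Message newer) {
           producer.send(new ProducerRecord<>("my_kafka_topic", "stock-IBM", older));
           producer.send(new ProducerRecord<>("my_kafka_topic", "stock-IBM", newer)); // this one survives compaction
        }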


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by ppatierno <gi...@git.apache.org>.
Github user ppatierno commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146498875
  
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,179 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +![ActiveMQ Artemis Kafka Bridge Logo](images/activemq-kafka-bridge.png)
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis 
    +and forward them to a target topic on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you could have a hybrid broker setup, 
    +with a data flow involving CORE, AMQP and MQTT clients, as well as now Kafka clients, 
    +taking and giving the best features of both broker technologies when and where needed for a flow of data.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    +
    +The intent is for this to be a two-way bridge, but currently the flow is in a single direction, 
    +from Apache ActiveMQ Artemis to Apache Kafka.
    +
    +
    +The source and target servers are remote making bridging suitable
    +for reliably sending messages from one artemis cluster to kafka, 
    +for instance across a WAN, to the cloud, or internet and where the connection may be unreliable.
    +
    +The bridge has built in resilience to failure so if the target server
    +connection is lost, e.g. due to network failure, the bridge will retry
    +connecting to the target until it comes back online. When it comes back
    +online it will resume operation as normal.
    +
    +In summary, the Apache ActiveMQ Kafka Bridge is a way to reliably connect a separate 
    +Apache ActiveMQ Artemis server and an Apache Kafka server together.
    +
    +## Configuring Kafka Bridges
    +
    +Bridges are configured in `broker.xml`. Let's kick off 
    +with an example (this is actually from the Kafka bridge test example):
    +
    +
    +    <connector-services>
    +         <connector-service name="my-kafka-bridge">
    +            <factory-class>org.apache.activemq.artemis.integration.kafka.bridge.KafkaProducerBridgeFactory</factory-class>
    +            <param key="bootstrap.servers" value="kafka-1.domain.local:9092,kafka-2.domain.local:9092,kafka-3.domain.local:9092" />
    +            <param key="queue-name" value="my.artemis.queue" />
    +            <param key="kafka-topic" value="my_kafka_topic" />
    +         </connector-service>
    +    </connector-services>
    +
    +In the above example we have shown the required parameters to
    +configure for a kafka bridge. See below for a complete list of available configuration options. 
    +
    +### Serialization
    +By default the CoreMessageSerializer is used.
    +
    +#### CoreMessageSerializer
    +Default but can be explicitly set using
    +           
    +    <param key="value.serializer" value="org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer" />
    +
    +This maps the Message properties to Record headers.
    +And then maps the payload binary as the Record value, encoding TextMessage
    +
    +This makes it easy to consume from Kafka using default deserializers
    +TextMessage using StringDeserializer and
    +ByteMessage using BytesDeserializer.
    +
    +Also to note, if you serialized an Apache Avro object into a byte array for the ByteMessage you could deserialize using an e
    +
    --- End diff --
    
    is this a broken line?
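
    For reference, the consumption pattern the quoted doc describes (a bridged TextMessage read back with the stock StringDeserializer) would look roughly like the sketch below; the broker host and topic come from the doc's example, while the consumer group is hypothetical:

        import java.util.Collections;
        import java.util.Properties;

        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;

        public class TextMessageConsumerSketch {
           public static void main(String[] args) {
              Properties props = new Properties();
              props.put("bootstrap.servers", "kafka-1.domain.local:9092"); // host from the quoted doc
              props.put("group.id", "bridge-readers");                     // hypothetical consumer group
              props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                 consumer.subscribe(Collections.singletonList("my_kafka_topic"));
                 ConsumerRecords<String, String> records = consumer.poll(1000);
                 for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value()); // TextMessage body arrives as a plain String
                 }
              }
           }
        }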


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146824495
  
    --- Diff: docs/user-manual/en/kafka-bridges.md ---
    @@ -0,0 +1,180 @@
    +# Apache ActiveMQ Kafka Bridge
    +
    +The function of a bridge is to consume messages from a source queue in Apache ActiveMQ Artemis, 
    +and forward them to a target topic, on a remote Apache Kafka server.
    +
    +By pairing Apache ActiveMQ Artemis and Apache Kafka with the bridge you could have a hybrid broker setup, 
    +having a data flow with CORE, AMQP, MQTT clients, as well as now Kafka clients also. 
    +Taking and giving the best features of both broker technologies when and where needed for a flow of data.
    +
    +![ActiveMQ Artemis Kafka Bridge Clients](images/activemq-kafka-bridge-clients.png)
    +
    +The intent is this will be a two way bridge, but currently the flow is a single direction 
    +from Apache ActiveMQ Artemis to Apache Kafka
    +
    +
    +The source and target servers are remote making bridging suitable
    +for reliably sending messages from one artemis cluster to kafka, 
    +for instance across a WAN, to the cloud, or internet and where the connection may be unreliable.
    +
    +The bridge has built in resilience to failure so if the target server
    +connection is lost, e.g. due to network failure, the bridge will retry
    +connecting to the target until it comes back online. When it comes back
    +online it will resume operation as normal.
    +
    +![ActiveMQ Artemis Kafka Bridge Diagram](images/activemq-kafka-bridge-diagram.png)
    +
    +In summary, Apache ActiveMQ Kafka Bridge is a way to reliably connect separate 
    +Apache ActiveMQ Artemis server and Apache Kafka server together.
    +
    +## Configuring Kafka Bridges
    +
    +Bridges are configured in `broker.xml`.  
    +Let's kick off
    +with an example (this is actually from the kafka bridge test example):
    +
    +
    +    <connector-services>
    +         <connector-service name="my-kafka-bridge">
    +            <factory-class>org.apache.activemq.artemis.integration.kafka.bridge.KafkaProducerBridgeFactory</factory-class>
    +            <param key="bootstrap.servers" value="kafka-1.domain.local:9092,kafka-2.domain.local:9092,kafka-3.domain.local:9092" />
    +            <param key="queue-name" value="my.artemis.queue" />
    +            <param key="kafka-topic" value="my_kafka_topic" />
    +         </connector-service>
    +    </connector-services>
    +
    +In the above example we have shown the required parameters to
    +configure for a kafka bridge. See below for a complete list of available configuration options. 
    +
    +### Serialization
    +By default the CoreMessageSerializer is used.
    +
    +#### CoreMessageSerializer
    +Default but can be explicitly set using
    +           
    +    <param key="value.serializer" value="org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer" />
    +
    +This maps the Message properties to Record headers.
    +And then maps the payload binary as the Record value, encoding TextMessage
    +
    +This makes it easy to consume from Kafka using default deserializers
    +TextMessage using StringDeserializer and
    +ByteMessage using BytesDeserializer.
    +
    +Also to note, if you serialized your objects using binary serialization like Apache Avro, Apache Thrift etc. into a byte array for the ByteMessage, 
    +as that byte array is preserved in the record value part as is, you could deserialize using an equivalent deserializer 
    +direct to back into Avro Record / Thrift Struct. 
    +
    +Also supplied are some Apache Kafka deserializers allowing you to consume from Apache Kafka 
    +and get a more familiar CoreMessage or JMSMessage, that your consumers can use:
    +
    +`org.apache.activemq.artemis.integration.kafka.protocol.core.jms.CoreJmsMessageDeserializer`
    +`org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageDeserializer`
    +
    +You can get these Apache Kafka, Serializers/Deserializers via maven using the following GAV coordinates:
    +
    +    <dependency>
    +       <groupId>org.apache.activemq</groupId>
    +       <artifactId>artemis-kafka-core-protocol</artifactId>
    +       <version>2.4.0-SNAPSHOT</version>
    +    </dependency>
    +
    +
    +#### AMQPMessageSerializer
    +Can be set by using:
    +    
    +    <param key="value.serializer" value="org.apache.activemq.artemis.integration.kafka.protocol.amqp.AMQPMessageSerializer" />
    +
    +
    +This encodes the whole message into amqp binary protocol into the Record value.
    --- End diff --
    
    Various instances of 'amqp', such as those here, should probably be AMQP instead.
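
    Relatedly, a consumer using the supplied CoreMessageDeserializer might look like the sketch below; the deserializer class name comes from the quoted doc, while the value type it yields (assumed here to be the Artemis core Message interface) and the consumer group are assumptions:

        import java.util.Collections;
        import java.util.Properties;

        import org.apache.activemq.artemis.api.core.Message;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;

        public class CoreMessageConsumerSketch {
           public static void main(String[] args) {
              Properties props = new Properties();
              props.put("bootstrap.servers", "kafka-1.domain.local:9092"); // host from the quoted doc
              props.put("group.id", "core-readers");                       // hypothetical consumer group
              props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              // Deserializer class named in the quoted doc; its return type is assumed here
              props.put("value.deserializer",
                        "org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageDeserializer");
              try (KafkaConsumer<String, Message> consumer = new KafkaConsumer<>(props)) {
                 consumer.subscribe(Collections.singletonList("my_kafka_topic"));
                 ConsumerRecords<String, Message> records = consumer.poll(1000);
                 for (ConsumerRecord<String, Message> record : records) {
                    System.out.println(record.value()); // record headers mapped back onto message properties
                 }
              }
           }
        }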


---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

Posted by tabish121 <gi...@git.apache.org>.
Github user tabish121 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
  
    Given that the code being used from Qpid JMS is internal implementation that has no promise of remaining stable and will not be made public, I think that stuff should go.
    
    So far, from what I see, this doesn't seem like something that should go into the broker project; it could instead be a standalone project. I'd prefer not to drag Kafka dependencies into the distribution when they are not needed.


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by ppatierno <gi...@git.apache.org>.
Github user ppatierno commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146623227
  
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java ---
    @@ -0,0 +1,484 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.bridge;
    +
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.core.filter.Filter;
    +import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
    +import org.apache.activemq.artemis.core.persistence.StorageManager;
    +import org.apache.activemq.artemis.core.postoffice.Binding;
    +import org.apache.activemq.artemis.core.postoffice.PostOffice;
    +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
    +import org.apache.activemq.artemis.core.server.ConnectorService;
    +import org.apache.activemq.artemis.core.server.Consumer;
    +import org.apache.activemq.artemis.core.server.HandleStatus;
    +import org.apache.activemq.artemis.core.server.MessageReference;
    +import org.apache.activemq.artemis.core.server.Queue;
    +import org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer;
    +import org.apache.activemq.artemis.utils.ConfigurationHelper;
    +import org.apache.activemq.artemis.utils.ReusableLatch;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.kafka.common.utils.Utils;
    +
    +class KafkaProducerBridge implements Consumer, ConnectorService {
    +
    +   private final String connectorName;
    +
    +   private final String queueName;
    +
    +   private final String topicName;
    +
    +   private final PostOffice postOffice;
    +
    +   private Queue queue = null;
    +
    +   private Filter filter = null;
    +
    +   private String filterString;
    +
    +   private AtomicBoolean isStarted = new AtomicBoolean();
    +
    +   private boolean isConnected = false;
    +
    +   private Producer<String, Message> kafkaProducer;
    +
    +   private Map<String, Object> configuration;
    +
    +   private long sequentialID;
    +
    +   private final ReusableLatch pendingAcks = new ReusableLatch(0);
    +
    +   private final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
    +
    +   private final ScheduledExecutorService scheduledExecutorService;
    +
    +   private final int retryAttempts;
    +
    +   private final long retryInterval;
    +
    +   private final double retryMultiplier;
    +
    +   private final long retryMaxInterval;
    +
    +   private final AtomicInteger retryCount = new AtomicInteger();
    +
    +   private final AtomicBoolean awaitingReconnect = new AtomicBoolean();
    +
    +   private final KafkaProducerFactory<String, Message> kafkaProducerFactory;
    +
    +
    +   KafkaProducerBridge(String connectorName, KafkaProducerFactory<String, Message> kafkaProducerFactory, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledExecutorService) {
    +      this.sequentialID = storageManager.generateID();
    +
    +      this.kafkaProducerFactory = kafkaProducerFactory;
    +
    +      this.connectorName = connectorName;
    +      this.queueName = ConfigurationHelper.getStringProperty(KafkaConstants.QUEUE_NAME, null, configuration);
    +      this.topicName = ConfigurationHelper.getStringProperty(KafkaConstants.TOPIC_NAME, null, configuration);
    +
    +      this.retryAttempts = ConfigurationHelper.getIntProperty(KafkaConstants.RETRY_ATTEMPTS_NAME, -1, configuration);
    +      this.retryInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_INTERVAL_NAME, 2000, configuration);
    +      this.retryMultiplier = ConfigurationHelper.getDoubleProperty(KafkaConstants.RETRY_MULTIPLIER_NAME, 2, configuration);
    +      this.retryMaxInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_MAX_INTERVAL_NAME, 30000, configuration);
    +
    +      this.filterString = ConfigurationHelper.getStringProperty(KafkaConstants.FILTER_STRING, null, configuration);
    +
    +      configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    +      if (!configuration.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
    +         configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoreMessageSerializer.class.getName());
    +      }
    +
    +      this.postOffice = postOffice;
    +      this.configuration = configuration;
    +      this.scheduledExecutorService = scheduledExecutorService;
    +   }
    +
    +   private Map<String, Object> kafkaConfig(Map<String, Object> configuration) {
    +      Map<String, Object> filteredConfig = new HashMap<>(configuration);
    +      KafkaConstants.OPTIONAL_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      KafkaConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      return filteredConfig;
    +   }
    +
    +   @Override
    +   public void start() throws Exception {
    +      synchronized (this) {
    +         if (this.isStarted.get()) {
    +            return;
    +         }
    +         if (this.connectorName == null || this.connectorName.trim().equals("")) {
    +            throw new Exception("invalid connector name: " + this.connectorName);
    +         }
    +
    +         if (this.topicName == null || this.topicName.trim().equals("")) {
    +            throw new Exception("invalid topic name: " + topicName);
    +         }
    +
    +         if (this.queueName == null || this.queueName.trim().equals("")) {
    +            throw new Exception("invalid queue name: " + queueName);
    +         }
    +
    +         this.filter = FilterImpl.createFilter(filterString);
    +
    +         SimpleString name = new SimpleString(this.queueName);
    +         Binding b = this.postOffice.getBinding(name);
    +         if (b == null) {
    +            throw new Exception(connectorName + ": queue " + queueName + " not found");
    +         }
    +         this.queue = (Queue) b.getBindable();
    +
    +         this.kafkaProducer = kafkaProducerFactory.create(kafkaConfig(configuration));
    +
    +         List<PartitionInfo> topicPartitions = kafkaProducer.partitionsFor(topicName);
    +         if (topicPartitions == null || topicPartitions.size() == 0) {
    +            throw new Exception(connectorName + ": topic " + topicName + " not found");
    +         }
    +
    +         this.retryCount.set(0);
    +         this.isStarted.set(true);
    +         connect();
    +         ActiveMQKafkaLogger.LOGGER.bridgeStarted(connectorName);
    +      }
    +   }
    +
    +   public void connect() throws Exception {
    +      synchronized (this) {
    +         if (!isConnected && isStarted.get()) {
    +            isConnected = true;
    +            this.queue.addConsumer(this);
    +            this.queue.deliverAsync();
    +            ActiveMQKafkaLogger.LOGGER.bridgeConnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void disconnect() {
    +      synchronized (this) {
    +         if (isConnected) {
    +            if (queue != null) {
    +               this.queue.removeConsumer(this);
    +            }
    +
    +            cancelRefs();
    +            if (queue != null) {
    +               queue.deliverAsync();
    +            }
    +            isConnected = false;
    +            ActiveMQKafkaLogger.LOGGER.bridgeDisconnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void stop() {
    +      synchronized (this) {
    +         if (!this.isStarted.get()) {
    +            return;
    +         }
    +         ActiveMQKafkaLogger.LOGGER.bridgeReceivedStopRequest(connectorName);
    +
    +         disconnect();
    +
    +         kafkaProducer.close();
    +         kafkaProducer = null;
    +         this.isStarted.set(false);
    +         ActiveMQKafkaLogger.LOGGER.bridgeStopped(connectorName);
    +      }
    +   }
    +
    +   @Override
    +   public boolean isStarted() {
    +      return this.isStarted.get();
    +   }
    +
    +   @Override
    +   public String getName() {
    +      return this.connectorName;
    +   }
    +
    +   @Override
    +   public boolean supportsDirectDelivery() {
    +      return false;
    +   }
    +
    +   @Override
    +   public HandleStatus handle(MessageReference ref) throws Exception {
    +      if (filter != null && !filter.match(ref.getMessage())) {
    +         return HandleStatus.NO_MATCH;
    +      }
    +
    +      synchronized (this) {
    +         ref.handled();
    +
    +         Message message = ref.getMessage();
    +
    +         synchronized (refs) {
    +            refs.put(ref.getMessage().getMessageID(), ref);
    +         }
    +
    +         Integer partition = null;
    +         SimpleString groupdID = message.getGroupID();
    +         if (groupdID != null) {
    +            List partitions = kafkaProducer.partitionsFor(topicName);
    +            int numPartitions = partitions.size();
    +            partition = Utils.toPositive(Utils.murmur2(groupdID.getData())) % numPartitions;
    --- End diff --
    
    "You don't have access to native headers in the partitioner" ...  I didn't get this point. Serialization and Partitioning are two different steps in the Kafka producer pipeline (first serialization, then partitioning). About the partitioner, if you pass the key to the producer send method, it applies exactly this : 
    
    `Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions`
    
    where `keyBytes` is just `groupid`. Isn't it?
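
    For comparison, a sketch of what deferring to the default partitioner would look like, with hypothetical helper and parameter names; note this only works if the record key is not already reserved for the compaction/last-value semantics discussed elsewhere in this thread:

        import org.apache.activemq.artemis.api.core.Message;
        import org.apache.kafka.clients.producer.Producer;
        import org.apache.kafka.clients.producer.ProducerRecord;

        final class KeyPartitioningSketch {
           // Hypothetical helper: let Kafka's DefaultPartitioner murmur2-hash the key,
           // instead of the bridge pre-computing toPositive(murmur2(...)) % numPartitions.
           static void send(Producer<String, Message> producer, String topic,
                            String groupId, Message message) {
              ProducerRecord<String, Message> record =
                    new ProducerRecord<>(topic, null, message.getTimestamp(), groupId, message);
              producer.send(record);
           }
        }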


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146586161
  
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java ---
    @@ -0,0 +1,484 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.bridge;
    +
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.core.filter.Filter;
    +import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
    +import org.apache.activemq.artemis.core.persistence.StorageManager;
    +import org.apache.activemq.artemis.core.postoffice.Binding;
    +import org.apache.activemq.artemis.core.postoffice.PostOffice;
    +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
    +import org.apache.activemq.artemis.core.server.ConnectorService;
    +import org.apache.activemq.artemis.core.server.Consumer;
    +import org.apache.activemq.artemis.core.server.HandleStatus;
    +import org.apache.activemq.artemis.core.server.MessageReference;
    +import org.apache.activemq.artemis.core.server.Queue;
    +import org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer;
    +import org.apache.activemq.artemis.utils.ConfigurationHelper;
    +import org.apache.activemq.artemis.utils.ReusableLatch;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.kafka.common.utils.Utils;
    +
    +class KafkaProducerBridge implements Consumer, ConnectorService {
    +
    +   private final String connectorName;
    +
    +   private final String queueName;
    +
    +   private final String topicName;
    +
    +   private final PostOffice postOffice;
    +
    +   private Queue queue = null;
    +
    +   private Filter filter = null;
    +
    +   private String filterString;
    +
    +   private AtomicBoolean isStarted = new AtomicBoolean();
    +
    +   private boolean isConnected = false;
    +
    +   private Producer<String, Message> kafkaProducer;
    +
    +   private Map<String, Object> configuration;
    +
    +   private long sequentialID;
    +
    +   private final ReusableLatch pendingAcks = new ReusableLatch(0);
    +
    +   private final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
    +
    +   private final ScheduledExecutorService scheduledExecutorService;
    +
    +   private final int retryAttempts;
    +
    +   private final long retryInterval;
    +
    +   private final double retryMultiplier;
    +
    +   private final long retryMaxInterval;
    +
    +   private final AtomicInteger retryCount = new AtomicInteger();
    +
    +   private final AtomicBoolean awaitingReconnect = new AtomicBoolean();
    +
    +   private final KafkaProducerFactory<String, Message> kafkaProducerFactory;
    +
    +
    +   KafkaProducerBridge(String connectorName, KafkaProducerFactory<String, Message> kafkaProducerFactory, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledExecutorService) {
    +      this.sequentialID = storageManager.generateID();
    +
    +      this.kafkaProducerFactory = kafkaProducerFactory;
    +
    +      this.connectorName = connectorName;
    +      this.queueName = ConfigurationHelper.getStringProperty(KafkaConstants.QUEUE_NAME, null, configuration);
    +      this.topicName = ConfigurationHelper.getStringProperty(KafkaConstants.TOPIC_NAME, null, configuration);
    +
    +      this.retryAttempts = ConfigurationHelper.getIntProperty(KafkaConstants.RETRY_ATTEMPTS_NAME, -1, configuration);
    +      this.retryInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_INTERVAL_NAME, 2000, configuration);
    +      this.retryMultiplier = ConfigurationHelper.getDoubleProperty(KafkaConstants.RETRY_MULTIPLIER_NAME, 2, configuration);
    +      this.retryMaxInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_MAX_INTERVAL_NAME, 30000, configuration);
    +
    +      this.filterString = ConfigurationHelper.getStringProperty(KafkaConstants.FILTER_STRING, null, configuration);
    +
    +      configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    +      if (!configuration.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
    +         configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoreMessageSerializer.class.getName());
    +      }
    +
    +      this.postOffice = postOffice;
    +      this.configuration = configuration;
    +      this.scheduledExecutorService = scheduledExecutorService;
    +   }
    +
    +   private Map<String, Object> kafkaConfig(Map<String, Object> configuration) {
    +      Map<String, Object> filteredConfig = new HashMap<>(configuration);
    +      KafkaConstants.OPTIONAL_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      KafkaConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      return filteredConfig;
    +   }
    +
    +   @Override
    +   public void start() throws Exception {
    +      synchronized (this) {
    +         if (this.isStarted.get()) {
    +            return;
    +         }
    +         if (this.connectorName == null || this.connectorName.trim().equals("")) {
    +            throw new Exception("invalid connector name: " + this.connectorName);
    +         }
    +
    +         if (this.topicName == null || this.topicName.trim().equals("")) {
    +            throw new Exception("invalid topic name: " + topicName);
    +         }
    +
    +         if (this.queueName == null || this.queueName.trim().equals("")) {
    +            throw new Exception("invalid queue name: " + queueName);
    +         }
    +
    +         this.filter = FilterImpl.createFilter(filterString);
    +
    +         SimpleString name = new SimpleString(this.queueName);
    +         Binding b = this.postOffice.getBinding(name);
    +         if (b == null) {
    +            throw new Exception(connectorName + ": queue " + queueName + " not found");
    +         }
    +         this.queue = (Queue) b.getBindable();
    +
    +         this.kafkaProducer = kafkaProducerFactory.create(kafkaConfig(configuration));
    +
    +         List<PartitionInfo> topicPartitions = kafkaProducer.partitionsFor(topicName);
    +         if (topicPartitions == null || topicPartitions.size() == 0) {
    +            throw new Exception(connectorName + ": topic " + topicName + " not found");
    +         }
    +
    +         this.retryCount.set(0);
    +         this.isStarted.set(true);
    +         connect();
    +         ActiveMQKafkaLogger.LOGGER.bridgeStarted(connectorName);
    +      }
    +   }
    +
    +   public void connect() throws Exception {
    +      synchronized (this) {
    +         if (!isConnected && isStarted.get()) {
    +            isConnected = true;
    +            this.queue.addConsumer(this);
    +            this.queue.deliverAsync();
    +            ActiveMQKafkaLogger.LOGGER.bridgeConnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void disconnect() {
    +      synchronized (this) {
    +         if (isConnected) {
    +            if (queue != null) {
    +               this.queue.removeConsumer(this);
    +            }
    +
    +            cancelRefs();
    +            if (queue != null) {
    +               queue.deliverAsync();
    +            }
    +            isConnected = false;
    +            ActiveMQKafkaLogger.LOGGER.bridgeDisconnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void stop() {
    +      synchronized (this) {
    +         if (!this.isStarted.get()) {
    +            return;
    +         }
    +         ActiveMQKafkaLogger.LOGGER.bridgeReceivedStopRequest(connectorName);
    +
    +         disconnect();
    +
    +         kafkaProducer.close();
    +         kafkaProducer = null;
    +         this.isStarted.set(false);
    +         ActiveMQKafkaLogger.LOGGER.bridgeStopped(connectorName);
    +      }
    +   }
    +
    +   @Override
    +   public boolean isStarted() {
    +      return this.isStarted.get();
    +   }
    +
    +   @Override
    +   public String getName() {
    +      return this.connectorName;
    +   }
    +
    +   @Override
    +   public boolean supportsDirectDelivery() {
    +      return false;
    +   }
    +
    +   @Override
    +   public HandleStatus handle(MessageReference ref) throws Exception {
    +      if (filter != null && !filter.match(ref.getMessage())) {
    +         return HandleStatus.NO_MATCH;
    +      }
    +
    +      synchronized (this) {
    +         ref.handled();
    +
    +         Message message = ref.getMessage();
    +
    +         synchronized (refs) {
    +            refs.put(ref.getMessage().getMessageID(), ref);
    +         }
    +
    +         Integer partition = null;
    +         SimpleString groupdID = message.getGroupID();
    +         if (groupdID != null) {
    +            List partitions = kafkaProducer.partitionsFor(topicName);
    +            int numPartitions = partitions.size();
    +            partition = Utils.toPositive(Utils.murmur2(groupdID.getData())) % numPartitions;
    +         }
    +
    +         //Compaction/Record Key (equivalent to Artemis LVQ)
    +         SimpleString key = message.getLastValueProperty();
    --- End diff --
    
    It's not used for partitioning; the key in Kafka is used for compaction. We use the groupId (JMS message group) to select the partition; see the code above.
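
    Worth noting: compaction only actually kicks in on topics created with a compact cleanup policy. A minimal AdminClient sketch of creating such a topic, where the topic name matches the doc example and the partition count and replication factor are assumptions:

        import java.util.Collections;
        import java.util.Properties;

        import org.apache.kafka.clients.admin.AdminClient;
        import org.apache.kafka.clients.admin.NewTopic;
        import org.apache.kafka.common.config.TopicConfig;

        public class CompactedTopicSketch {
           public static void main(String[] args) throws Exception {
              Properties props = new Properties();
              props.put("bootstrap.servers", "kafka-1.domain.local:9092"); // host from the quoted doc
              try (AdminClient admin = AdminClient.create(props)) {
                 NewTopic topic = new NewTopic("my_kafka_topic", 8, (short) 3) // assumed sizing
                       .configs(Collections.singletonMap(
                             TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
                 admin.createTopics(Collections.singletonList(topic)).all().get();
              }
           }
        }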


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146825025
  
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/GroupIdPartitioner.java ---
    @@ -0,0 +1,57 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.bridge;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.kafka.clients.producer.Partitioner;
    +import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
    +import org.apache.kafka.common.Cluster;
    +import org.apache.kafka.common.utils.Utils;
    +
    +public class GroupIdPartitioner implements Partitioner {
    +
    +   DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
    +
    +   @Override
    +   public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    +      Message message = (Message) value;
    +      final SimpleString groupdID = message.getGroupID();
    +      //If GroupID (a.k.a JMSXMessageGroup) then to honour this we partition based on it.
    --- End diff --
    
    JMSXGroupID rather than JMSXMessageGroup?
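
    For illustration, a producer opting into grouping would set the standard JMS property before sending; in this sketch the broker URL is an assumption, while the queue name comes from the doc example:

        import javax.jms.Connection;
        import javax.jms.ConnectionFactory;
        import javax.jms.MessageProducer;
        import javax.jms.Session;
        import javax.jms.TextMessage;

        import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

        public class GroupIdProducerSketch {
           public static void main(String[] args) throws Exception {
              ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); // assumed URL
              try (Connection connection = cf.createConnection()) {
                 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                 MessageProducer producer = session.createProducer(session.createQueue("my.artemis.queue"));
                 TextMessage message = session.createTextMessage("hello");
                 // Standard JMS grouping property; the bridge's partitioner keys on this value
                 message.setStringProperty("JMSXGroupID", "group-1");
                 producer.send(message);
              }
           }
        }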


---

[GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...

Posted by gemmellr <gi...@git.apache.org>.
Github user gemmellr commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146828027
  
    --- Diff: integration/activemq-kafka/activemq-kafka-protocols/activemq-kafka-amqp-protocol/src/main/java/org/apache/activemq/artemis/integration/kafka/protocol/amqp/AmqpMessageSerializer.java ---
    @@ -0,0 +1,52 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.protocol.amqp;
    +
    +import java.util.Map;
    +
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.integration.kafka.protocol.amqp.proton.ProtonMessageSerializer;
    +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
    +import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter;
    +import org.apache.kafka.common.errors.SerializationException;
    +import org.apache.kafka.common.serialization.Serializer;
    +
    +public class AmqpMessageSerializer implements Serializer<Message> {
    +
    +   ProtonMessageSerializer protonMessageSerializer = new ProtonMessageSerializer();
    +
    +   @Override
    +   public byte[] serialize(String topic, Message message) {
    +      if (message == null) return null;
    +      try {
    +         AMQPMessage amqpMessage = CoreAmqpConverter.checkAMQP(message);
    +         return protonMessageSerializer.serialize(topic, amqpMessage.getProtonMessage());
    --- End diff --
    
    This is probably the best/simplest thing to do for now, but it's worth noting for later that it could be quite inefficient and unnecessary depending on what happened before it got here. It's encoding the AMQP data from scratch using a Proton message object, when it might have encoded AMQP data already. It might even need to decode AMQP data first here just to get the Proton message object.
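
    To illustrate the cost being flagged, a rough sketch of a from-scratch proton-j encode (the buffer sizes are arbitrary); when the broker already holds the encoded AMQP bytes, all of this, plus a possible prior decode just to obtain the Proton message object, is redundant work:

        import java.nio.BufferOverflowException;
        import java.util.Arrays;

        import org.apache.qpid.proton.message.Message;

        final class ProtonEncodeSketch {
           // proton-j gives no sizing hint up front, so the usual idiom is to
           // retry the encode with a larger buffer whenever it overflows.
           static byte[] encode(Message protonMessage) {
              byte[] buffer = new byte[1024];
              while (true) {
                 try {
                    int written = protonMessage.encode(buffer, 0, buffer.length);
                    return Arrays.copyOf(buffer, written);
                 } catch (BufferOverflowException overflow) {
                    buffer = new byte[buffer.length * 2]; // grow and re-encode
                 }
              }
           }
        }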


---

[GitHub] activemq-artemis issue #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafka Bridg...

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/1607
  
    In the interim, until we sort out a separate sub-project, I'd like to merge this still so it isn't lost, and then split it out later.


---