You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2020/08/15 06:04:57 UTC

[GitHub] [camel] davsclaus commented on a change in pull request #4091: CAMEL-15356: Add Azure EventHubs component

davsclaus commented on a change in pull request #4091:
URL: https://github.com/apache/camel/pull/4091#discussion_r470942687



##########
File path: components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsProducer.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.eventhubs;
+
+import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.azure.eventhubs.client.EventHubsClientFactory;
+import org.apache.camel.component.azure.eventhubs.operations.EventHubsProducerOperations;
+import org.apache.camel.support.DefaultAsyncProducer;
+
+public class EventHubsProducer extends DefaultAsyncProducer {
+
+    private EventHubProducerAsyncClient producerAsyncClient;
+    private EventHubsProducerOperations producerOperations;
+
+    public EventHubsProducer(final Endpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        // create the client
+        producerAsyncClient = EventHubsClientFactory.createEventHubProducerAsyncClient(getEndpoint().getConfiguration());
+
+        // create our operations
+        producerOperations = new EventHubsProducerOperations(producerAsyncClient, getConfiguration());
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            return producerOperations.sendEvents(exchange, callback);
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // shutdown async client
+        producerAsyncClient.close();

Review comment:
       The != null here

##########
File path: components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsEndpoint.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.eventhubs;
+
+import com.azure.messaging.eventhubs.models.ErrorContext;
+import com.azure.messaging.eventhubs.models.EventContext;
+import org.apache.camel.Category;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.support.DefaultEndpoint;
+
+/**
+ * The azure-eventhubs component that integrates Azure Event Hubs using AMQP protocol. Azure EventHubs is a highly scalable publish-subscribe service that
+ * can ingest millions of events per second and stream them to multiple consumers.
+ */
+@UriEndpoint(firstVersion = "3.5.0", scheme = "azure-eventhubs", title = "Azure Event Hubs", syntax = "azure-eventhubs:namespace/eventHubName", category = {
+        Category.CLOUD, Category.MESSAGING })
+public class EventHubsEndpoint extends DefaultEndpoint {
+
+    @UriParam
+    private EventHubsConfiguration configuration;
+
+    public EventHubsEndpoint(final String uri, final Component component, final EventHubsConfiguration configuration) {
+        super(uri, component);
+        this.configuration = configuration;
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new EventHubsProducer(this);
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new EventHubsConsumer(this, processor);

Review comment:
       Call configureConsumer(consumer) also, see other components how-to

##########
File path: components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.eventhubs;
+
+import com.azure.messaging.eventhubs.EventProcessorClient;
+import com.azure.messaging.eventhubs.models.ErrorContext;
+import com.azure.messaging.eventhubs.models.EventContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.azure.eventhubs.client.EventHubsClientFactory;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.DefaultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventHubsConsumer extends DefaultConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EventHubsConsumer.class);
+
+    // we use the EventProcessorClient as recommended by Azure docs to consume from all partitions
+    private EventProcessorClient processorClient;
+
+    public EventHubsConsumer(final EventHubsEndpoint endpoint, final Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        // create the client
+        processorClient = EventHubsClientFactory.createEventProcessorClient(getConfiguration(),
+                this::onEventListener, this::onErrorListener);
+
+        // start the client but we will rely on the Azure Client Scheduler for thread management
+        processorClient.start();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // shutdown the client
+        processorClient.stop();

Review comment:
       guard with != null as if camel fails to startup then this start method may not have been invoked and camel will call stop 

##########
File path: components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperations.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.eventhubs.operations;
+
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.azure.messaging.eventhubs.EventData;
+import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
+import com.azure.messaging.eventhubs.models.SendOptions;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.azure.eventhubs.EventHubsConfiguration;
+import org.apache.camel.component.azure.eventhubs.EventHubsConfigurationOptionsProxy;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+
+public class EventHubsProducerOperations {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EventHubsProducerOperations.class);
+
+    private final EventHubProducerAsyncClient producerAsyncClient;
+    private final EventHubsConfigurationOptionsProxy configurationOptionsProxy;
+
+    public EventHubsProducerOperations(final EventHubProducerAsyncClient producerAsyncClient, final EventHubsConfiguration configuration) {
+        ObjectHelper.notNull(producerAsyncClient, "client cannot be null");
+
+        this.producerAsyncClient = producerAsyncClient;
+        configurationOptionsProxy = new EventHubsConfigurationOptionsProxy(configuration);
+    }
+
+    public boolean sendEvents(final Exchange exchange, final AsyncCallback callback) {
+        ObjectHelper.notNull(exchange, "exchange cannot be null");
+        ObjectHelper.notNull(callback, "callback cannot be null");
+
+        final SendOptions sendOptions = createSendOptions(configurationOptionsProxy.getPartitionKey(exchange), configurationOptionsProxy.getPartitionId(exchange));
+        final Iterable<EventData> eventData = createEventData(exchange);
+
+        return sendAsyncEvents(eventData, sendOptions, exchange, callback);
+    }
+
+    private boolean sendAsyncEvents(final Iterable<EventData> eventData, final SendOptions sendOptions, final Exchange exchange, final AsyncCallback asyncCallback) {
+        final AtomicBoolean done = new AtomicBoolean(false);

Review comment:
       The asyncCallback must only be invoked once, not sure if the 3 branches below can cause it to be invoked twice etc.
   
   So you should essentially only return true if the method exits early and its the same thread that called this method that is calling the callback.
   
   So I would remove the atomic boolean, and make this method return false. And then in the sendAsyncEventsWithSuitableMethod you call the callback with (false) as parameter as it must match what this method returned.

##########
File path: components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperations.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.eventhubs.operations;
+
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.azure.messaging.eventhubs.EventData;
+import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
+import com.azure.messaging.eventhubs.models.SendOptions;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.azure.eventhubs.EventHubsConfiguration;
+import org.apache.camel.component.azure.eventhubs.EventHubsConfigurationOptionsProxy;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+
+public class EventHubsProducerOperations {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EventHubsProducerOperations.class);
+
+    private final EventHubProducerAsyncClient producerAsyncClient;
+    private final EventHubsConfigurationOptionsProxy configurationOptionsProxy;
+
+    public EventHubsProducerOperations(final EventHubProducerAsyncClient producerAsyncClient, final EventHubsConfiguration configuration) {
+        ObjectHelper.notNull(producerAsyncClient, "client cannot be null");
+
+        this.producerAsyncClient = producerAsyncClient;
+        configurationOptionsProxy = new EventHubsConfigurationOptionsProxy(configuration);
+    }
+
+    public boolean sendEvents(final Exchange exchange, final AsyncCallback callback) {
+        ObjectHelper.notNull(exchange, "exchange cannot be null");
+        ObjectHelper.notNull(callback, "callback cannot be null");
+
+        final SendOptions sendOptions = createSendOptions(configurationOptionsProxy.getPartitionKey(exchange), configurationOptionsProxy.getPartitionId(exchange));
+        final Iterable<EventData> eventData = createEventData(exchange);
+
+        return sendAsyncEvents(eventData, sendOptions, exchange, callback);
+    }
+
+    private boolean sendAsyncEvents(final Iterable<EventData> eventData, final SendOptions sendOptions, final Exchange exchange, final AsyncCallback asyncCallback) {
+        final AtomicBoolean done = new AtomicBoolean(false);

Review comment:
       Also does the subcribe event below make any sense to react on? I would assume its either only the error or the completion.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org