You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/10/07 10:07:41 UTC

[camel-kafka-connector] branch master updated: [camel-main-support] Use BaseMainSupport instead of Main #499

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 75ccafa  [camel-main-support] Use BaseMainSupport instead of Main #499
75ccafa is described below

commit 75ccafa823b4215bf2836b2586940073649537c9
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Oct 7 11:18:00 2020 +0200

    [camel-main-support] Use BaseMainSupport instead of Main #499
---
 .../utils/CamelKafkaConnectMain.java               | 122 +++++++++++++++
 .../kafkaconnector/utils/CamelMainSupport.java     | 165 ++++-----------------
 2 files changed, 153 insertions(+), 134 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
new file mode 100644
index 0000000..46b0822
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.utils;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ConsumerTemplate;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.main.BaseMainSupport;
+import org.apache.camel.main.MainListener;
+import org.apache.camel.support.service.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CamelKafkaConnectMain extends BaseMainSupport {
+    public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = "camel.dataformat.";
+    private static final Logger LOG = LoggerFactory.getLogger(CamelKafkaConnectMain.class);
+
+    protected volatile ConsumerTemplate consumerTemplate;
+    protected volatile ProducerTemplate producerTemplate;
+
+    public CamelKafkaConnectMain(CamelContext context) {
+        this.camelContext = context;
+    }
+
+    @Override
+    protected void doInit() throws Exception {
+        super.doInit();
+        postProcessCamelContext(camelContext);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        LOG.info("Starting Main");
+
+        for (MainListener listener : listeners) {
+            listener.beforeStart(this);
+        }
+
+        super.doStart();
+
+        getCamelContext().start();
+
+        for (MainListener listener : listeners) {
+            listener.afterStart(this);
+        }
+
+        LOG.info("Main started");
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        LOG.info("Stopping Main");
+
+        ServiceHelper.stopService(consumerTemplate);
+        consumerTemplate = null;
+
+        ServiceHelper.stopService(producerTemplate);
+        producerTemplate = null;
+
+        for (MainListener listener : listeners) {
+            listener.beforeStop(this);
+        }
+
+        super.doStart();
+
+        getCamelContext().stop();
+
+        for (MainListener listener : listeners) {
+            listener.afterStop(this);
+        }
+
+        LOG.info("Main stopped");
+    }
+
+    @Override
+    protected ProducerTemplate findOrCreateCamelTemplate() {
+        throw new UnsupportedOperationException("Should not happen");
+    }
+
+    @Override
+    protected CamelContext createCamelContext() {
+        throw new UnsupportedOperationException("Should not happen");
+    }
+
+    public ProducerTemplate getProducerTemplate() {
+        if (this.producerTemplate == null) {
+            synchronized (this) {
+                if (this.producerTemplate == null) {
+                    this.producerTemplate = getCamelContext().createProducerTemplate();
+                }
+            }
+        }
+
+        return this.producerTemplate;
+    }
+
+    public ConsumerTemplate getConsumerTemplate() {
+        if (this.consumerTemplate == null) {
+            synchronized (this) {
+                if (this.consumerTemplate == null) {
+                    this.consumerTemplate = getCamelContext().createConsumerTemplate();
+                }
+            }
+        }
+
+        return this.consumerTemplate;
+    }
+}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
index 5a5f1fe..6c17ecc 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
@@ -20,27 +20,17 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.Endpoint;
-import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.catalog.RuntimeCamelCatalog;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.main.BaseMainSupport;
-import org.apache.camel.main.Main;
-import org.apache.camel.main.MainListener;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.spi.DataFormat;
 import org.apache.camel.support.PropertyBindingSupport;
-import org.apache.camel.util.OrderedProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,43 +38,20 @@ public class CamelMainSupport {
     public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = "camel.dataformat.";
     private static final Logger LOG = LoggerFactory.getLogger(CamelMainSupport.class);
 
-    private Main camelMain;
-    private CamelContext camel;
+    private final CamelKafkaConnectMain camelMain;
 
-    private final ExecutorService exService = Executors.newSingleThreadExecutor();
-    private final CountDownLatch startFinishedSignal = new CountDownLatch(1);
-
-    public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, List<CamelKafkaConnectDataformat> dataformats, int aggregationSize, long aggregationTimeout) throws Exception {
-        this(props, fromUrl, toUrl, dataformats, aggregationSize, aggregationTimeout, new DefaultCamelContext());
-    }
-
-    public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, List<CamelKafkaConnectDataformat> dataformats, int aggregationSize, long aggregationTimeout, CamelContext camelContext) throws Exception {
-        camel = camelContext;
-        camelMain = new Main() {
-            @Override
-            protected ProducerTemplate findOrCreateCamelTemplate() {
-                return camel.createProducerTemplate();
-            }
-
-            @Override
-            protected CamelContext createCamelContext() {
-                return camel;
-            }
-        };
-
-        camelMain.addMainListener(new CamelMainFinishedListener());
+    public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, List<CamelKafkaConnectDataformat> dataformats, int aggregationSize, long aggregationTimeout, CamelContext camelContext) {
+        camelMain = new CamelKafkaConnectMain(camelContext);
         camelMain.configure().setAutoConfigurationLogSummary(false);
 
-
-        Properties camelProperties = new OrderedProperties();
+        Properties camelProperties = new Properties();
         camelProperties.putAll(props);
 
         LOG.info("Setting initial properties in Camel context: [{}]", camelProperties);
-        this.camel.getPropertiesComponent().setInitialProperties(camelProperties);
+        camelMain.setInitialProperties(camelProperties);
 
-        camelMain.init();
         //creating the actual route
-        this.camel.addRoutes(new RouteBuilder() {
+        camelMain.configure().addRoutesBuilder(new RouteBuilder() {
             public void configure() {
                 //from
                 RouteDefinition rd = from(fromUrl);
@@ -96,12 +63,12 @@ public class CamelMainSupport {
                     switch (dataformat.getDataformatKind()) {
                         case MARSHALL:
                             LOG.info(".marshal().custom({})", dataformatId);
-                            camel.getRegistry().bind(dataformatId, lookupAndInstantiateDataformat(dataformatId));
+                            getContext().getRegistry().bind(dataformatId, lookupAndInstantiateDataformat(dataformatId));
                             rd.marshal().custom(dataformatId);
                             break;
                         case UNMARSHALL:
                             LOG.info(".unmarshal().custom({})", dataformatId);
-                            camel.getRegistry().bind(dataformatId, lookupAndInstantiateDataformat(dataformatId));
+                            getContext().getRegistry().bind(dataformatId, lookupAndInstantiateDataformat(dataformatId));
                             rd.unmarshal().custom(dataformatId);
                             break;
                         default:
@@ -109,10 +76,9 @@ public class CamelMainSupport {
                     }
                 }
 
-
-                if (camel.getRegistry().lookupByName("aggregate") != null) {
+                if (getContext().getRegistry().lookupByName("aggregate") != null) {
                     //aggregation
-                    AggregationStrategy s = (AggregationStrategy) camel.getRegistry().lookupByName("aggregate");
+                    AggregationStrategy s = (AggregationStrategy) getContext().getRegistry().lookupByName("aggregate");
                     LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout);
                     LOG.info(".to({})", toUrl);
                     rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(toUrl);
@@ -125,16 +91,14 @@ public class CamelMainSupport {
         });
     }
 
-    public void start() throws Exception {
+    public void start() {
         LOG.info("Starting CamelContext");
 
-        CamelContextStarter starter = new CamelContextStarter();
-        exService.execute(starter);
-        startFinishedSignal.await();
-
-        if (starter.hasException()) {
-            LOG.info("CamelContext failed to start", starter.getException());
-            throw starter.getException();
+        try {
+            camelMain.start();
+        } catch (Exception e) {
+            LOG.info("CamelContext failed to start", e);
+            throw e;
         }
 
         LOG.info("CamelContext started");
@@ -143,46 +107,46 @@ public class CamelMainSupport {
     public void stop() {
         LOG.info("Stopping CamelContext");
 
-        camelMain.stop();
-        exService.shutdown();
+        try {
+            camelMain.stop();
+        } catch (Exception e) {
+            LOG.info("CamelContext failed to stop", e);
+            throw e;
+        }
 
         LOG.info("CamelContext stopped");
     }
 
     public ProducerTemplate createProducerTemplate() {
-        return camel.createProducerTemplate();
+        return camelMain.getProducerTemplate();
     }
 
     public Endpoint getEndpoint(String uri) {
-        return camel.getEndpoint(uri);
+        return camelMain.getCamelContext().getEndpoint(uri);
     }
 
     public Collection<Endpoint> getEndpoints() {
-        return camel.getEndpoints();
+        return camelMain.getCamelContext().getEndpoints();
     }
 
     public ConsumerTemplate createConsumerTemplate() {
-        return camel.createConsumerTemplate();
-    }
-
-    public RuntimeCamelCatalog getRuntimeCamelCatalog() {
-        return camel.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog();
+        return camelMain.getConsumerTemplate();
     }
 
     private DataFormat lookupAndInstantiateDataformat(String dataformatName) {
-        DataFormat df = camel.resolveDataFormat(dataformatName);
+        DataFormat df = camelMain.getCamelContext().resolveDataFormat(dataformatName);
 
         if (df == null) {
-            df = camel.createDataFormat(dataformatName);
+            df = camelMain.getCamelContext().createDataFormat(dataformatName);
 
             final String prefix = CAMEL_DATAFORMAT_PROPERTIES_PREFIX + dataformatName + ".";
-            final Properties props = camel.getPropertiesComponent().loadProperties(k -> k.startsWith(prefix));
+            final Properties props = camelMain.getCamelContext().getPropertiesComponent().loadProperties(k -> k.startsWith(prefix));
 
-            CamelContextAware.trySetCamelContext(df, camel);
+            CamelContextAware.trySetCamelContext(df, camelMain.getCamelContext());
 
             if (!props.isEmpty()) {
                 PropertyBindingSupport.build()
-                        .withCamelContext(camel)
+                        .withCamelContext(camelMain.getCamelContext())
                         .withOptionPrefix(prefix)
                         .withRemoveParameters(false)
                         .withProperties((Map) props)
@@ -198,71 +162,4 @@ public class CamelMainSupport {
         return df;
     }
 
-    private class CamelMainFinishedListener implements MainListener {
-        @Override
-        public void configure(CamelContext context) {
-
-        }
-
-        @Override
-        public void beforeStart(BaseMainSupport main) {
-
-        }
-
-        @Override
-        public void afterStart(BaseMainSupport main) {
-            LOG.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to CamelMainFinishedListener been called");
-            startFinishedSignal.countDown();
-        }
-
-        @Override
-        public void beforeStop(BaseMainSupport main) {
-
-        }
-
-        @Override
-        public void afterStop(BaseMainSupport main) {
-
-        }
-
-        @Override
-        public void beforeConfigure(BaseMainSupport main) {
-        }
-
-        @Override
-        public void afterConfigure(BaseMainSupport main) {
-
-        }
-
-        @Override
-        public void beforeInitialize(BaseMainSupport main) {
-
-        }
-    }
-
-    private class CamelContextStarter implements Runnable {
-        private Exception startException;
-
-        @Override
-        public void run() {
-            try {
-                camelMain.run();
-            } catch (Exception e) {
-                LOG.error("An exception has occurred before CamelContext startup has finished", e);
-                startException = e;
-                if (startFinishedSignal.getCount() > 0) {
-                    LOG.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to an exception");
-                    startFinishedSignal.countDown();
-                }
-            }
-        }
-
-        public boolean hasException() {
-            return startException != null;
-        }
-
-        public Exception getException() {
-            return startException;
-        }
-    }
 }