You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/10/07 10:07:41 UTC
[camel-kafka-connector] branch master updated: [camel-main-support]
Use BaseMainSupport instead of Main #499
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 75ccafa [camel-main-support] Use BaseMainSupport instead of Main #499
75ccafa is described below
commit 75ccafa823b4215bf2836b2586940073649537c9
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Oct 7 11:18:00 2020 +0200
[camel-main-support] Use BaseMainSupport instead of Main #499
---
.../utils/CamelKafkaConnectMain.java | 122 +++++++++++++++
.../kafkaconnector/utils/CamelMainSupport.java | 165 ++++-----------------
2 files changed, 153 insertions(+), 134 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
new file mode 100644
index 0000000..46b0822
--- /dev/null
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.utils;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ConsumerTemplate;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.main.BaseMainSupport;
+import org.apache.camel.main.MainListener;
+import org.apache.camel.support.service.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CamelKafkaConnectMain extends BaseMainSupport {
+ public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = "camel.dataformat.";
+ private static final Logger LOG = LoggerFactory.getLogger(CamelKafkaConnectMain.class);
+ // Templates are created on first use by the getters below and released again in doStop().
+ protected volatile ConsumerTemplate consumerTemplate;
+ protected volatile ProducerTemplate producerTemplate;
+ /** Wraps a CamelContext supplied by the caller; this class never creates one itself. */
+ public CamelKafkaConnectMain(CamelContext context) {
+ this.camelContext = context;
+ }
+ /** Runs the base initialization, then post-processes the supplied CamelContext. */
+ @Override
+ protected void doInit() throws Exception {
+ super.doInit();
+ postProcessCamelContext(camelContext);
+ }
+ /** Start order: beforeStart listeners, base start, CamelContext start, afterStart listeners. */
+ @Override
+ protected void doStart() throws Exception {
+ LOG.info("Starting Main");
+
+ for (MainListener listener : listeners) {
+ listener.beforeStart(this);
+ }
+
+ super.doStart();
+
+ getCamelContext().start();
+
+ for (MainListener listener : listeners) {
+ listener.afterStart(this);
+ }
+
+ LOG.info("Main started");
+ }
+ /** Stop order: templates, beforeStop listeners, base stop, CamelContext stop, afterStop listeners. */
+ @Override
+ protected void doStop() throws Exception {
+ LOG.info("Stopping Main");
+
+ ServiceHelper.stopService(consumerTemplate);
+ consumerTemplate = null;
+
+ ServiceHelper.stopService(producerTemplate);
+ producerTemplate = null;
+
+ for (MainListener listener : listeners) {
+ listener.beforeStop(this);
+ }
+ // Mirror doStart(): the base class must be stopped here, not started a second time.
+ super.doStop();
+
+ getCamelContext().stop();
+
+ for (MainListener listener : listeners) {
+ listener.afterStop(this);
+ }
+
+ LOG.info("Main stopped");
+ }
+ /** Always throws; call getProducerTemplate() to obtain a template from this class. */
+ @Override
+ protected ProducerTemplate findOrCreateCamelTemplate() {
+ throw new UnsupportedOperationException("Should not happen");
+ }
+ /** Always throws; the CamelContext is handed in through the constructor instead. */
+ @Override
+ protected CamelContext createCamelContext() {
+ throw new UnsupportedOperationException("Should not happen");
+ }
+ /** Returns the cached ProducerTemplate, creating it from the CamelContext on first call. */
+ public ProducerTemplate getProducerTemplate() {
+ if (this.producerTemplate == null) {
+ synchronized (this) {
+ if (this.producerTemplate == null) {
+ this.producerTemplate = getCamelContext().createProducerTemplate();
+ }
+ }
+ }
+
+ return this.producerTemplate;
+ }
+ /** Returns the cached ConsumerTemplate, creating it from the CamelContext on first call. */
+ public ConsumerTemplate getConsumerTemplate() {
+ if (this.consumerTemplate == null) {
+ synchronized (this) {
+ if (this.consumerTemplate == null) {
+ this.consumerTemplate = getCamelContext().createConsumerTemplate();
+ }
+ }
+ }
+
+ return this.consumerTemplate;
+ }
+}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
index 5a5f1fe..6c17ecc 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
@@ -20,27 +20,17 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Endpoint;
-import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.catalog.RuntimeCamelCatalog;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.main.BaseMainSupport;
-import org.apache.camel.main.Main;
-import org.apache.camel.main.MainListener;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.support.PropertyBindingSupport;
-import org.apache.camel.util.OrderedProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,43 +38,20 @@ public class CamelMainSupport {
public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = "camel.dataformat.";
private static final Logger LOG = LoggerFactory.getLogger(CamelMainSupport.class);
- private Main camelMain;
- private CamelContext camel;
+ private final CamelKafkaConnectMain camelMain;
- private final ExecutorService exService = Executors.newSingleThreadExecutor();
- private final CountDownLatch startFinishedSignal = new CountDownLatch(1);
-
- public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, List<CamelKafkaConnectDataformat> dataformats, int aggregationSize, long aggregationTimeout) throws Exception {
- this(props, fromUrl, toUrl, dataformats, aggregationSize, aggregationTimeout, new DefaultCamelContext());
- }
-
- public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, List<CamelKafkaConnectDataformat> dataformats, int aggregationSize, long aggregationTimeout, CamelContext camelContext) throws Exception {
- camel = camelContext;
- camelMain = new Main() {
- @Override
- protected ProducerTemplate findOrCreateCamelTemplate() {
- return camel.createProducerTemplate();
- }
-
- @Override
- protected CamelContext createCamelContext() {
- return camel;
- }
- };
-
- camelMain.addMainListener(new CamelMainFinishedListener());
+ public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, List<CamelKafkaConnectDataformat> dataformats, int aggregationSize, long aggregationTimeout, CamelContext camelContext) {
+ camelMain = new CamelKafkaConnectMain(camelContext);
camelMain.configure().setAutoConfigurationLogSummary(false);
-
- Properties camelProperties = new OrderedProperties();
+ Properties camelProperties = new Properties();
camelProperties.putAll(props);
LOG.info("Setting initial properties in Camel context: [{}]", camelProperties);
- this.camel.getPropertiesComponent().setInitialProperties(camelProperties);
+ camelMain.setInitialProperties(camelProperties);
- camelMain.init();
//creating the actual route
- this.camel.addRoutes(new RouteBuilder() {
+ camelMain.configure().addRoutesBuilder(new RouteBuilder() {
public void configure() {
//from
RouteDefinition rd = from(fromUrl);
@@ -96,12 +63,12 @@ public class CamelMainSupport {
switch (dataformat.getDataformatKind()) {
case MARSHALL:
LOG.info(".marshal().custom({})", dataformatId);
- camel.getRegistry().bind(dataformatId, lookupAndInstantiateDataformat(dataformatId));
+ getContext().getRegistry().bind(dataformatId, lookupAndInstantiateDataformat(dataformatId));
rd.marshal().custom(dataformatId);
break;
case UNMARSHALL:
LOG.info(".unmarshal().custom({})", dataformatId);
- camel.getRegistry().bind(dataformatId, lookupAndInstantiateDataformat(dataformatId));
+ getContext().getRegistry().bind(dataformatId, lookupAndInstantiateDataformat(dataformatId));
rd.unmarshal().custom(dataformatId);
break;
default:
@@ -109,10 +76,9 @@ public class CamelMainSupport {
}
}
-
- if (camel.getRegistry().lookupByName("aggregate") != null) {
+ if (getContext().getRegistry().lookupByName("aggregate") != null) {
//aggregation
- AggregationStrategy s = (AggregationStrategy) camel.getRegistry().lookupByName("aggregate");
+ AggregationStrategy s = (AggregationStrategy) getContext().getRegistry().lookupByName("aggregate");
LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout);
LOG.info(".to({})", toUrl);
rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(toUrl);
@@ -125,16 +91,14 @@ public class CamelMainSupport {
});
}
- public void start() throws Exception {
+ public void start() {
LOG.info("Starting CamelContext");
- CamelContextStarter starter = new CamelContextStarter();
- exService.execute(starter);
- startFinishedSignal.await();
-
- if (starter.hasException()) {
- LOG.info("CamelContext failed to start", starter.getException());
- throw starter.getException();
+ try {
+ camelMain.start();
+ } catch (Exception e) {
+ LOG.info("CamelContext failed to start", e);
+ throw e;
}
LOG.info("CamelContext started");
@@ -143,46 +107,46 @@ public class CamelMainSupport {
public void stop() {
LOG.info("Stopping CamelContext");
- camelMain.stop();
- exService.shutdown();
+ try {
+ camelMain.stop();
+ } catch (Exception e) {
+ LOG.info("CamelContext failed to stop", e);
+ throw e;
+ }
LOG.info("CamelContext stopped");
}
public ProducerTemplate createProducerTemplate() {
- return camel.createProducerTemplate();
+ return camelMain.getProducerTemplate();
}
public Endpoint getEndpoint(String uri) {
- return camel.getEndpoint(uri);
+ return camelMain.getCamelContext().getEndpoint(uri);
}
public Collection<Endpoint> getEndpoints() {
- return camel.getEndpoints();
+ return camelMain.getCamelContext().getEndpoints();
}
public ConsumerTemplate createConsumerTemplate() {
- return camel.createConsumerTemplate();
- }
-
- public RuntimeCamelCatalog getRuntimeCamelCatalog() {
- return camel.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog();
+ return camelMain.getConsumerTemplate();
}
private DataFormat lookupAndInstantiateDataformat(String dataformatName) {
- DataFormat df = camel.resolveDataFormat(dataformatName);
+ DataFormat df = camelMain.getCamelContext().resolveDataFormat(dataformatName);
if (df == null) {
- df = camel.createDataFormat(dataformatName);
+ df = camelMain.getCamelContext().createDataFormat(dataformatName);
final String prefix = CAMEL_DATAFORMAT_PROPERTIES_PREFIX + dataformatName + ".";
- final Properties props = camel.getPropertiesComponent().loadProperties(k -> k.startsWith(prefix));
+ final Properties props = camelMain.getCamelContext().getPropertiesComponent().loadProperties(k -> k.startsWith(prefix));
- CamelContextAware.trySetCamelContext(df, camel);
+ CamelContextAware.trySetCamelContext(df, camelMain.getCamelContext());
if (!props.isEmpty()) {
PropertyBindingSupport.build()
- .withCamelContext(camel)
+ .withCamelContext(camelMain.getCamelContext())
.withOptionPrefix(prefix)
.withRemoveParameters(false)
.withProperties((Map) props)
@@ -198,71 +162,4 @@ public class CamelMainSupport {
return df;
}
- private class CamelMainFinishedListener implements MainListener {
- @Override
- public void configure(CamelContext context) {
-
- }
-
- @Override
- public void beforeStart(BaseMainSupport main) {
-
- }
-
- @Override
- public void afterStart(BaseMainSupport main) {
- LOG.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to CamelMainFinishedListener been called");
- startFinishedSignal.countDown();
- }
-
- @Override
- public void beforeStop(BaseMainSupport main) {
-
- }
-
- @Override
- public void afterStop(BaseMainSupport main) {
-
- }
-
- @Override
- public void beforeConfigure(BaseMainSupport main) {
- }
-
- @Override
- public void afterConfigure(BaseMainSupport main) {
-
- }
-
- @Override
- public void beforeInitialize(BaseMainSupport main) {
-
- }
- }
-
- private class CamelContextStarter implements Runnable {
- private Exception startException;
-
- @Override
- public void run() {
- try {
- camelMain.run();
- } catch (Exception e) {
- LOG.error("An exception has occurred before CamelContext startup has finished", e);
- startException = e;
- if (startFinishedSignal.getCount() > 0) {
- LOG.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to an exception");
- startFinishedSignal.countDown();
- }
- }
- }
-
- public boolean hasException() {
- return startException != null;
- }
-
- public Exception getException() {
- return startException;
- }
- }
}