You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/21 05:16:45 UTC

[camel] 05/05: CAMEL-13870: Fast property configuration of Camel endpoints. Work in progress.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch CAMEL-13870
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9d969b95a6c2eb5cbb99f1cd8fffd50d75ce9fd6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Aug 21 07:16:22 2019 +0200

    CAMEL-13870: Fast property configuration of Camel endpoints. Work in progress.
---
 .../component/iec60870/AbstractIecEndpoint.java    |  2 +
 .../src/main/docs/ignite-events-component.adoc     |  2 +-
 .../ignite/events/IgniteEventsConsumer.java        | 12 ++--
 .../ignite/events/IgniteEventsEndpoint.java        | 66 +++++++++-------------
 .../camel/component/ignite/IgniteEventsTest.java   | 29 ----------
 .../camel/component/jcache/JCacheEndpoint.java     | 12 ++--
 .../apache/camel/component/jira/JiraEndpoint.java  |  4 ++
 .../apache/camel/component/jmx/JMXConsumer.java    |  4 +-
 .../apache/camel/component/jmx/JMXEndpoint.java    |  4 +-
 .../apache/camel/component/jpa/JpaEndpoint.java    |  4 +-
 .../camel/component/jt400/Jt400Endpoint.java       |  4 ++
 .../camel/component/kafka/KafkaConfiguration.java  |  8 ++-
 .../camel/component/kafka/KafkaConsumer.java       |  2 +-
 .../kubernetes/AbstractKubernetesEndpoint.java     |  4 ++
 14 files changed, 70 insertions(+), 87 deletions(-)

diff --git a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/AbstractIecEndpoint.java b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/AbstractIecEndpoint.java
index e5521e3..364953e 100644
--- a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/AbstractIecEndpoint.java
+++ b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/AbstractIecEndpoint.java
@@ -78,6 +78,8 @@ public abstract class AbstractIecEndpoint<T extends AbstractConnectionMultiplexo
         this.address = requireNonNull(address);
     }
 
+    
+
     public ObjectAddress getAddress() {
         return this.address;
     }
diff --git a/components/camel-ignite/src/main/docs/ignite-events-component.adoc b/components/camel-ignite/src/main/docs/ignite-events-component.adoc
index d9ac803..748d3e8 100644
--- a/components/camel-ignite/src/main/docs/ignite-events-component.adoc
+++ b/components/camel-ignite/src/main/docs/ignite-events-component.adoc
@@ -52,7 +52,7 @@ with the following path and query parameters:
 | Name | Description | Default | Type
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
 | *clusterGroupExpression* (consumer) | The cluster group expression. |  | ClusterGroupExpression
-| *events* (consumer) | The event IDs to subscribe to as a Set directly where the IDs are the different constants in org.apache.ignite.events.EventType. | EventType.EVTS_ALL | SetOrString
+| *events* (consumer) | The event types to subscribe to as a comma-separated string of event constants as defined in EventType. For example: EVT_CACHE_ENTRY_CREATED,EVT_CACHE_OBJECT_REMOVED,EVT_IGFS_DIR_CREATED. | EVTS_ALL | String
 | *propagateIncomingBodyIfNo ReturnValue* (consumer) | Sets whether to propagate the incoming body if the return type of the underlying Ignite operation is void. | true | boolean
 | *treatCollectionsAsCache Objects* (consumer) | Sets whether to treat Collections as cache objects or as Collections of items to insert/update/compute, etc. | false | boolean
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
index 9f448a6..d0177a8 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.ignite.events;
 
 import java.util.Arrays;
+import java.util.List;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
@@ -72,12 +73,11 @@ public class IgniteEventsConsumer extends DefaultConsumer {
     protected void doStart() throws Exception {
         super.doStart();
 
-        if (endpoint.getEvents() != null && endpoint.getEvents().size() > 0) {
-            eventTypes = new int[endpoint.getEvents().size()];
-            int counter = 0;
-            for (Integer i : endpoint.getEvents()) {
-                eventTypes[counter++] = i;
-            }
+        List<Integer> ids = endpoint.getEventsAsIds();
+        eventTypes = new int[ids.size()];
+        int counter = 0;
+        for (Integer i : ids) {
+            eventTypes[counter++] = i;
         }
 
         events.localListen(predicate, eventTypes);
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java
index fb9f760..5d76301 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java
@@ -17,8 +17,10 @@
 package org.apache.camel.component.ignite.events;
 
 import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -49,20 +51,14 @@ public class IgniteEventsEndpoint extends AbstractIgniteEndpoint {
     @UriPath
     private String endpointId;
 
-    @UriParam(label = "consumer", javaType = "Set<Integer> or String", defaultValue = "EventType.EVTS_ALL")
-    private Set<Integer> events;
+    @UriParam(label = "consumer", defaultValue = "EVTS_ALL")
+    private String events = "EVTS_ALL";
 
     @UriParam(label = "consumer")
     private ClusterGroupExpression clusterGroupExpression;
 
     public IgniteEventsEndpoint(String uri, String remaining, Map<String, Object> parameters, IgniteEventsComponent igniteComponent) {
         super(uri, igniteComponent);
-
-        // Initialize subscribed event types with ALL.
-        events = new HashSet<>();
-        for (Integer eventType : EventType.EVTS_ALL) {
-            events.add(eventType);
-        }
     }
 
     @Override
@@ -116,50 +112,46 @@ public class IgniteEventsEndpoint extends AbstractIgniteEndpoint {
 
     /**
      * Gets the event types to subscribe to.
-     * 
-     * @return
      */
-    public Set<Integer> getEvents() {
+    public String getEvents() {
         return events;
     }
 
     /**
-     * The event IDs to subscribe to as a Set<Integer> directly where
-     * the IDs are the different constants in org.apache.ignite.events.EventType.
-     * 
-     * @param events
-     */
-    public void setEvents(Set<Integer> events) {
-        this.events = events;
-    }
-
-    /**
      * The event types to subscribe to as a comma-separated string of event constants as defined in {@link EventType}.
-     * <p>
      * For example: EVT_CACHE_ENTRY_CREATED,EVT_CACHE_OBJECT_REMOVED,EVT_IGFS_DIR_CREATED.
-     * 
-     * @param events
      */
     public void setEvents(String events) {
-        this.events = new HashSet<>();
-        Set<String> requestedEvents = new HashSet<>(Arrays.asList(events.toUpperCase().split(",")));
-        Field[] fields = EventType.class.getDeclaredFields();
-        for (Field field : fields) {
-            if (!requestedEvents.contains(field.getName())) {
-                continue;
+        this.events = events;
+    }
+
+    public List<Integer> getEventsAsIds() {
+        List<Integer> answer = new ArrayList<>();
+
+        if (events.equals("EVTS_ALL")) {
+            for (Integer eventType : EventType.EVTS_ALL) {
+                answer.add(eventType);
             }
-            try {
-                this.events.add(field.getInt(null));
-            } catch (Exception e) {
-                throw new IllegalArgumentException("Problem while resolving event type. See stacktrace.", e);
+        } else {
+            Set<String> requestedEvents = new HashSet<>(Arrays.asList(events.toUpperCase().split(",")));
+            Field[] fields = EventType.class.getDeclaredFields();
+            for (Field field : fields) {
+                if (!requestedEvents.contains(field.getName())) {
+                    continue;
+                }
+                try {
+                    answer.add(field.getInt(null));
+                } catch (Exception e) {
+                    throw new IllegalArgumentException("Problem while resolving event type. See stacktrace.", e);
+                }
             }
         }
+
+        return answer;
     }
 
     /**
      * Gets the cluster group expression.
-     * 
-     * @return cluster group expression
      */
     public ClusterGroupExpression getClusterGroupExpression() {
         return clusterGroupExpression;
@@ -167,8 +159,6 @@ public class IgniteEventsEndpoint extends AbstractIgniteEndpoint {
 
     /**
      * The cluster group expression.
-     * 
-     * @param clusterGroupExpression cluster group expression
      */
     public void setClusterGroupExpression(ClusterGroupExpression clusterGroupExpression) {
         this.clusterGroupExpression = clusterGroupExpression;
diff --git a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteEventsTest.java b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteEventsTest.java
index 12be923..d2239f7 100644
--- a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteEventsTest.java
+++ b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteEventsTest.java
@@ -86,35 +86,6 @@ public class IgniteEventsTest extends AbstractIgniteTest {
     }
 
     @Test
-    public void testConsumeFilteredEventsWithRef() throws Exception {
-        context.getRegistry().bind("filter", Sets.newHashSet(EventType.EVT_CACHE_OBJECT_PUT));
-
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("ignite-events:" + resourceUid + "?events=#filter").to("mock:test2");
-            }
-        });
-
-        getMockEndpoint("mock:test2").expectedMessageCount(2);
-
-        IgniteCache<String, String> cache = ignite().getOrCreateCache(resourceUid);
-
-        // Generate cache activity.
-        cache.put(resourceUid, "123");
-        cache.get(resourceUid);
-        cache.remove(resourceUid);
-        cache.get(resourceUid);
-        cache.put(resourceUid, "123");
-
-        assertMockEndpointsSatisfied();
-
-        List<Integer> eventTypes = receivedEventTypes("mock:test2");
-
-        assert_().that(eventTypes).containsExactly(EventType.EVT_CACHE_OBJECT_PUT, EventType.EVT_CACHE_OBJECT_PUT).inOrder();
-    }
-
-    @Test
     public void testConsumeFilteredEventsInline() throws Exception {
         context.addRoutes(new RouteBuilder() {
             @Override
diff --git a/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheEndpoint.java b/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheEndpoint.java
index f6a3225..83d5fa9 100644
--- a/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheEndpoint.java
+++ b/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheEndpoint.java
@@ -35,7 +35,7 @@ public class JCacheEndpoint extends DefaultEndpoint {
     @Metadata(required = true)
     private final String cacheName;
     @UriParam
-    private final JCacheConfiguration cacheConfiguration;
+    private final JCacheConfiguration configuration;
 
     private volatile JCacheManager<Object, Object> cacheManager;
 
@@ -43,12 +43,16 @@ public class JCacheEndpoint extends DefaultEndpoint {
         super(uri, component);
 
         this.cacheName = configuration.getCacheName();
-        this.cacheConfiguration = configuration;
+        this.configuration = configuration;
+    }
+
+    public JCacheConfiguration getConfiguration() {
+        return configuration;
     }
 
     @Override
     public Producer createProducer() throws Exception {
-        return new JCacheProducer(this, cacheConfiguration);
+        return new JCacheProducer(this, configuration);
     }
 
     @Override
@@ -58,7 +62,7 @@ public class JCacheEndpoint extends DefaultEndpoint {
 
     @Override
     protected void doStart() throws Exception {
-        cacheManager = JCacheHelper.createManager(cacheConfiguration);
+        cacheManager = JCacheHelper.createManager(configuration);
     }
 
     @Override
diff --git a/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraEndpoint.java b/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraEndpoint.java
index b9b468f..9c3baf8 100644
--- a/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraEndpoint.java
+++ b/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraEndpoint.java
@@ -86,6 +86,10 @@ public class JiraEndpoint extends DefaultEndpoint {
         this.configuration = configuration;
     }
 
+    public JiraConfiguration getConfiguration() {
+        return configuration;
+    }
+
     @Override
     public void doStart() throws Exception {
         super.doStart();
diff --git a/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumer.java b/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumer.java
index 7c5b37e..1d3a398 100644
--- a/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumer.java
+++ b/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumer.java
@@ -115,7 +115,7 @@ public class JMXConsumer extends DefaultConsumer implements NotificationListener
             try {
                 initNetworkConnection();
             } catch (IOException e) {
-                if (!mJmxEndpoint.getTestConnectionOnStartup()) {
+                if (!mJmxEndpoint.isTestConnectionOnStartup()) {
                     log.warn("Failed to connect to JMX server. >> {}", e.getMessage());
                     scheduleDelayedStart();
                     return;
@@ -197,7 +197,7 @@ public class JMXConsumer extends DefaultConsumer implements NotificationListener
                         || connectionNotification.getType().equals(JMXConnectionNotification.CLOSED) 
                         || connectionNotification.getType().equals(JMXConnectionNotification.FAILED)) {
                 log.warn("Lost JMX connection for : {}", URISupport.sanitizeUri(mJmxEndpoint.getEndpointUri()));
-                if (mJmxEndpoint.getReconnectOnConnectionFailure()) {
+                if (mJmxEndpoint.isReconnectOnConnectionFailure()) {
                     scheduleReconnect();
                 } else {
                     log.warn("The JMX consumer will not be reconnected. Use 'reconnectOnConnectionFailure' to "
diff --git a/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXEndpoint.java b/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXEndpoint.java
index 0e7af35..961d7e7 100644
--- a/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXEndpoint.java
+++ b/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXEndpoint.java
@@ -507,7 +507,7 @@ public class JMXEndpoint extends DefaultEndpoint {
         stringToCompare = aStringToCompare;
     }
     
-    public boolean getTestConnectionOnStartup() {
+    public boolean isTestConnectionOnStartup() {
         return this.testConnectionOnStartup;
     }
     
@@ -515,7 +515,7 @@ public class JMXEndpoint extends DefaultEndpoint {
         this.testConnectionOnStartup = testConnectionOnStartup;
     }
     
-    public boolean getReconnectOnConnectionFailure() {
+    public boolean isReconnectOnConnectionFailure() {
         return this.reconnectOnConnectionFailure;
     }
     
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
index 70ef706..777a0ab 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
@@ -135,7 +135,7 @@ public class JpaEndpoint extends ScheduledPollEndpoint {
         producer.setParameters(getParameters());
         producer.setResultClass(getResultClass());
         producer.setFindEntity(isFindEntity());
-        producer.setUseExecuteUpdate(isUseExecuteUpdate());
+        producer.setUseExecuteUpdate(getUseExecuteUpdate());
         return producer;
     }
 
@@ -499,7 +499,7 @@ public class JpaEndpoint extends ScheduledPollEndpoint {
         this.preDeleteHandler = preDeleteHandler;
     }
 
-    public Boolean isUseExecuteUpdate() {
+    public Boolean getUseExecuteUpdate() {
         return useExecuteUpdate;
     }
 
diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
index 420274f..4b5d99a 100644
--- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
+++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
@@ -70,6 +70,10 @@ public class Jt400Endpoint extends ScheduledPollEndpoint implements MultipleCons
         }
     }
 
+    public Jt400Configuration getConfiguration() {
+        return configuration;
+    }
+
     @Override
     public Producer createProducer() throws Exception {
         if (Jt400Type.DTAQ == configuration.getType()) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 4c9d0ce..71564fe 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -409,7 +409,7 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
         addPropertyIfNotNull(props, ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, getInterceptorClasses());
         addPropertyIfNotNull(props, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, getAutoOffsetReset());
         addPropertyIfNotNull(props, ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getConnectionMaxIdleMs());
-        addPropertyIfNotNull(props, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, isAutoCommitEnable());
+        addPropertyIfNotNull(props, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, getAutoCommitEnable());
         addPropertyIfNotNull(props, ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, getPartitionAssignor());
         addPropertyIfNotNull(props, ConsumerConfig.RECEIVE_BUFFER_CONFIG, getReceiveBufferBytes());
         addPropertyIfNotNull(props, ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, getConsumerRequestTimeoutMs());
@@ -645,10 +645,14 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
         this.clientId = clientId;
     }
 
-    public Boolean isAutoCommitEnable() {
+    public boolean isAutoCommitEnable() {
         return offsetRepository == null ? autoCommitEnable : false;
     }
 
+    public Boolean getAutoCommitEnable() {
+        return autoCommitEnable;
+    }
+
     /**
      * If true, periodically commit to ZooKeeper the offset of messages already
      * fetched by the consumer. This committed offset will be used when the
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index e644a10..aa3bb76 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -490,7 +490,7 @@ public class KafkaConsumer extends DefaultConsumer {
     }
 
     private boolean isAutoCommitEnabled() {
-        return endpoint.getConfiguration().isAutoCommitEnable() != null && endpoint.getConfiguration().isAutoCommitEnable();
+        return endpoint.getConfiguration().getAutoCommitEnable() != null && endpoint.getConfiguration().getAutoCommitEnable();
     }
 
     protected String serializeOffsetKey(TopicPartition topicPartition) {
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java
index 10d2d3c..9904f7e 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java
@@ -38,6 +38,10 @@ public abstract class AbstractKubernetesEndpoint extends DefaultEndpoint {
         this.configuration = config;
     }
 
+    public KubernetesConfiguration getConfiguration() {
+        return configuration;
+    }
+
     @Override
     public boolean isSingleton() {
         return false;