You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ra...@apache.org on 2015/12/23 16:56:04 UTC

[2/2] camel git commit: CAMEL-9445 camel-ignite: Adjust endpoint metadata and add logs to consumers.

CAMEL-9445 camel-ignite: Adjust endpoint metadata and add logs to consumers.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/99c55941
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/99c55941
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/99c55941

Branch: refs/heads/camel-ignite
Commit: 99c559419847d1d635fbe4c6af909aea121996f2
Parents: d26af3d
Author: Raul Kripalani <ra...@apache.org>
Authored: Wed Dec 23 15:55:55 2015 +0000
Committer: Raul Kripalani <ra...@apache.org>
Committed: Wed Dec 23 15:55:55 2015 +0000

----------------------------------------------------------------------
 .../ignite/cache/IgniteCacheContinuousQueryConsumer.java  | 10 ++++++++++
 .../component/ignite/compute/IgniteComputeEndpoint.java   |  2 +-
 .../component/ignite/events/IgniteEventsConsumer.java     |  9 +++++++++
 .../component/ignite/events/IgniteEventsEndpoint.java     |  2 +-
 .../ignite/messaging/IgniteMessagingConsumer.java         | 10 ++++++++++
 .../ignite/messaging/IgniteMessagingEndpoint.java         |  6 +++++-
 .../ignite/messaging/IgniteMessagingProducer.java         |  3 +++
 .../ignite/messaging/IgniteMessagingSendMode.java         |  3 +++
 8 files changed, 42 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java
index e4c7302..cb06424 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.ignite.cache;
 
+import java.util.Arrays;
+
 import javax.cache.Cache.Entry;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryListenerException;
@@ -59,6 +61,8 @@ public class IgniteCacheContinuousQueryConsumer extends DefaultConsumer {
 
         launchContinuousQuery();
 
+        LOG.info("Started Ignite Cache Continuous Query consumer for cache {} with query: {}.", cache.getName(), endpoint.getQuery());
+
         maybeFireExistingQueryResults();
     }
 
@@ -96,6 +100,10 @@ public class IgniteCacheContinuousQueryConsumer extends DefaultConsumer {
         continuousQuery.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
             @Override
             public void onUpdated(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> events) throws CacheEntryListenerException {
+                if (LOG.isTraceEnabled()) { // per-event detail: log at the same level the guard checks
+                    LOG.trace("Processing Continuous Query event(s): {}.", events);
+                }
+
                 if (!endpoint.isOneExchangePerUpdate()) {
                     fireGroupedExchange(events);
                     return;
@@ -119,6 +127,8 @@ public class IgniteCacheContinuousQueryConsumer extends DefaultConsumer {
         super.doStop();
 
         cursor.close();
+        
+        LOG.info("Stopped Ignite Cache Continuous Query consumer for cache {} with query: {}.", cache.getName(), endpoint.getQuery());
     }
 
     private void fireSingleExchange(CacheEntryEvent<? extends Object, ? extends Object> entry) {

http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java
index 1dc0663..d6a3eb2 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java
@@ -33,7 +33,7 @@ import org.apache.ignite.IgniteCompute;
 /**
  * Ignite Compute endpoint.
  */
-@UriEndpoint(scheme = "ignite:compute", title = "Ignite Compute", syntax = "ignite:compute:endpointId", label = "nosql,cache,compute", producerOnly = true)
+@UriEndpoint(scheme = "ignite:compute", title = "Ignite Compute", syntax = "ignite:compute:[endpointId]", label = "nosql,cache,compute", producerOnly = true)
 public class IgniteComputeEndpoint extends AbstractIgniteEndpoint {
 
     @UriParam

http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
index 5d63611..7df33ac 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.ignite.events;
 
+import java.util.Arrays;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
@@ -48,6 +50,9 @@ public class IgniteEventsConsumer extends DefaultConsumer {
             Message in = exchange.getIn();
             in.setBody(event);
             try {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processing Ignite Event: {}.", event);
+                }
                 getAsyncProcessor().process(exchange, new AsyncCallback() {
                     @Override
                     public void done(boolean doneSync) {
@@ -80,6 +85,8 @@ public class IgniteEventsConsumer extends DefaultConsumer {
         }
 
         events.localListen(predicate, eventTypes);
+        
+        LOG.info("Started local Ignite Events consumer for events: {}.", Arrays.asList(eventTypes));
     }
 
     @Override
@@ -87,6 +94,8 @@ public class IgniteEventsConsumer extends DefaultConsumer {
         super.doStop();
 
         events.stopLocalListen(predicate, eventTypes);
+        
+        LOG.info("Stopped local Ignite Events consumer for events: {}.", Arrays.asList(eventTypes));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java
index 6237ad8..1d48a30 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Ignite Events endpoint. Only supports consumers.
  */
-@UriEndpoint(scheme = "ignite:events", title = "Ignite Events", syntax = "ignite:events:endpointId", label = "nosql,cache,compute,messaging,data", 
+@UriEndpoint(scheme = "ignite:events", title = "Ignite Events", syntax = "ignite:events:[endpointId]", label = "nosql,cache,compute,messaging,data", 
     consumerOnly = true, consumerClass = IgniteEventsConsumer.class)
 public class IgniteEventsEndpoint extends AbstractIgniteEndpoint {
 

http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java
index 6579437..03a8c67 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java
@@ -29,6 +29,9 @@ import org.apache.ignite.lang.IgniteBiPredicate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Ignite Messaging consumer.
+ */
 public class IgniteMessagingConsumer extends DefaultConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(IgniteMessagingConsumer.class);
@@ -47,6 +50,9 @@ public class IgniteMessagingConsumer extends DefaultConsumer {
             in.setHeader(IgniteConstants.IGNITE_MESSAGING_TOPIC, endpoint.getTopic());
             in.setHeader(IgniteConstants.IGNITE_MESSAGING_UUID, uuid);
             try {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processing Ignite message for subscription {} with payload {}.", uuid, payload);
+                }
                 getProcessor().process(exchange);
             } catch (Exception e) {
                 LOG.error(String.format("Exception while processing Ignite Message from topic %s", endpoint.getTopic()), e);
@@ -66,6 +72,8 @@ public class IgniteMessagingConsumer extends DefaultConsumer {
         super.doStart();
 
         messaging.localListen(endpoint.getTopic(), predicate);
+        
+        LOG.info("Started Ignite Messaging consumer for topic {}.", endpoint.getTopic());
     }
 
     @Override
@@ -73,6 +81,8 @@ public class IgniteMessagingConsumer extends DefaultConsumer {
         super.doStop();
 
         messaging.stopLocalListen(endpoint.getTopic(), predicate);
+        
+        LOG.info("Stopped Ignite Messaging consumer for topic {}.", endpoint.getTopic());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingEndpoint.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingEndpoint.java
index 2277a11..ca375f2 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingEndpoint.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingEndpoint.java
@@ -31,7 +31,11 @@ import org.apache.camel.spi.UriParam;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteMessaging;
 
-@UriEndpoint(scheme = "ignite:messaging", title = "Ignite Messaging", syntax = "ignite:messaging:[topic]", label = "nosql,cache,messaging")
+/**
+ * Ignite Messaging endpoint.
+ */
+@UriEndpoint(scheme = "ignite:messaging", title = "Ignite Messaging", syntax = "ignite:messaging:[topic]", label = "nosql,cache,messaging", 
+    consumerClass = IgniteMessagingConsumer.class)
 public class IgniteMessagingEndpoint extends AbstractIgniteEndpoint {
 
     @UriParam

http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java
index c4946b8..5a33c43 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java
@@ -28,6 +28,9 @@ import org.apache.camel.util.MessageHelper;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteMessaging;
 
+/**
+ * Ignite Messaging producer.
+ */
 public class IgniteMessagingProducer extends DefaultAsyncProducer {
 
     private IgniteMessagingEndpoint endpoint;

http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingSendMode.java
----------------------------------------------------------------------
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingSendMode.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingSendMode.java
index 0bf472c..a1c8900 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingSendMode.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingSendMode.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.ignite.messaging;
 
+/**
+ * Enum for Ignite Messaging send modes.
+ */
 public enum IgniteMessagingSendMode {
 
     ORDERED, UNORDERED