You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ra...@apache.org on 2015/12/23 16:56:04 UTC
[2/2] camel git commit: CAMEL-9445 camel-ignite: Adjust endpoint metadata and add logs to consumers.
CAMEL-9445 camel-ignite: Adjust endpoint metadata and add logs to consumers.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/99c55941
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/99c55941
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/99c55941
Branch: refs/heads/camel-ignite
Commit: 99c559419847d1d635fbe4c6af909aea121996f2
Parents: d26af3d
Author: Raul Kripalani <ra...@apache.org>
Authored: Wed Dec 23 15:55:55 2015 +0000
Committer: Raul Kripalani <ra...@apache.org>
Committed: Wed Dec 23 15:55:55 2015 +0000
----------------------------------------------------------------------
.../ignite/cache/IgniteCacheContinuousQueryConsumer.java | 10 ++++++++++
.../component/ignite/compute/IgniteComputeEndpoint.java | 2 +-
.../component/ignite/events/IgniteEventsConsumer.java | 9 +++++++++
.../component/ignite/events/IgniteEventsEndpoint.java | 2 +-
.../ignite/messaging/IgniteMessagingConsumer.java | 10 ++++++++++
.../ignite/messaging/IgniteMessagingEndpoint.java | 6 +++++-
.../ignite/messaging/IgniteMessagingProducer.java | 3 +++
.../ignite/messaging/IgniteMessagingSendMode.java | 3 +++
8 files changed, 42 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java
index e4c7302..cb06424 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.ignite.cache;
+import java.util.Arrays;
+
import javax.cache.Cache.Entry;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
@@ -59,6 +61,8 @@ public class IgniteCacheContinuousQueryConsumer extends DefaultConsumer {
launchContinuousQuery();
+ LOG.info("Started Ignite Cache Continuous Query consumer for cache {} with query: {}.", cache.getName(), endpoint.getQuery());
+
maybeFireExistingQueryResults();
}
@@ -96,6 +100,10 @@ public class IgniteCacheContinuousQueryConsumer extends DefaultConsumer {
continuousQuery.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
@Override
public void onUpdated(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> events) throws CacheEntryListenerException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing Continuous Query event(s): {}.", events);
+ }
+
if (!endpoint.isOneExchangePerUpdate()) {
fireGroupedExchange(events);
return;
@@ -119,6 +127,8 @@ public class IgniteCacheContinuousQueryConsumer extends DefaultConsumer {
super.doStop();
cursor.close();
+
+ LOG.info("Stopped Ignite Cache Continuous Query consumer for cache {} with query: {}.", cache.getName(), endpoint.getQuery());
}
private void fireSingleExchange(CacheEntryEvent<? extends Object, ? extends Object> entry) {
http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java
index 1dc0663..d6a3eb2 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java
@@ -33,7 +33,7 @@ import org.apache.ignite.IgniteCompute;
/**
* Ignite Compute endpoint.
*/
-@UriEndpoint(scheme = "ignite:compute", title = "Ignite Compute", syntax = "ignite:compute:endpointId", label = "nosql,cache,compute", producerOnly = true)
+@UriEndpoint(scheme = "ignite:compute", title = "Ignite Compute", syntax = "ignite:compute:[endpointId]", label = "nosql,cache,compute", producerOnly = true)
public class IgniteComputeEndpoint extends AbstractIgniteEndpoint {
@UriParam
http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
index 5d63611..7df33ac 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.ignite.events;
+import java.util.Arrays;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
@@ -48,6 +50,9 @@ public class IgniteEventsConsumer extends DefaultConsumer {
Message in = exchange.getIn();
in.setBody(event);
try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing Ignite Event: {}.", event);
+ }
getAsyncProcessor().process(exchange, new AsyncCallback() {
@Override
public void done(boolean doneSync) {
@@ -80,6 +85,8 @@ public class IgniteEventsConsumer extends DefaultConsumer {
}
events.localListen(predicate, eventTypes);
+
+ LOG.info("Started local Ignite Events consumer for events: {}.", Arrays.asList(eventTypes));
}
@Override
@@ -87,6 +94,8 @@ public class IgniteEventsConsumer extends DefaultConsumer {
super.doStop();
events.stopLocalListen(predicate, eventTypes);
+
+ LOG.info("Stopped local Ignite Events consumer for events: {}.", Arrays.asList(eventTypes));
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java
index 6237ad8..1d48a30 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
/**
* Ignite Events endpoint. Only supports consumers.
*/
-@UriEndpoint(scheme = "ignite:events", title = "Ignite Events", syntax = "ignite:events:endpointId", label = "nosql,cache,compute,messaging,data",
+@UriEndpoint(scheme = "ignite:events", title = "Ignite Events", syntax = "ignite:events:[endpointId]", label = "nosql,cache,compute,messaging,data",
consumerOnly = true, consumerClass = IgniteEventsConsumer.class)
public class IgniteEventsEndpoint extends AbstractIgniteEndpoint {
http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java
index 6579437..03a8c67 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java
@@ -29,6 +29,9 @@ import org.apache.ignite.lang.IgniteBiPredicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Ignite Messaging consumer.
+ */
public class IgniteMessagingConsumer extends DefaultConsumer {
private static final Logger LOG = LoggerFactory.getLogger(IgniteMessagingConsumer.class);
@@ -47,6 +50,9 @@ public class IgniteMessagingConsumer extends DefaultConsumer {
in.setHeader(IgniteConstants.IGNITE_MESSAGING_TOPIC, endpoint.getTopic());
in.setHeader(IgniteConstants.IGNITE_MESSAGING_UUID, uuid);
try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing Ignite message for subscription {} with payload {}.", uuid, payload);
+ }
getProcessor().process(exchange);
} catch (Exception e) {
LOG.error(String.format("Exception while processing Ignite Message from topic %s", endpoint.getTopic()), e);
@@ -66,6 +72,8 @@ public class IgniteMessagingConsumer extends DefaultConsumer {
super.doStart();
messaging.localListen(endpoint.getTopic(), predicate);
+
+ LOG.info("Started Ignite Messaging consumer for topic {}.", endpoint.getTopic());
}
@Override
@@ -73,6 +81,8 @@ public class IgniteMessagingConsumer extends DefaultConsumer {
super.doStop();
messaging.stopLocalListen(endpoint.getTopic(), predicate);
+
+ LOG.info("Stopped Ignite Messaging consumer for topic {}.", endpoint.getTopic());
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingEndpoint.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingEndpoint.java
index 2277a11..ca375f2 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingEndpoint.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingEndpoint.java
@@ -31,7 +31,11 @@ import org.apache.camel.spi.UriParam;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteMessaging;
-@UriEndpoint(scheme = "ignite:messaging", title = "Ignite Messaging", syntax = "ignite:messaging:[topic]", label = "nosql,cache,messaging")
+/**
+ * Ignite Messaging endpoint.
+ */
+@UriEndpoint(scheme = "ignite:messaging", title = "Ignite Messaging", syntax = "ignite:messaging:[topic]", label = "nosql,cache,messaging",
+ consumerClass = IgniteMessagingConsumer.class)
public class IgniteMessagingEndpoint extends AbstractIgniteEndpoint {
@UriParam
http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java
index c4946b8..5a33c43 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingProducer.java
@@ -28,6 +28,9 @@ import org.apache.camel.util.MessageHelper;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteMessaging;
+/**
+ * Ignite Messaging producer.
+ */
public class IgniteMessagingProducer extends DefaultAsyncProducer {
private IgniteMessagingEndpoint endpoint;
http://git-wip-us.apache.org/repos/asf/camel/blob/99c55941/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingSendMode.java
----------------------------------------------------------------------
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingSendMode.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingSendMode.java
index 0bf472c..a1c8900 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingSendMode.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingSendMode.java
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.ignite.messaging;
+/**
+ * Enum for Ignite Messaging send modes.
+ */
public enum IgniteMessagingSendMode {
ORDERED, UNORDERED