You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by io...@apache.org on 2011/08/27 13:58:58 UTC

svn commit: r1162327 - in /karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast: HazelcastEventTransportFactory.java QueueConsumeTask.java QueueConsumer.java QueueProducer.java

Author: iocanel
Date: Sat Aug 27 11:58:58 2011
New Revision: 1162327

URL: http://svn.apache.org/viewvc?rev=1162327&view=rev
Log:
[KARAF-823] Added HazelcastEventTransportFactory, QueueConsumer, QueueProducer and QueueConsumeTask.

Added:
    karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastEventTransportFactory.java
    karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumeTask.java
    karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java
    karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueProducer.java

Added: karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastEventTransportFactory.java
URL: http://svn.apache.org/viewvc/karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastEventTransportFactory.java?rev=1162327&view=auto
==============================================================================
--- karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastEventTransportFactory.java (added)
+++ karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastEventTransportFactory.java Sat Aug 27 11:58:58 2011
@@ -0,0 +1,116 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.karaf.cellar.hazelcast;
+
+import com.hazelcast.core.IQueue;
+import com.hazelcast.core.ITopic;
+import org.apache.karaf.cellar.core.Dispatcher;
+import org.apache.karaf.cellar.core.event.EventConsumer;
+import org.apache.karaf.cellar.core.event.EventProducer;
+import org.apache.karaf.cellar.core.event.EventTransportFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * @author: iocanel
+ */
+public class HazelcastEventTransportFactory extends HazelcastInstanceAware implements EventTransportFactory {
+
+    private Dispatcher dispatcher;
+
+    private Map<String, QueueConsumer> queueConsumerMap = new HashMap<String, QueueConsumer>();
+    private Map<String, QueueProducer> queueProducerMap = new HashMap<String, QueueProducer>();
+
+    private Map<String, TopicConsumer> topicConsumerMap = new HashMap<String, TopicConsumer>();
+    private Map<String, TopicProducer> topicProducerMap = new HashMap<String, TopicProducer>();
+
+
+    @Override
+    public EventProducer getEventProducer(String name, Boolean pubsub) {
+        if (pubsub) {
+            TopicProducer producer = topicProducerMap.get(name);
+            if (producer != null) {
+                return producer;
+            } else {
+                ITopic topic = instance.getTopic(Constants.TOPIC + Constants.SEPARATOR + name);
+                producer = new TopicProducer();
+                producer.setTopic(topic);
+                producer.setNode(getNode());
+                producer.init();
+                topicProducerMap.put(name, producer);
+                return producer;
+            }
+        } else {
+            QueueProducer producer = queueProducerMap.get(name);
+            if (producer != null) {
+                return producer;
+            } else {
+                IQueue queue = instance.getQueue(Constants.QUEUE + Constants.SEPARATOR + name);
+                producer = new QueueProducer();
+                producer.setQueue(queue);
+                producer.setNode(getNode());
+                producer.init();
+                queueProducerMap.put(name, producer);
+                return producer;
+            }
+        }
+    }
+
+    @Override
+    public EventConsumer getEventConsumer(String name, Boolean pubsub) {
+        if (pubsub) {
+            TopicConsumer consumer = topicConsumerMap.get(name);
+            if (consumer != null) {
+                return consumer;
+            } else {
+
+                ITopic topic = instance.getTopic(Constants.TOPIC + Constants.SEPARATOR + name);
+                consumer = new TopicConsumer();
+                consumer.setTopic(topic);
+                consumer.setNode(getNode());
+                consumer.setDispatcher(dispatcher);
+                consumer.init();
+                topicConsumerMap.put(name, consumer);
+                return consumer;
+            }
+        } else {
+            QueueConsumer consumer = queueConsumerMap.get(name);
+            if (consumer != null) {
+                return consumer;
+            } else {
+                IQueue queue = instance.getQueue(Constants.QUEUE + Constants.SEPARATOR + name);
+                consumer = new QueueConsumer();
+                consumer.setQueue(queue);
+                consumer.setNode(getNode());
+                consumer.setDispatcher(dispatcher);
+                consumer.init();
+                consumer.start();
+                queueConsumerMap.put(name, consumer);
+                return consumer;
+            }
+        }
+    }
+
+    public Dispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setDispatcher(Dispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+}

Added: karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumeTask.java
URL: http://svn.apache.org/viewvc/karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumeTask.java?rev=1162327&view=auto
==============================================================================
--- karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumeTask.java (added)
+++ karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumeTask.java Sat Aug 27 11:58:58 2011
@@ -0,0 +1,84 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.karaf.cellar.hazelcast;
+
+import org.apache.karaf.cellar.core.event.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author: iocanel
+ */
+public class QueueConsumeTask<E extends Event> implements Runnable {
+
+    private static final transient Logger LOGGER = LoggerFactory.getLogger(QueueConsumeTask.class);
+
+    private QueueConsumer<E> consumer;
+    private Boolean keepConsuming = Boolean.TRUE;
+
+    /**
+     * Constructor
+     *
+     * @param consumer
+     */
+    public QueueConsumeTask(QueueConsumer<E> consumer) {
+        this.consumer = consumer;
+    }
+
+    @Override
+    public void run() {
+        ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+            while (keepConsuming) {
+                E e = null;
+                try {
+                    if (consumer != null) {
+                        e = consumer.getQueue().poll(10, TimeUnit.SECONDS);
+                    }
+                } catch (InterruptedException e1) {
+                    LOGGER.warn("Consume task interrupted");
+                }
+                if (e != null) {
+                    consumer.consume(e);
+                }
+            }
+        } catch (Exception ex) {
+            LOGGER.error("Error while consuming from queue",ex);
+        }
+        finally {
+            Thread.currentThread().setContextClassLoader(originalClassLoader);
+        }
+    }
+
+    public void activate() {
+        keepConsuming = Boolean.TRUE;
+    }
+
+    public void deactivate() {
+        keepConsuming = Boolean.FALSE;
+    }
+
+    public QueueConsumer<E> getConsumer() {
+        return consumer;
+    }
+
+    public void setConsumer(QueueConsumer<E> consumer) {
+        this.consumer = consumer;
+    }
+
+}

Added: karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java
URL: http://svn.apache.org/viewvc/karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java?rev=1162327&view=auto
==============================================================================
--- karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java (added)
+++ karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java Sat Aug 27 11:58:58 2011
@@ -0,0 +1,150 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.cellar.hazelcast;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IQueue;
+import com.hazelcast.core.ITopic;
+import com.hazelcast.core.ItemListener;
+import com.hazelcast.core.MessageListener;
+import org.apache.karaf.cellar.core.Dispatcher;
+import org.apache.karaf.cellar.core.Node;
+import org.apache.karaf.cellar.core.control.BasicSwitch;
+import org.apache.karaf.cellar.core.control.Switch;
+import org.apache.karaf.cellar.core.control.SwitchStatus;
+import org.apache.karaf.cellar.core.event.Event;
+import org.apache.karaf.cellar.core.event.EventConsumer;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * Consumes messages from the distributed {@code ITopic} and calls the {@code EventDispatcher}.
+ */
+public class QueueConsumer<E extends Event> implements EventConsumer<E>, ItemListener<E> {
+
+    public static final String SWITCH_ID = "org.apache.karaf.cellar.queue.consumer";
+
+    private final Switch eventSwitch = new BasicSwitch(SWITCH_ID);
+
+    private HazelcastInstance instance;
+    private IQueue queue;
+    private Dispatcher dispatcher;
+    private Node node;
+
+    private QueueConsumeTask queueConsumeTask = new QueueConsumeTask(this);
+    private ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+
+    /**
+     * Initialization method.
+     */
+    public void init() {
+        if (queue != null) {
+            queue.addItemListener(this,true);
+        } else {
+            queue = instance.getQueue(Constants.QUEUE);
+            queue.addItemListener(this,true);
+        }
+
+    }
+
+    /**
+     * Destruction method.
+     */
+    public void destroy() {
+        if (queue != null) {
+            queue.removeItemListener(this);
+        }
+    }
+
+    /**
+     * Consumes an event form the topic.
+     *
+     * @param event
+     */
+    public void consume(E event) {
+        if (event != null  && (eventSwitch.getStatus().equals(SwitchStatus.ON) || event.getForce())) {
+                dispatcher.dispatch(event);
+        }
+    }
+
+    @Override
+    public void start()
+    {
+        queueConsumeTask.activate();
+        executorService.execute(queueConsumeTask);
+    }
+
+
+    @Override
+    public void stop() {
+      queueConsumeTask.deactivate();
+    }
+
+    @Override
+    public void itemAdded(E event) {
+
+    }
+
+    @Override
+    public void itemRemoved(E event) {
+
+    }
+
+    public QueueConsumeTask getQueueConsumeTask() {
+        return queueConsumeTask;
+    }
+
+    public void setQueueConsumeTask(QueueConsumeTask queueConsumeTask) {
+        this.queueConsumeTask = queueConsumeTask;
+    }
+
+    public Dispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setDispatcher(Dispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public HazelcastInstance getInstance() {
+        return instance;
+    }
+
+    public void setInstance(HazelcastInstance instance) {
+        this.instance = instance;
+    }
+
+    public IQueue<E> getQueue() {
+        return queue;
+    }
+
+    public void setQueue(IQueue<E> queue) {
+        this.queue = queue;
+    }
+
+    public Switch getSwitch() {
+        return eventSwitch;
+    }
+
+    public Node getNode() {
+        return node;
+    }
+
+    public void setNode(Node node) {
+        this.node = node;
+    }
+}

Added: karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueProducer.java
URL: http://svn.apache.org/viewvc/karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueProducer.java?rev=1162327&view=auto
==============================================================================
--- karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueProducer.java (added)
+++ karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueProducer.java Sat Aug 27 11:58:58 2011
@@ -0,0 +1,117 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.cellar.hazelcast;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IQueue;
+import org.apache.karaf.cellar.core.Node;
+import org.apache.karaf.cellar.core.command.Result;
+import org.apache.karaf.cellar.core.control.BasicSwitch;
+import org.apache.karaf.cellar.core.control.Switch;
+import org.apache.karaf.cellar.core.control.SwitchStatus;
+import org.apache.karaf.cellar.core.event.Event;
+import org.apache.karaf.cellar.core.event.EventProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Produces {@code Event}s into the distributed {@code ITopic}.
+ */
+public class QueueProducer<E extends Event> implements EventProducer<E> {
+
+    private static final transient Logger LOGGER = LoggerFactory.getLogger(QueueProducer.class);
+
+    public static final String SWITCH_ID = "org.apache.karaf.cellar.queue.producer";
+
+    private final Switch eventSwitch = new BasicSwitch(SWITCH_ID);
+
+    private HazelcastInstance instance;
+    private IQueue<E> queue;
+    private Node node;
+
+    /**
+     * Initialization method.
+     */
+    public void init() {
+        if (queue == null) {
+            queue = instance.getQueue(Constants.QUEUE);
+        }
+    }
+
+    /**
+     * Destruction method.
+     */
+    public void destroy() {
+    }
+
+    /**
+     * Propagates an event into the distributed {@code ITopic}.
+     *
+     * @param event
+     */
+    public void produce(E event) {
+        if (eventSwitch.getStatus().equals(SwitchStatus.ON) || event.getForce() || event instanceof Result) {
+            event.setSourceNode(node);
+            try {
+                queue.put(event);
+            } catch (InterruptedException e) {
+                LOGGER.error("Queue producer interrupted",e);
+            }
+        }
+    }
+
+    public Switch getSwitch() {
+        return eventSwitch;
+    }
+
+    public IQueue<E> getQueue() {
+        return queue;
+    }
+
+    public void setQueue(IQueue<E> queue) {
+        this.queue = queue;
+    }
+
+    public HazelcastInstance getInstance() {
+        return instance;
+    }
+
+    public void setInstance(HazelcastInstance instance) {
+        this.instance = instance;
+    }
+
+
+    public Node getNode() {
+        return node;
+    }
+
+    public void setNode(Node node) {
+        this.node = node;
+    }
+
+}