You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by io...@apache.org on 2011/08/27 13:58:58 UTC
svn commit: r1162327 - in
/karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast:
HazelcastEventTransportFactory.java QueueConsumeTask.java
QueueConsumer.java QueueProducer.java
Author: iocanel
Date: Sat Aug 27 11:58:58 2011
New Revision: 1162327
URL: http://svn.apache.org/viewvc?rev=1162327&view=rev
Log:
[KARAF-823] Added HazelcastEventTransportFactory, QueueConsumer, QueueProducer and QueueConsumeTask.
Added:
karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastEventTransportFactory.java
karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumeTask.java
karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java
karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueProducer.java
Added: karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastEventTransportFactory.java
URL: http://svn.apache.org/viewvc/karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastEventTransportFactory.java?rev=1162327&view=auto
==============================================================================
--- karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastEventTransportFactory.java (added)
+++ karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastEventTransportFactory.java Sat Aug 27 11:58:58 2011
@@ -0,0 +1,116 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.karaf.cellar.hazelcast;
+
+import com.hazelcast.core.IQueue;
+import com.hazelcast.core.ITopic;
+import org.apache.karaf.cellar.core.Dispatcher;
+import org.apache.karaf.cellar.core.event.EventConsumer;
+import org.apache.karaf.cellar.core.event.EventProducer;
+import org.apache.karaf.cellar.core.event.EventTransportFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * {@link EventTransportFactory} that hands out Hazelcast-backed event producers and consumers.
+ * <p>
+ * Transports are created lazily on the first request for a given name and cached, so later
+ * requests for the same name return the same object. A topic-based transport is used when
+ * {@code pubsub} is {@code true}; a queue-based transport is used otherwise.
+ * <p>
+ * The lookup methods are synchronized so that concurrent first requests for the same name
+ * cannot create two transports (for queue consumers each duplicate would start its own
+ * polling thread).
+ *
+ * @author iocanel
+ */
+public class HazelcastEventTransportFactory extends HazelcastInstanceAware implements EventTransportFactory {
+
+    private Dispatcher dispatcher;
+
+    private final Map<String, QueueConsumer> queueConsumerMap = new HashMap<String, QueueConsumer>();
+    private final Map<String, QueueProducer> queueProducerMap = new HashMap<String, QueueProducer>();
+
+    private final Map<String, TopicConsumer> topicConsumerMap = new HashMap<String, TopicConsumer>();
+    private final Map<String, TopicProducer> topicProducerMap = new HashMap<String, TopicProducer>();
+
+
+    /**
+     * Returns the cached producer for the given name, creating it on first use.
+     *
+     * @param name   logical name of the transport; it is appended to the topic/queue prefix.
+     * @param pubsub {@code true} for a topic producer; {@code false} or {@code null} for a queue producer.
+     * @return the producer bound to the matching Hazelcast topic or queue.
+     */
+    @Override
+    public synchronized EventProducer getEventProducer(String name, Boolean pubsub) {
+        // Boolean.TRUE.equals(...) avoids the unboxing NullPointerException a null flag would cause.
+        if (Boolean.TRUE.equals(pubsub)) {
+            return getTopicProducer(name);
+        }
+        return getQueueProducer(name);
+    }
+
+    /**
+     * Returns the cached consumer for the given name, creating it on first use.
+     * A newly created queue consumer is started before it is returned.
+     *
+     * @param name   logical name of the transport; it is appended to the topic/queue prefix.
+     * @param pubsub {@code true} for a topic consumer; {@code false} or {@code null} for a queue consumer.
+     * @return the consumer bound to the matching Hazelcast topic or queue.
+     */
+    @Override
+    public synchronized EventConsumer getEventConsumer(String name, Boolean pubsub) {
+        if (Boolean.TRUE.equals(pubsub)) {
+            return getTopicConsumer(name);
+        }
+        return getQueueConsumer(name);
+    }
+
+    /** Looks up or creates the topic producer for {@code name}; the caller holds the factory lock. */
+    private EventProducer getTopicProducer(String name) {
+        TopicProducer producer = topicProducerMap.get(name);
+        if (producer == null) {
+            ITopic topic = instance.getTopic(Constants.TOPIC + Constants.SEPARATOR + name);
+            producer = new TopicProducer();
+            producer.setTopic(topic);
+            producer.setNode(getNode());
+            producer.init();
+            topicProducerMap.put(name, producer);
+        }
+        return producer;
+    }
+
+    /** Looks up or creates the queue producer for {@code name}; the caller holds the factory lock. */
+    private EventProducer getQueueProducer(String name) {
+        QueueProducer producer = queueProducerMap.get(name);
+        if (producer == null) {
+            IQueue queue = instance.getQueue(Constants.QUEUE + Constants.SEPARATOR + name);
+            producer = new QueueProducer();
+            producer.setQueue(queue);
+            producer.setNode(getNode());
+            producer.init();
+            queueProducerMap.put(name, producer);
+        }
+        return producer;
+    }
+
+    /** Looks up or creates the topic consumer for {@code name}; the caller holds the factory lock. */
+    private EventConsumer getTopicConsumer(String name) {
+        TopicConsumer consumer = topicConsumerMap.get(name);
+        if (consumer == null) {
+            ITopic topic = instance.getTopic(Constants.TOPIC + Constants.SEPARATOR + name);
+            consumer = new TopicConsumer();
+            consumer.setTopic(topic);
+            consumer.setNode(getNode());
+            consumer.setDispatcher(dispatcher);
+            consumer.init();
+            topicConsumerMap.put(name, consumer);
+        }
+        return consumer;
+    }
+
+    /** Looks up or creates (and starts) the queue consumer for {@code name}; the caller holds the factory lock. */
+    private EventConsumer getQueueConsumer(String name) {
+        QueueConsumer consumer = queueConsumerMap.get(name);
+        if (consumer == null) {
+            IQueue queue = instance.getQueue(Constants.QUEUE + Constants.SEPARATOR + name);
+            consumer = new QueueConsumer();
+            consumer.setQueue(queue);
+            consumer.setNode(getNode());
+            consumer.setDispatcher(dispatcher);
+            consumer.init();
+            // Queue consumers poll on their own thread, so they must be started explicitly.
+            consumer.start();
+            queueConsumerMap.put(name, consumer);
+        }
+        return consumer;
+    }
+
+    public Dispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setDispatcher(Dispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+}
Added: karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumeTask.java
URL: http://svn.apache.org/viewvc/karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumeTask.java?rev=1162327&view=auto
==============================================================================
--- karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumeTask.java (added)
+++ karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumeTask.java Sat Aug 27 11:58:58 2011
@@ -0,0 +1,84 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.karaf.cellar.hazelcast;
+
+import org.apache.karaf.cellar.core.event.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author: iocanel
+ */
+public class QueueConsumeTask<E extends Event> implements Runnable {
+
+ private static final transient Logger LOGGER = LoggerFactory.getLogger(QueueConsumeTask.class);
+
+ private QueueConsumer<E> consumer;
+ private Boolean keepConsuming = Boolean.TRUE;
+
+ /**
+ * Constructor
+ *
+ * @param consumer
+ */
+ public QueueConsumeTask(QueueConsumer<E> consumer) {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void run() {
+ ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ while (keepConsuming) {
+ E e = null;
+ try {
+ if (consumer != null) {
+ e = consumer.getQueue().poll(10, TimeUnit.SECONDS);
+ }
+ } catch (InterruptedException e1) {
+ LOGGER.warn("Consume task interrupted");
+ }
+ if (e != null) {
+ consumer.consume(e);
+ }
+ }
+ } catch (Exception ex) {
+ LOGGER.error("Error while consuming from queue",ex);
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(originalClassLoader);
+ }
+ }
+
+ public void activate() {
+ keepConsuming = Boolean.TRUE;
+ }
+
+ public void deactivate() {
+ keepConsuming = Boolean.FALSE;
+ }
+
+ public QueueConsumer<E> getConsumer() {
+ return consumer;
+ }
+
+ public void setConsumer(QueueConsumer<E> consumer) {
+ this.consumer = consumer;
+ }
+
+}
Added: karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java
URL: http://svn.apache.org/viewvc/karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java?rev=1162327&view=auto
==============================================================================
--- karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java (added)
+++ karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java Sat Aug 27 11:58:58 2011
@@ -0,0 +1,150 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.cellar.hazelcast;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IQueue;
+import com.hazelcast.core.ITopic;
+import com.hazelcast.core.ItemListener;
+import com.hazelcast.core.MessageListener;
+import org.apache.karaf.cellar.core.Dispatcher;
+import org.apache.karaf.cellar.core.Node;
+import org.apache.karaf.cellar.core.control.BasicSwitch;
+import org.apache.karaf.cellar.core.control.Switch;
+import org.apache.karaf.cellar.core.control.SwitchStatus;
+import org.apache.karaf.cellar.core.event.Event;
+import org.apache.karaf.cellar.core.event.EventConsumer;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * Consumes events from the distributed {@code IQueue} and passes them to the {@code Dispatcher}.
+ * <p>
+ * Events are taken from the queue by a {@link QueueConsumeTask} that runs on a dedicated
+ * single-thread executor; {@link #start()} submits the task and {@link #stop()} asks it to exit.
+ */
+public class QueueConsumer<E extends Event> implements EventConsumer<E>, ItemListener<E> {
+
+    public static final String SWITCH_ID = "org.apache.karaf.cellar.queue.consumer";
+
+    private final Switch eventSwitch = new BasicSwitch(SWITCH_ID);
+
+    private HazelcastInstance instance;
+    private IQueue<E> queue;
+    private Dispatcher dispatcher;
+    private Node node;
+
+    private QueueConsumeTask queueConsumeTask = new QueueConsumeTask(this);
+    private ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+
+    /**
+     * Initialization method.
+     * Falls back to the default queue of the Hazelcast instance when no queue has been set,
+     * then registers this consumer as an item listener on the queue.
+     */
+    public void init() {
+        if (queue == null) {
+            queue = instance.getQueue(Constants.QUEUE);
+        }
+        // NOTE(review): itemAdded/itemRemoved are empty in this class and events are consumed by
+        // the polling task - confirm whether this listener registration is still needed.
+        queue.addItemListener(this, true);
+    }
+
+    /**
+     * Destruction method.
+     * Stops the consume task, shuts down the executor so its worker thread is released, and
+     * unregisters the item listener. The consumer cannot be started again afterwards.
+     */
+    public void destroy() {
+        // Stop polling first so the worker thread is released even if the listener removal fails.
+        stop();
+        // shutdownNow() interrupts a poll that is currently blocked, so the thread ends promptly.
+        executorService.shutdownNow();
+        if (queue != null) {
+            queue.removeItemListener(this);
+        }
+    }
+
+    /**
+     * Dispatches an event taken from the queue.
+     * The event is dispatched only when the consumer switch is ON or the event is forced;
+     * {@code null} events are ignored.
+     *
+     * @param event the event to dispatch.
+     */
+    public void consume(E event) {
+        if (event != null && (eventSwitch.getStatus().equals(SwitchStatus.ON) || event.getForce())) {
+            dispatcher.dispatch(event);
+        }
+    }
+
+    /**
+     * Activates the consume task and submits it to the executor.
+     */
+    @Override
+    public void start() {
+        queueConsumeTask.activate();
+        executorService.execute(queueConsumeTask);
+    }
+
+
+    /**
+     * Asks the consume task to stop polling.
+     */
+    @Override
+    public void stop() {
+        queueConsumeTask.deactivate();
+    }
+
+    /** Listener callback; intentionally empty. */
+    @Override
+    public void itemAdded(E event) {
+
+    }
+
+    /** Listener callback; intentionally empty. */
+    @Override
+    public void itemRemoved(E event) {
+
+    }
+
+    public QueueConsumeTask getQueueConsumeTask() {
+        return queueConsumeTask;
+    }
+
+    public void setQueueConsumeTask(QueueConsumeTask queueConsumeTask) {
+        this.queueConsumeTask = queueConsumeTask;
+    }
+
+    public Dispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setDispatcher(Dispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public HazelcastInstance getInstance() {
+        return instance;
+    }
+
+    public void setInstance(HazelcastInstance instance) {
+        this.instance = instance;
+    }
+
+    public IQueue<E> getQueue() {
+        return queue;
+    }
+
+    public void setQueue(IQueue<E> queue) {
+        this.queue = queue;
+    }
+
+    public Switch getSwitch() {
+        return eventSwitch;
+    }
+
+    public Node getNode() {
+        return node;
+    }
+
+    public void setNode(Node node) {
+        this.node = node;
+    }
+}
Added: karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueProducer.java
URL: http://svn.apache.org/viewvc/karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueProducer.java?rev=1162327&view=auto
==============================================================================
--- karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueProducer.java (added)
+++ karaf/cellar/trunk/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueProducer.java Sat Aug 27 11:58:58 2011
@@ -0,0 +1,117 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.cellar.hazelcast;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IQueue;
+import org.apache.karaf.cellar.core.Node;
+import org.apache.karaf.cellar.core.command.Result;
+import org.apache.karaf.cellar.core.control.BasicSwitch;
+import org.apache.karaf.cellar.core.control.Switch;
+import org.apache.karaf.cellar.core.control.SwitchStatus;
+import org.apache.karaf.cellar.core.event.Event;
+import org.apache.karaf.cellar.core.event.EventProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Produces {@code Event}s into the distributed {@code IQueue}.
+ */
+public class QueueProducer<E extends Event> implements EventProducer<E> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(QueueProducer.class);
+
+    public static final String SWITCH_ID = "org.apache.karaf.cellar.queue.producer";
+
+    private final Switch eventSwitch = new BasicSwitch(SWITCH_ID);
+
+    private HazelcastInstance instance;
+    private IQueue<E> queue;
+    private Node node;
+
+    /**
+     * Initialization method.
+     * Falls back to the default queue of the Hazelcast instance when no queue has been set.
+     */
+    public void init() {
+        if (queue == null) {
+            queue = instance.getQueue(Constants.QUEUE);
+        }
+    }
+
+    /**
+     * Destruction method.
+     */
+    public void destroy() {
+    }
+
+    /**
+     * Propagates an event into the distributed {@code IQueue}.
+     * The event is queued only when the producer switch is ON, the event is forced, or the
+     * event is a {@code Result}; its source node is set to this producer's node first.
+     * {@code null} events are ignored, matching {@code QueueConsumer.consume}.
+     *
+     * @param event the event to put on the queue.
+     */
+    public void produce(E event) {
+        if (event == null) {
+            return;
+        }
+        if (eventSwitch.getStatus().equals(SwitchStatus.ON) || event.getForce() || event instanceof Result) {
+            event.setSourceNode(node);
+            try {
+                queue.put(event);
+            } catch (InterruptedException e) {
+                LOGGER.error("Queue producer interrupted",e);
+                // Restore the interrupt flag so the calling thread can still observe the interruption.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    public Switch getSwitch() {
+        return eventSwitch;
+    }
+
+    public IQueue<E> getQueue() {
+        return queue;
+    }
+
+    public void setQueue(IQueue<E> queue) {
+        this.queue = queue;
+    }
+
+    public HazelcastInstance getInstance() {
+        return instance;
+    }
+
+    public void setInstance(HazelcastInstance instance) {
+        this.instance = instance;
+    }
+
+
+    public Node getNode() {
+        return node;
+    }
+
+    public void setNode(Node node) {
+        this.node = node;
+    }
+
+}