You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ch...@apache.org on 2011/02/22 09:56:49 UTC
svn commit: r1073259 - in
/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message:
processors/ processors/dlc/ store/
Author: charith
Date: Tue Feb 22 08:56:49 2011
New Revision: 1073259
URL: http://svn.apache.org/viewvc?rev=1073259&view=rev
Log:
adding Message store observer
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreObserver.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java?rev=1073259&r1=1073258&r2=1073259&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java Tue Feb 22 08:56:49 2011
@@ -21,6 +21,7 @@ package org.apache.synapse.message.proce
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SynapseException;
+import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.core.SynapseEnvironment;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
@@ -52,11 +53,6 @@ public abstract class ScheduledMessagePr
}
/**
- * message store parameters
- */
- protected Map<String, Object> parameters = null;
-
- /**
* The quartz configuration file if specified as a parameter
*/
protected String quartzConfig = null;
@@ -90,7 +86,8 @@ public abstract class ScheduledMessagePr
JobDetail jobDetail = getJobDetail();
JobDataMap jobDataMap = new JobDataMap();
- jobDataMap.put(MessageProcessorConsents.MESSAGE_STORE, messageStore);
+ jobDataMap.put(MessageProcessorConsents.MESSAGE_STORE,
+ configuration.getMessageStore(messageStore));
jobDataMap.put(MessageProcessorConsents.PARAMETERS, parameters);
jobDetail.setJobDataMap(jobDataMap);
@@ -169,6 +166,8 @@ public abstract class ScheduledMessagePr
scheduler.start();
state = State.INITIALIZED;
+
+ this.start();
} catch (SchedulerException e) {
throw new SynapseException("Error starting the scheduler", e);
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java?rev=1073259&r1=1073258&r2=1073259&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java Tue Feb 22 08:56:49 2011
@@ -24,6 +24,7 @@ import org.apache.synapse.FaultHandler;
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseArtifact;
+import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.mediators.MediatorFaultHandler;
import org.apache.synapse.message.processors.MessageProcessorConsents;
@@ -51,7 +52,6 @@ public class RedeliveryJob implements Jo
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
-
messageStore = (MessageStore) jdm.get(MessageProcessorConsents.MESSAGE_STORE);
lock = ((AbstractMessageStore) messageStore).getLock();
Map<String, Object> parameters = (Map<String, Object>) jdm.get(
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java?rev=1073259&r1=1073258&r2=1073259&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java Tue Feb 22 08:56:49 2011
@@ -23,6 +23,8 @@ import org.apache.synapse.commons.jmx.MB
import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.core.SynapseEnvironment;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -69,6 +71,11 @@ public abstract class AbstractMessageSto
*/
protected String fileName;
+ /**
+ * List that holds the MessageStore observers registered with the Message Store
+ */
+ protected List<MessageStoreObserver> messageStoreObservers =
+ new ArrayList<MessageStoreObserver>();
protected Lock lock = new ReentrantLock();
@@ -91,6 +98,37 @@ public abstract class AbstractMessageSto
}
+ public void registerObserver(MessageStoreObserver observer) {
+ if(observer != null && !messageStoreObservers.contains(observer)) {
+ messageStoreObservers.add(observer);
+ }
+ }
+
+ public void unregisterObserver(MessageStoreObserver observer) {
+ if(observer != null && messageStoreObservers.contains(observer)) {
+ messageStoreObservers.remove(observer);
+ }
+ }
+
+ /**
+ * Notify Message Addition to the observers
+ * @param messageId of the Message added.
+ */
+ protected void notifyMessageAddition(String messageId) {
+ for(MessageStoreObserver o : messageStoreObservers) {
+ o.messageAdded(messageId);
+ }
+ }
+
+ /**
+ * Notify Message removal to the observers
+ * @param messageId of the Message removed
+ */
+ protected void notifyMessageRemoval(String messageId) {
+ for(MessageStoreObserver o : messageStoreObservers) {
+ o.messageRemoved(messageId);
+ }
+ }
public int size() {
return -1;
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java?rev=1073259&r1=1073258&r2=1073259&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java Tue Feb 22 08:56:49 2011
@@ -45,7 +45,8 @@ public class InMemoryMessageStore extend
try {
if (messageContext != null) {
messageList.put(messageContext.getMessageID(), messageContext);
-
+ /** Notify observers */
+ notifyMessageAddition(messageContext.getMessageID());
if (log.isDebugEnabled()) {
log.debug("Message with id " + messageContext.getMessageID() +
" stored");
@@ -65,6 +66,8 @@ public class InMemoryMessageStore extend
context = peek();
if(context !=null) {
messageList.remove(context.getMessageID());
+ /** Notify observers */
+ notifyMessageRemoval(context.getMessageID());
}
} finally {
lock.unlock();
@@ -101,7 +104,11 @@ public class InMemoryMessageStore extend
lock.lock();
try {
if (messageID != null) {
- return messageList.remove(messageID);
+ if(messageList.remove(messageID) != null) {
+ /** Notify observers */
+ notifyMessageRemoval(messageID);
+ }
+
}
} finally {
lock.unlock();
@@ -115,6 +122,8 @@ public class InMemoryMessageStore extend
for (String k : messageList.keySet()) {
messageList.remove(k);
+ /** Notify observers */
+ notifyMessageRemoval(k);
}
} finally {
lock.unlock();
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java?rev=1073259&r1=1073258&r2=1073259&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java Tue Feb 22 08:56:49 2011
@@ -142,6 +142,16 @@ public interface MessageStore extends Sy
/**
- * Todo Add observer api
+ * Register a MessageStore observer instance with the MessageStore
+ * to receive events.
+ * @param observer instance to be registered
*/
+ public void registerObserver(MessageStoreObserver observer);
+
+ /**
+ * Unregister a MessageStoreObserver instance from the message store
+ * to stop receiving events
+ * @param observer instance to be unregistered
+ */
+ public void unregisterObserver(MessageStoreObserver observer);
}
\ No newline at end of file
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreObserver.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreObserver.java?rev=1073259&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreObserver.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreObserver.java Tue Feb 22 08:56:49 2011
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.message.store;
+
+
+/**
+ * An implementation of this interface can be registered with a Message store instance to receive
+ * Message store update events. Whenever a message is added/removed, the events defined in this
+ * interface will be fired.
+ */
+public interface MessageStoreObserver {
+
+ /**
+ * Method invoked when a message is added to the store
+ * @param messageId of the message that was added to the Message store
+ */
+ public void messageAdded(String messageId);
+
+ /**
+ * Method invoked when a message is removed from the store
+ * @param messageId of the Message that was removed
+ */
+ public void messageRemoved(String messageId);
+
+
+}