You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2011/07/03 17:51:46 UTC
svn commit: r1142453 [8/12] - in
/incubator/airavata/ws-messaging/trunk/messagebroker: ./ .settings/
customLibs/ customLibs/activeMQ/ src/ src/main/ src/main/java/
src/main/java/org/ src/main/java/org/apache/
src/main/java/org/apache/airavata/ src/main...
Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/Axis2Protocol.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/Axis2Protocol.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/Axis2Protocol.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/Axis2Protocol.java Sun Jul 3 15:51:36 2011
@@ -0,0 +1,155 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.protocol;
+
+import java.io.StringReader;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.commons.WsmgNameSpaceConstants;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.util.ElementHelper;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.SOAPHeaderBlock;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.client.ServiceClient;
+import org.apache.log4j.Logger;
+
+public class Axis2Protocol implements DeliveryProtocol {
+
+ Logger logger = Logger.getLogger(Axis2Protocol.class);
+
+ SOAPFactory soapfactory = OMAbstractFactory.getSOAP11Factory();
+
+ ServiceClient nonThreadLocalServiceClient = null;
+
+ long tcpConnectionTimeout = 0;
+
+ @Override
+ public void setTimeout(long timeout) {
+ this.tcpConnectionTimeout = timeout;
+ }
+
+ @Override
+ public void deliver(ConsumerInfo consumerInfo, OMElement message, AdditionalMessageContent additionalMessageContent)
+ throws SendingException {
+ EndpointReference consumerReference = new EndpointReference(consumerInfo.getConsumerEprStr());
+
+ /*
+ * Extract information
+ */
+ String actionString = null;
+ List<OMElement> soapHeaders = new LinkedList<OMElement>();
+ if (consumerInfo.getType().compareTo("wsnt") == 0) {
+ actionString = WsmgNameSpaceConstants.WSNT_NS.getNamespaceURI() + "/Notify";
+ } else { // wse
+ actionString = additionalMessageContent.getAction();
+ String topicElString = additionalMessageContent.getTopicElement();
+ if (topicElString != null) {
+ OMElement topicEl = null;
+ try {
+ topicEl = CommonRoutines.reader2OMElement(new StringReader(topicElString));
+ soapHeaders.add(topicEl);
+ } catch (XMLStreamException e) {
+ logger.fatal("exception at topicEl xmlStreamException", e);
+ }
+ }
+ }
+
+ try {
+
+ ServiceClient client = configureServiceClient(actionString, consumerReference,
+ additionalMessageContent.getMessageID(), soapHeaders);
+
+ client.sendRobust(message);
+ client.cleanupTransport();
+
+ } catch (AxisFault ex) {
+ throw new SendingException(ex.getCause());
+ }
+ }
+
+ private ServiceClient getServiceClient() throws AxisFault {
+
+ ServiceClient ret = nonThreadLocalServiceClient;
+ if (ret == null) {
+ ret = new ServiceClient();
+
+ nonThreadLocalServiceClient = ret;
+ }
+ ret.removeHeaders();
+ return ret;
+ }
+
+ private ServiceClient configureServiceClient(String action, EndpointReference consumerLocation, String msgId,
+ List<OMElement> soapHeaders) throws AxisFault {
+
+ // not engaging addressing modules
+
+ ServiceClient client = getServiceClient();
+
+ SOAPHeaderBlock msgIdEl = soapfactory.createSOAPHeaderBlock("MessageID", WsmgNameSpaceConstants.WSA_NS);
+ msgIdEl.setText(msgId);
+ SOAPHeaderBlock actionEl = soapfactory.createSOAPHeaderBlock("Action", WsmgNameSpaceConstants.WSA_NS);
+ actionEl.setText(action);
+
+ SOAPHeaderBlock to = soapfactory.createSOAPHeaderBlock("To", WsmgNameSpaceConstants.WSA_NS);
+ to.setText(consumerLocation.getAddress());
+
+ client.addHeader(actionEl);
+ client.addHeader(msgIdEl);
+ client.addHeader(to);
+
+ for (OMElement omHeader : soapHeaders) {
+
+ try {
+ SOAPHeaderBlock headerBlock = ElementHelper.toSOAPHeaderBlock(omHeader, soapfactory);
+
+ client.addHeader(headerBlock);
+ } catch (Exception e) {
+ throw AxisFault.makeFault(e);
+ }
+
+ }
+
+ Options opts = new Options();
+ opts.setTimeOutInMilliSeconds(tcpConnectionTimeout);
+ opts.setMessageId(msgId);
+ opts.setTo(consumerLocation);
+ opts.setAction(action);
+ opts.setProperty(org.apache.axis2.transport.http.HTTPConstants.CHUNKED, Boolean.FALSE);
+
+ opts.setProperty(org.apache.axis2.transport.http.HTTPConstants.HTTP_PROTOCOL_VERSION,
+ org.apache.axis2.transport.http.HTTPConstants.HEADER_PROTOCOL_10);
+ client.setOptions(opts);
+
+ return client;
+ }
+}
Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/DeliveryProtocol.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/DeliveryProtocol.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/DeliveryProtocol.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/DeliveryProtocol.java Sun Jul 3 15:51:36 2011
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.protocol;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.axiom.om.OMElement;
+
+/**
+ * Transport abstraction used to hand a single message to a single consumer.
+ */
+public interface DeliveryProtocol {
+
+    /**
+     * Delivers {@code message} to the consumer described by {@code consumerInfo}.
+     *
+     * @param consumerInfo
+     *            the consumer that should receive the message
+     * @param message
+     *            the payload to deliver
+     * @param additionalMessageContent
+     *            further per-message data supplied alongside the payload
+     * @throws SendingException
+     *             if the message could not be delivered
+     */
+    void deliver(ConsumerInfo consumerInfo, OMElement message, AdditionalMessageContent additionalMessageContent)
+            throws SendingException;
+
+    /**
+     * Sets the timeout used for subsequent deliveries.
+     *
+     * @param timeout
+     *            the timeout value; the unit is not fixed by this interface ({@code Axis2Protocol} passes it to
+     *            Axis2 as milliseconds)
+     */
+    void setTimeout(long timeout);
+}
Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/SendingException.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/SendingException.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/SendingException.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/SendingException.java Sun Jul 3 15:51:36 2011
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.protocol;
+
+import org.apache.axis2.AxisFault;
+
+/**
+ * Fault thrown by a {@link DeliveryProtocol} when a message could not be delivered. It adds nothing to
+ * {@link AxisFault} beyond its own type and simply wraps the underlying cause.
+ */
+public class SendingException extends AxisFault {
+
+    /** Serialization version identifier. */
+    private static final long serialVersionUID = 6250791562500752579L;
+
+    /**
+     * Creates the exception around the failure that prevented delivery.
+     *
+     * @param cause
+     *            the underlying failure, forwarded unchanged to {@link AxisFault#AxisFault(Throwable)}
+     */
+    public SendingException(Throwable cause) {
+        super(cause);
+    }
+
+}
Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java Sun Jul 3 15:51:36 2011
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy;
+
+/**
+ * Life-cycle contract of a message sending strategy. The implementations added in this commit extend
+ * {@link Thread}, so their {@code start()} is the one inherited from {@code Thread}.
+ */
+public interface SendingStrategy {
+
+    /** Starts the strategy. */
+    void start();
+
+    /** Asks the strategy to stop. */
+    void shutdown();
+}
Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java Sun Jul 3 15:51:36 2011
@@ -0,0 +1,241 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import java.io.StringReader;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.config.ConfigurationManager;
+import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
+import org.apache.airavata.wsmg.messenger.SenderUtils;
+import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
+import org.apache.axiom.om.OMElement;
+import org.apache.log4j.Logger;
+
+/**
+ * Sending strategy that dequeues outgoing messages on its own thread and hands them to per-consumer handlers, which
+ * are executed by a {@code ThreadCrew} whose size comes from the configuration.
+ *
+ * <p>
+ * One {@link ConsumerHandler} is created per consumer URL the first time a message for that URL is seen, and it is
+ * submitted to the crew exactly once. NOTE(review): {@link ConsumerHandler#run()} processes a single batch and
+ * returns, so later messages are only delivered if the crew invokes submitted handlers repeatedly - confirm against
+ * {@code ThreadCrew}.
+ */
+public class FixedParallelSender extends Thread implements SendingStrategy {
+
+    private Logger log = Logger.getLogger(FixedParallelSender.class);
+
+    /** Handlers keyed by consumer URL; entries are never removed by this class. */
+    private ConcurrentHashMap<String, ConsumerHandler> activeConsumerHanders = new ConcurrentHashMap<String, ConsumerHandler>();
+
+    private ThreadCrew threadCrew = null;
+
+    private ConsumerUrlManager urlManager = null;
+    private ConfigurationManager configManager = null;
+
+    /** Source of handler ids; only touched from the dispatching thread. */
+    private long consumerHandlerIdCounter;
+
+    /** Written by {@link #shutdown()} and read by {@link #run()} on another thread, hence volatile. */
+    private volatile boolean stopFlag = false;
+
+    /**
+     * @param config
+     *            configuration from which the sending pool size (and, per handler, the batch size) is read
+     * @param urlMan
+     *            URL manager passed on to every {@code SenderUtils} created by the handlers
+     */
+    public FixedParallelSender(ConfigurationManager config, ConsumerUrlManager urlMan) {
+
+        int poolSize = config.getConfig(WsmgCommonConstants.CONFIG_SENDING_THREAD_POOL_SIZE,
+                WsmgCommonConstants.DEFAULT_SENDING_THREAD_POOL_SIZE);
+
+        threadCrew = new ThreadCrew(poolSize);
+        urlManager = urlMan;
+        configManager = config;
+    }
+
+    /**
+     * Requests termination of the dispatch loop. The flag is only examined between two dequeue operations.
+     * NOTE(review): if {@code blockingDequeue()} blocks, as its name suggests, the loop ends only after the next
+     * message arrives.
+     */
+    @Override
+    public void shutdown() {
+        stopFlag = true;
+    }
+
+    /**
+     * Dispatch loop: takes messages from {@code WSMGParameter.OUT_GOING_QUEUE} and distributes each one over the
+     * consumer handlers until {@link #shutdown()} is called, then stops the thread crew. Any exception raised while
+     * handling one message is logged and the loop continues.
+     */
+    @Override
+    public void run() {
+
+        while (!stopFlag) {
+
+            try {
+
+                if (log.isDebugEnabled()) {
+                    log.debug("before dequeue - delivery thread");
+                }
+
+                OutGoingMessage outGoingMessage = (OutGoingMessage) WSMGParameter.OUT_GOING_QUEUE.blockingDequeue();
+
+                if (WSMGParameter.showTrackId) {
+                    log.debug(outGoingMessage.getAdditionalMessageContent().getTrackId()
+                            + ": dequeued from outgoing queue");
+                }
+
+                distributeOverConsumerQueues(outGoingMessage);
+
+            } catch (Exception e) {
+
+                log.fatal("Unexpected_exception:", e);
+            }
+        }
+
+        threadCrew.stop();
+
+    }
+
+    /**
+     * Queues {@code message} once for every consumer in its consumer list.
+     *
+     * @param message
+     *            the outgoing message together with its target consumers
+     */
+    public void distributeOverConsumerQueues(OutGoingMessage message) {
+        List<ConsumerInfo> consumerInfoList = message.getConsumerInfoList();
+
+        for (ConsumerInfo consumer : consumerInfoList) {
+            sendToConsumerHandler(consumer, message);
+        }
+
+    }
+
+    /**
+     * Queues the message on the handler registered for the consumer's URL, creating, registering and submitting a
+     * new handler when none exists yet.
+     *
+     * @return the handler that received the message
+     */
+    private ConsumerHandler sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message) {
+
+        String consumerUrl = consumer.getConsumerEprStr();
+
+        LightweightMsg lwm = new LightweightMsg(consumer, message.getTextMessage(),
+                message.getAdditionalMessageContent());
+
+        ConsumerHandler handler = activeConsumerHanders.get(consumerUrl);
+
+        if (handler == null) {
+            handler = new ConsumerHandler(getNextConsumerHandlerId(), consumerUrl, configManager, urlManager);
+            activeConsumerHanders.put(consumerUrl, handler);
+            // Important: queue the message before the handler is handed to the crew, so that the first run
+            // cannot find an empty queue (removes a possible race condition).
+            handler.submitMessage(lwm);
+            threadCrew.submitTask(handler);
+        } else {
+            handler.submitMessage(lwm);
+        }
+
+        return handler;
+
+    }
+
+    private long getNextConsumerHandlerId() {
+        return ++consumerHandlerIdCounter;
+    }
+
+    /**
+     * Task that delivers the queued messages of one consumer URL, at most {@code batchSize} per invocation of
+     * {@link #run()}.
+     */
+    class ConsumerHandler implements RunnableEx {
+
+        LinkedBlockingQueue<LightweightMsg> queue = new LinkedBlockingQueue<LightweightMsg>();
+
+        final long id;
+        int batchSize;
+
+        /** Every thread that runs this handler gets its own {@link SenderUtils} (see {@link #getSender()}). */
+        ThreadLocal<SenderUtils> threadlocalSender = new ThreadLocal<SenderUtils>();
+
+        String consumerUrl;
+
+        ConfigurationManager configMan;
+        ConsumerUrlManager consumerURLManager;
+
+        /**
+         * @param handlerId
+         *            identifier of this handler
+         * @param url
+         *            consumer URL this handler serves
+         * @param config
+         *            configuration providing the batch size; also passed to every {@code SenderUtils}
+         * @param urlMan
+         *            URL manager passed to every {@code SenderUtils}
+         */
+        public ConsumerHandler(long handlerId, String url, ConfigurationManager config, ConsumerUrlManager urlMan) {
+
+            configMan = config;
+            consumerURLManager = urlMan;
+            id = handlerId;
+            consumerUrl = url;
+
+            batchSize = config.getConfig(WsmgCommonConstants.CONFIG_SENDING_BATCH_SIZE,
+                    WsmgCommonConstants.DEFAULT_SENDING_BATCH_SIZE);
+        }
+
+        public long getId() {
+            return id;
+        }
+
+        public String getConsumerUrl() {
+            return consumerUrl;
+        }
+
+        /** Returns the calling thread's {@link SenderUtils}, creating it on first use. */
+        private SenderUtils getSender() {
+
+            SenderUtils s = threadlocalSender.get();
+
+            if (s == null) {
+                s = new SenderUtils(consumerURLManager, configMan);
+                threadlocalSender.set(s);
+            }
+
+            return s;
+        }
+
+        /** Two handlers are equal when both their id and their consumer URL match. */
+        @Override
+        public boolean equals(Object o) {
+
+            if (o instanceof ConsumerHandler) {
+                ConsumerHandler h = (ConsumerHandler) o;
+                return h.getId() == id && h.getConsumerUrl().equals(this.getConsumerUrl());
+            }
+
+            return false;
+        }
+
+        /** Hash code over the same fields as {@link #equals(Object)}, keeping the two methods consistent. */
+        @Override
+        public int hashCode() {
+            int result = (int) (id ^ (id >>> 32));
+            result = 31 * result + (consumerUrl == null ? 0 : consumerUrl.hashCode());
+            return result;
+        }
+
+        /** Appends {@code msg} to this handler's queue. */
+        public void submitMessage(LightweightMsg msg) {
+            queue.add(msg);
+        }
+
+        /** Drains up to {@code batchSize} queued messages and sends them; returns without waiting for more. */
+        public void run() {
+
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("starting consumer handler: id :%d, url : %s", getId(), getConsumerUrl()));
+            }
+
+            LinkedList<LightweightMsg> localList = new LinkedList<LightweightMsg>();
+
+            queue.drainTo(localList, batchSize);
+
+            send(localList);
+            localList.clear();
+
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("calling on completion from : %d,", getId()));
+            }
+
+        }
+
+        /**
+         * Sends every message in {@code list} in order, emptying the list. A failure for one message is logged and
+         * does not stop the remaining messages from being sent.
+         */
+        private void send(LinkedList<LightweightMsg> list) {
+
+            SenderUtils s = getSender();
+
+            while (!list.isEmpty()) {
+
+                LightweightMsg m = list.removeFirst();
+
+                try {
+                    OMElement messgae2Send = CommonRoutines.reader2OMElement(new StringReader(m.getPayLoad()));
+
+                    s.send(m.getConsumerInfo(), messgae2Send, m.getHeader());
+
+                } catch (Exception e) {
+                    // Pass the throwable separately so that log4j records the stack trace as well.
+                    log.fatal("failed to send message to consumer: " + getConsumerUrl(), e);
+                }
+
+            }
+
+        }
+    }
+
+}
Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/LightweightMsg.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/LightweightMsg.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/LightweightMsg.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/LightweightMsg.java Sun Jul 3 15:51:36 2011
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+
+/**
+ * Immutable holder bundling what is needed to send one message to one consumer: the consumer, the textual payload
+ * and the additional message content ("header").
+ */
+class LightweightMsg {
+
+    private final ConsumerInfo consumerInfo;
+    private final String payload;
+    private final AdditionalMessageContent header;
+
+    /**
+     * @param c
+     *            the consumer the message is addressed to
+     * @param pld
+     *            the message payload as text
+     * @param h
+     *            the additional message content that accompanies the payload
+     */
+    public LightweightMsg(ConsumerInfo c, String pld, AdditionalMessageContent h) {
+        this.consumerInfo = c;
+        this.payload = pld;
+        this.header = h;
+    }
+
+    /** @return the message payload as text */
+    public String getPayLoad() {
+        return this.payload;
+    }
+
+    /** @return the consumer the message is addressed to */
+    public ConsumerInfo getConsumerInfo() {
+        return this.consumerInfo;
+    }
+
+    /** @return the additional message content that accompanies the payload */
+    public AdditionalMessageContent getHeader() {
+        return this.header;
+    }
+
+    /** Renders the header, the consumer endpoint string and the payload, in that order. */
+    @Override
+    public String toString() {
+        final String consumerEpr = consumerInfo.getConsumerEprStr();
+        return String.format("header: %s, consumer: %s, pld: %s", header, consumerEpr, payload);
+    }
+
+}
Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java Sun Jul 3 15:51:36 2011
@@ -0,0 +1,300 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import java.io.StringReader;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.config.ConfigurationManager;
+import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
+import org.apache.airavata.wsmg.messenger.SenderUtils;
+import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
+import org.apache.axiom.om.OMElement;
+import org.apache.log4j.Logger;
+
+public class ParallelSender extends Thread implements SendingStrategy {
+
+ private Logger log = Logger.getLogger(ParallelSender.class);
+
+ private ConcurrentHashMap<String, ConsumerHandler> activeConsumerHanders = new ConcurrentHashMap<String, ConsumerHandler>();
+
+ private final ExecutorService threadPool;
+ private long consumerHandlerIdCounter = 0L;
+ private boolean stopFlag = false;
+
+ private ConsumerUrlManager urlManager = null;
+ private ConfigurationManager configManager = null;
+
+ private ConsumerHandlerCompletionCallback consumerCallback = new ConsumerHandlerCompletionCallback() {
+
+ public void onCompletion(ConsumerHandler h) {
+
+ if (!activeConsumerHanders.remove(h.getConsumerUrl(), h)) {
+
+ if (log.isDebugEnabled())
+ log.debug(String.format("inactive consumer handler " + "is already removed: id %d, url : %s",
+ h.getId(), h.getConsumerUrl()));
+ }
+
+ }
+ };
+
+ public ParallelSender(ConfigurationManager config, ConsumerUrlManager urlMan) {
+ urlManager = urlMan;
+ configManager = config;
+
+ threadPool = Executors.newCachedThreadPool();
+
+ }
+
+ public void shutdown() {
+ stopFlag = true;
+ }
+
+ public void run() {
+ int dequeuedMessageCounter = 0;
+
+ while (!stopFlag) {
+
+ try {
+
+ if (log.isDebugEnabled())
+ log.debug("before dequeue - delivery thread");
+
+ OutGoingMessage outGoingMessage = (OutGoingMessage) WSMGParameter.OUT_GOING_QUEUE.blockingDequeue();
+
+ if (WSMGParameter.showTrackId)
+ log.debug(outGoingMessage.getAdditionalMessageContent().getTrackId()
+ + ": dequeued from outgoing queue");
+
+ distributeOverConsumerQueues(outGoingMessage);
+
+ } catch (Exception e) {
+
+ log.fatal("Unexpected_exception:", e);
+ }
+
+ dequeuedMessageCounter++;
+ }
+
+ threadPool.shutdown();
+
+ }
+
+ public void distributeOverConsumerQueues(OutGoingMessage message) {
+ List<ConsumerInfo> consumerInfoList = message.getConsumerInfoList();
+
+ for (ConsumerInfo consumer : consumerInfoList) {
+
+ sendToConsumerHandler(consumer, message);
+
+ }
+
+ }
+
+ private ConsumerHandler sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message) {
+
+ String consumerUrl = consumer.getConsumerEprStr();
+
+ LightweightMsg lwm = new LightweightMsg(consumer, message.getTextMessage(),
+ message.getAdditionalMessageContent());
+
+ ConsumerHandler handler = activeConsumerHanders.get(consumerUrl);
+
+ if (handler == null || (!handler.isActive())) {
+ handler = new ConsumerHandler(getNextConsumerHandlerId(), consumerUrl, consumerCallback, configManager,
+ urlManager);
+ activeConsumerHanders.put(consumerUrl, handler);
+ handler.submitMessage(lwm); // import to submit before execute.
+ // (to remove a possible race
+ // condition)
+ threadPool.execute(handler);
+ } else {
+ handler.submitMessage(lwm);
+ }
+
+ return handler;
+ }
+
+ private long getNextConsumerHandlerId() {
+ return ++consumerHandlerIdCounter;
+ }
+
+ interface ConsumerHandlerCompletionCallback {
+
+ public void onCompletion(ConsumerHandler h);
+
+ }
+
+ class ConsumerHandler implements Runnable {
+
+ LinkedBlockingQueue<LightweightMsg> queue = new LinkedBlockingQueue<LightweightMsg>();
+
+ ReadWriteLock activeLock = new ReentrantReadWriteLock();
+
+ final long id;
+ final int MAX_UNSUCCESSFULL_DRAINS = 3;
+ final int SLEEP_TIME_SECONDS = 1;
+ int numberOfUnsuccessfullDrainAttempts = 0;
+
+ boolean active = true;
+
+ ConsumerHandlerCompletionCallback callback = null;
+ SenderUtils sender = null;
+ String consumerUrl;
+
+ public ConsumerHandler(long handlerId, String url, ConsumerHandlerCompletionCallback c,
+ ConfigurationManager config, ConsumerUrlManager urlMan) {
+ callback = c;
+ sender = new SenderUtils(urlMan, config);
+ id = handlerId;
+ consumerUrl = url;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public String getConsumerUrl() {
+ return consumerUrl;
+ }
+
+ public boolean isActive() {
+
+ boolean ret = false;
+
+ activeLock.readLock().lock();
+ try {
+ ret = active;
+ } finally {
+ activeLock.readLock().unlock();
+ }
+
+ return ret;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+
+ if (o instanceof ConsumerHandler) {
+ ConsumerHandler h = (ConsumerHandler) o;
+ return h.getId() == id && h.getConsumerUrl().equals(this.getConsumerUrl());
+ }
+
+ return false;
+ }
+
+ public void submitMessage(LightweightMsg msg) {
+ queue.add(msg);
+ }
+
+ public void run() {
+
+ if (log.isDebugEnabled())
+ log.debug(String.format("starting consumer handler: id :%d, url : %s", getId(), getConsumerUrl()));
+
+ LinkedList<LightweightMsg> localList = new LinkedList<LightweightMsg>();
+
+ while (active) {
+
+ int drainedMsgs = 0;
+ try {
+ activeLock.writeLock().lock();
+
+ drainedMsgs = queue.drainTo(localList);
+
+ if (drainedMsgs <= 0) {
+ numberOfUnsuccessfullDrainAttempts++;
+ } else {
+ numberOfUnsuccessfullDrainAttempts = 0;
+ }
+
+ if (numberOfUnsuccessfullDrainAttempts >= MAX_UNSUCCESSFULL_DRAINS) {
+
+ log.debug(String.format("inactivating, %d", getId()));
+
+ active = false;
+ numberOfUnsuccessfullDrainAttempts = 0;
+ }
+
+ } finally {
+ activeLock.writeLock().unlock();
+ }
+
+ send(localList);
+ localList.clear();
+
+ if (numberOfUnsuccessfullDrainAttempts > 0) {
+ waitForMessages();
+ }
+
+ }
+
+ if (log.isDebugEnabled())
+ log.debug(String.format("calling on completion from : %d,", getId()));
+
+ callback.onCompletion(this);
+
+ }
+
+ private void send(LinkedList<LightweightMsg> list) {
+
+ while (!list.isEmpty()) {
+
+ LightweightMsg m = list.removeFirst();
+
+ try {
+ OMElement messgae2Send = CommonRoutines.reader2OMElement(new StringReader(m.getPayLoad()));
+
+ sender.send(m.getConsumerInfo(), messgae2Send, m.getHeader());
+
+ } catch (Exception e) {
+ log.fatal(e);
+ }
+
+ }
+
+ }
+
+ private void waitForMessages() {
+ try {
+
+ TimeUnit.SECONDS.sleep(SLEEP_TIME_SECONDS);
+ log.debug("finished - waiting for messages");
+ } catch (InterruptedException e) {
+ log.error("interrupted while waiting for messages", e);
+ }
+ }
+ }
+}
Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java Sun Jul 3 15:51:36 2011
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import java.io.StringReader;
+import java.util.List;
+
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.config.ConfigurationManager;
+import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.apache.airavata.wsmg.messenger.ConsumerUrlManager;
+import org.apache.airavata.wsmg.messenger.SenderUtils;
+import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
+import org.apache.axiom.om.OMElement;
+import org.apache.log4j.Logger;
+
+ /**
+  * Sending strategy that delivers outgoing messages one at a time on a single thread.
+  *
+  * The thread repeatedly takes a message from WSMGParameter.OUT_GOING_QUEUE and sends it to every
+  * consumer attached to that message, until shutdown() is called.
+  */
+ public class SerialSender extends Thread implements SendingStrategy {
+
+     Logger log = Logger.getLogger(SerialSender.class);
+
+     // volatile: written by the thread calling shutdown(), read by the delivery thread.
+     private volatile boolean stopFlag = false;
+
+     SenderUtils sender;
+
+     /**
+      * @param config
+      *            configuration handed to the underlying SenderUtils
+      * @param urlman
+      *            consumer URL manager handed to the underlying SenderUtils
+      */
+     public SerialSender(ConfigurationManager config, ConsumerUrlManager urlman) {
+         sender = new SenderUtils(urlman, config);
+     }
+
+     /**
+      * Asks the delivery loop to stop. The flag is only examined between dequeue operations.
+      * NOTE(review): a thread blocked in blockingDequeue() presumably notices the request only
+      * after the next message arrives - confirm against the queue implementation.
+      */
+     public void shutdown() {
+         stopFlag = true;
+         log.info("delivery thread termination notificaiton recieved");
+     }
+
+     /**
+      * Delivery loop: dequeue a message, deliver it, repeat until shutdown() has been requested.
+      * Any exception raised while handling one message is logged and the loop continues.
+      */
+     public void run() {
+
+         log.debug("run - delivery thread");
+
+         while (!stopFlag) {
+
+             try {
+
+                 OutGoingMessage outGoingMessage = (OutGoingMessage) WSMGParameter.OUT_GOING_QUEUE.blockingDequeue();
+
+                 // Guard the diagnostic so that a null message (or null header) cannot raise an
+                 // exception here and prevent sendNotification() from being reached.
+                 if (WSMGParameter.showTrackId && outGoingMessage != null
+                         && outGoingMessage.getAdditionalMessageContent() != null) {
+                     log.debug(outGoingMessage.getAdditionalMessageContent().getTrackId()
+                             + ": dequeued from outgoing queue");
+                 }
+
+                 sendNotification(outGoingMessage);
+
+             } catch (Exception e) {
+
+                 log.fatal("Unexpected_exception:", e);
+             }
+         }
+     }
+
+     /**
+      * Delivers one outgoing message to all of its consumers.
+      *
+      * @param outGoingMessage
+      *            message to deliver; a null message, or a message without consumers, is logged
+      *            and ignored
+      */
+     public synchronized void sendNotification(OutGoingMessage outGoingMessage) {
+
+         if (outGoingMessage == null) {
+             log.error("got a null outgoing message");
+             return;
+         }
+
+         List<ConsumerInfo> consumerInfoList = outGoingMessage.getConsumerInfoList();
+         if (consumerInfoList == null || consumerInfoList.isEmpty()) {
+             // Nothing to deliver to; also avoids a NullPointerException in the delivery loop.
+             log.debug("outgoing message has no consumers, nothing to deliver");
+             return;
+         }
+
+         String messageString = outGoingMessage.getTextMessage();
+         AdditionalMessageContent soapHeader = outGoingMessage.getAdditionalMessageContent();
+         deliverMessage(consumerInfoList, messageString, soapHeader);
+     }
+
+     /**
+      * Parses the message text once and sends the resulting element to each consumer in turn.
+      * A parse failure is logged and nothing is sent.
+      */
+     private void deliverMessage(List<ConsumerInfo> consumerInfoList, String messageString,
+             AdditionalMessageContent additionalMessageContent) {
+
+         try {
+             OMElement messgae2Send = CommonRoutines.reader2OMElement(new StringReader(messageString));
+
+             for (ConsumerInfo obj : consumerInfoList) {
+
+                 sender.send(obj, messgae2Send, additionalMessageContent);
+
+             }
+
+         } catch (XMLStreamException e) {
+             // Use the (message, throwable) overload so the stack trace is recorded.
+             log.fatal("failed to parse outgoing message", e);
+         }
+
+     }
+ }
Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ThreadCrew.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ThreadCrew.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ThreadCrew.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ThreadCrew.java Sun Jul 3 15:51:36 2011
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+ /**
+  * Callback through which a ThreadCrew worker reports that its run() method has ended.
+  */
+ interface TaskCompletionCallback {
+
+     /**
+      * @param r
+      *            the worker whose run() method has ended
+      */
+     void onCompletion(ThreadCrew.WSMGRunnable r);
+ }
+
+ /**
+  * A unit of work that can be handed to ThreadCrew.submitTask(RunnableEx).
+  */
+ interface RunnableEx {
+
+     /** Performs one pass of the work. */
+     void run();
+ }
+
+ /**
+  * A fixed-size crew of worker threads. Each worker owns a list of jobs and keeps running them
+  * round-robin until the crew is stopped; new jobs are spread over the workers in rotation.
+  *
+  * When a worker ends unexpectedly (a job threw), it is replaced by a fresh worker that takes
+  * over the same job list. Workers that end because stop() was called are not replaced.
+  */
+ public class ThreadCrew implements TaskCompletionCallback {
+
+     int nextIndex = 0;
+
+     Logger log = Logger.getLogger(ThreadCrew.class);
+
+     // Guarded by synchronized (wsmgRunnables).
+     List<WSMGRunnable> wsmgRunnables = new ArrayList<WSMGRunnable>();
+
+     ExecutorService service;
+
+     // Set once by stop(); tells onCompletion() that finishing workers must not be replaced.
+     private volatile boolean stopped = false;
+
+     /**
+      * Called by a worker when its run() method ends. Replaces the worker unless the crew has
+      * been stopped.
+      *
+      * @param r
+      *            the worker that has ended
+      */
+     public void onCompletion(WSMGRunnable r) {
+
+         synchronized (wsmgRunnables) {
+
+             // After stop() the executor rejects new tasks, so resubmitting would throw
+             // RejectedExecutionException out of the worker's finally block.
+             if (stopped || service.isShutdown()) {
+                 log.debug("worker finished after shutdown, not restarting");
+                 return;
+             }
+
+             log.error("error occued ...");
+             List<RunnableEx> jobList = r.getJobList();
+             wsmgRunnables.remove(r);
+             WSMGRunnable newR = new WSMGRunnable(this, jobList);
+             wsmgRunnables.add(newR);
+             service.submit(newR);
+         }
+
+     }
+
+     /**
+      * Creates the pool and starts one worker per pool thread.
+      *
+      * @param poolSize
+      *            number of worker threads
+      */
+     public ThreadCrew(int poolSize) {
+
+         log.debug("pool size=" + poolSize);
+
+         service = Executors.newFixedThreadPool(poolSize);
+
+         synchronized (wsmgRunnables) {
+             for (int i = 0; i < poolSize; i++) {
+
+                 WSMGRunnable r = new WSMGRunnable(this);
+                 wsmgRunnables.add(r);
+                 service.submit(r);
+             }
+         }
+
+     }
+
+     /**
+      * Hands the task to the next worker in rotation. The worker will call the task's run()
+      * method repeatedly, interleaved with its other jobs.
+      *
+      * @param task
+      *            job to add
+      */
+     public void submitTask(RunnableEx task) {
+
+         WSMGRunnable r = null;
+
+         synchronized (wsmgRunnables) {
+
+             nextIndex++;
+             if (nextIndex >= wsmgRunnables.size())
+                 nextIndex = 0;
+
+             r = wsmgRunnables.get(nextIndex);
+
+         }
+
+         r.submit(task);
+     }
+
+     /**
+      * Asks every worker to finish and shuts the executor down. Done under the same lock as
+      * onCompletion() so a finishing worker can neither modify the list while it is iterated
+      * nor resubmit to an executor that is being shut down.
+      */
+     public void stop() {
+
+         synchronized (wsmgRunnables) {
+
+             stopped = true;
+
+             for (WSMGRunnable r : wsmgRunnables) {
+                 r.shutdown();
+             }
+
+             service.shutdown();
+         }
+     }
+
+     /**
+      * Worker that cycles through its job list until shutdown() is called or the thread is
+      * interrupted, then notifies the completion callback.
+      */
+     class WSMGRunnable implements Runnable {
+
+         // Guarded by synchronized (jobs); final so the lock object never changes.
+         final List<RunnableEx> jobs;
+         TaskCompletionCallback completionCallback;
+         // volatile: written by the thread calling shutdown(), read by the worker thread.
+         volatile boolean runFlag = true;
+
+         public WSMGRunnable(TaskCompletionCallback c) {
+
+             this(c, new ArrayList<RunnableEx>());
+             log.debug("org.apache.airavata.wsmg created new");
+         }
+
+         public WSMGRunnable(TaskCompletionCallback c, List<RunnableEx> jl) {
+             completionCallback = c;
+             jobs = jl;
+         }
+
+         public void shutdown() {
+             runFlag = false;
+         }
+
+         public void run() {
+
+             try {
+
+                 int i = 0;
+
+                 while (runFlag && !Thread.currentThread().isInterrupted()) {
+
+                     RunnableEx runnable = null;
+
+                     // Pick the next job under the lock used by submit(); the job itself runs
+                     // outside the lock so that submit() is never blocked by a long job.
+                     synchronized (jobs) {
+                         if (!jobs.isEmpty()) {
+                             if (i >= jobs.size()) {
+                                 i = 0;
+                             }
+                             runnable = jobs.get(i);
+                             i++;
+                         }
+                     }
+
+                     if (runnable == null) {
+                         waitForJobs();
+                         continue;
+                     }
+
+                     runnable.run();
+                 }
+
+             } finally {
+
+                 completionCallback.onCompletion(this);
+             }
+
+         }
+
+         public void submit(RunnableEx w) {
+
+             synchronized (jobs) {
+                 jobs.add(w);
+             }
+
+         }
+
+         public List<RunnableEx> getJobList() {
+             return jobs;
+         }
+
+         final int SLEEP_TIME_SECONDS = 1;
+
+         private void waitForJobs() {
+             try {
+
+                 TimeUnit.SECONDS.sleep(SLEEP_TIME_SECONDS);
+                 if (log.isDebugEnabled())
+                     log.debug("finished - waiting for messages");
+             } catch (InterruptedException e) {
+                 log.error("interrupted while waiting for messages", e);
+                 // Restore the interrupt status so the run loop sees it and exits.
+                 Thread.currentThread().interrupt();
+             }
+         }
+     }
+
+ }
Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/OutGoingMessage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/OutGoingMessage.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/OutGoingMessage.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/OutGoingMessage.java Sun Jul 3 15:51:36 2011
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.processors;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+
+ /**
+  * Serializable holder for a message that is ready to be sent: the message text, its additional
+  * (header) content and the consumers it is addressed to.
+  */
+ public class OutGoingMessage implements Serializable {
+
+     private static final long serialVersionUID = -6999667921413261492L;
+
+     String textMessage;
+
+     AdditionalMessageContent additionalMessageContent;
+
+     List<ConsumerInfo> consumerInfoList = null;
+
+     /**
+      * Creates an empty message; every field keeps its default value.
+      */
+     public OutGoingMessage() {
+     }
+
+     /**
+      * Creates a fully populated message.
+      *
+      * @param textMessage
+      *            the message text
+      * @param additionalMessageContent
+      *            the additional content that accompanies the text
+      * @param consumerInfoList
+      *            the consumers of the message; stored as given, not copied
+      */
+     public OutGoingMessage(String textMessage, AdditionalMessageContent additionalMessageContent,
+             List<ConsumerInfo> consumerInfoList) {
+         this.consumerInfoList = consumerInfoList;
+         this.additionalMessageContent = additionalMessageContent;
+         this.textMessage = textMessage;
+     }
+
+     /**
+      * Appends one consumer, creating the consumer list first when none has been set.
+      *
+      * @param consumerInfo
+      *            the consumer to add
+      */
+     public void addConsumerInfo(ConsumerInfo consumerInfo) {
+         List<ConsumerInfo> target = consumerInfoList;
+         if (target == null) {
+             target = new ArrayList<ConsumerInfo>();
+             consumerInfoList = target;
+         }
+         target.add(consumerInfo);
+     }
+
+     /**
+      * @return the message text
+      */
+     public String getTextMessage() {
+         return this.textMessage;
+     }
+
+     /**
+      * @param textMessage
+      *            the message text to store
+      */
+     public void setTextMessage(String textMessage) {
+         this.textMessage = textMessage;
+     }
+
+     /**
+      * @return the consumer list, or null when none has been set or added
+      */
+     public List<ConsumerInfo> getConsumerInfoList() {
+         return this.consumerInfoList;
+     }
+
+     /**
+      * @param consumerInfoList
+      *            the consumer list to store; not copied
+      */
+     public void setConsumerInfoList(List<ConsumerInfo> consumerInfoList) {
+         this.consumerInfoList = consumerInfoList;
+     }
+
+     /**
+      * @return the additional message content
+      */
+     public AdditionalMessageContent getAdditionalMessageContent() {
+         return this.additionalMessageContent;
+     }
+
+     /**
+      * @param soapHeader
+      *            the additional message content to store
+      */
+     public void setAdditionalMessageContent(AdditionalMessageContent soapHeader) {
+         additionalMessageContent = soapHeader;
+     }
+
+ }
Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/PublisherRegistrationManager.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/PublisherRegistrationManager.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/PublisherRegistrationManager.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/PublisherRegistrationManager.java Sun Jul 3 15:51:36 2011
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.processors;
+
+import java.net.URI;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.wsmg.broker.context.ProcessingContext;
+import org.apache.airavata.wsmg.commons.WsmgNameSpaceConstants;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.EndpointReferenceHelper;
+import org.apache.log4j.Logger;
+
+ /**
+  * Handles publisher registration requests: for every request it builds an endpoint reference
+  * that points at the service location and carries a freshly numbered resource id, and adds it
+  * to the response message.
+  */
+ public class PublisherRegistrationManager {
+     int counter = 1;
+     OMFactory factory = OMAbstractFactory.getOMFactory();
+     URI serviceLocation = null;
+     static org.apache.log4j.Logger logger = Logger.getLogger(PublisherRegistrationManager.class);
+
+     /**
+      * @param serviceLocation
+      *            address placed in every registration reference that is handed out
+      */
+     public PublisherRegistrationManager(URI serviceLocation) {
+         this.serviceLocation = serviceLocation;
+     }
+
+     /**
+      * Registers the publisher named in the request body and appends the resulting registration
+      * reference to the response message of the context.
+      *
+      * A failure to serialize the reference is logged; in that case nothing is added to the
+      * response.
+      *
+      * @param ctx
+      *            processing context holding the request body and the response message
+      * @throws IllegalArgumentException
+      *             if the request body has no PublisherReference element
+      */
+     public void registerPublisher(ProcessingContext ctx) {
+
+         OMElement eprEl = ctx.getSoapBody().getFirstChildWithName(
+                 new QName(WsmgNameSpaceConstants.WSBR_NS.getNamespaceURI(), "PublisherReference"));
+         // Explicit check instead of relying on a NullPointerException raised by dereferencing
+         // the missing element.
+         if (eprEl == null) {
+             throw new IllegalArgumentException("request body does not contain a PublisherReference element");
+         }
+
+         EndpointReference publisherRegistrationRef = new EndpointReference(serviceLocation.toASCIIString());
+         publisherRegistrationRef.addReferenceParameter(new QName(WsmgNameSpaceConstants.WIDGET_NS.getNamespaceURI(),
+                 WsmgNameSpaceConstants.RESOURCE_ID), "pub" + nextPublisherNumber());
+
+         OMElement publisherRegistrationReferenceOMElement;
+         try {
+             publisherRegistrationReferenceOMElement = EndpointReferenceHelper.toOM(factory, publisherRegistrationRef,
+                     new QName(WsmgNameSpaceConstants.WSBR_NS.getNamespaceURI(), "SubscriptionManager"),
+                     WsmgNameSpaceConstants.WSA_NS.getNamespaceURI());
+
+             ctx.getRespMessage().addChild(publisherRegistrationReferenceOMElement);
+         } catch (AxisFault e) {
+             // TODO add with throws clause
+             // The logger already records the stack trace; no additional printStackTrace().
+             logger.fatal("axis fault found at publisher registrationgmanager", e);
+         }
+     }
+
+     /**
+      * Returns the next publisher number. Synchronized because counter++ is not atomic and two
+      * concurrent registrations could otherwise receive the same id.
+      */
+     private synchronized int nextPublisherNumber() {
+         return counter++;
+     }
+ }
Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/ReceivedMessage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/ReceivedMessage.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/ReceivedMessage.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/processors/ReceivedMessage.java Sun Jul 3 15:51:36 2011
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.processors;
+
+import java.io.Serializable;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+
+ /**
+  * Serializable holder for a received notification: its topic, its additional (header) content,
+  * the notification text and the class name of the WSNT message converter to use.
+  */
+ public class ReceivedMessage implements Serializable {
+
+     private static final long serialVersionUID = 7908767667077753895L;
+
+     String topicLocalString = null;
+
+     AdditionalMessageContent soapHeader;
+
+     String notificationMessage = null;
+
+     String wsntMessageConverterClassName;
+
+     /**
+      * @param topicLocalString
+      *            the topic string
+      * @param soapHeader
+      *            the additional content that came with the message
+      * @param notificationMessageEl
+      *            the notification message text
+      * @param wsntMessageConverterClassName
+      *            class name of the WSNT message converter
+      */
+     public ReceivedMessage(String topicLocalString, AdditionalMessageContent soapHeader, String notificationMessageEl,
+             String wsntMessageConverterClassName) {
+         this.wsntMessageConverterClassName = wsntMessageConverterClassName;
+         this.notificationMessage = notificationMessageEl;
+         this.soapHeader = soapHeader;
+         this.topicLocalString = topicLocalString;
+     }
+
+     /**
+      * @return the additional message content
+      */
+     public AdditionalMessageContent getSoapHeader() {
+         return this.soapHeader;
+     }
+
+     /**
+      * @param soapHeader
+      *            the additional message content to store
+      */
+     public void setSoapHeader(AdditionalMessageContent soapHeader) {
+         this.soapHeader = soapHeader;
+     }
+
+     /**
+      * @return the notification message text
+      */
+     public String getNotificationMessage() {
+         return this.notificationMessage;
+     }
+
+     /**
+      * @param notificationMessage
+      *            the notification message text to store
+      */
+     public void setNotificationMessage(String notificationMessage) {
+         this.notificationMessage = notificationMessage;
+     }
+
+     /**
+      * @return the topic string
+      */
+     public String getTopicLocalString() {
+         return this.topicLocalString;
+     }
+
+     /**
+      * @param topicLocalString
+      *            the topic string to store
+      */
+     public void setTopicLocalString(String topicLocalString) {
+         this.topicLocalString = topicLocalString;
+     }
+
+     /**
+      * @return the class name of the WSNT message converter
+      */
+     public String getWsntMessageConverterClassName() {
+         return this.wsntMessageConverterClassName;
+     }
+
+     /**
+      * @param wsntMessageConverterClassName
+      *            the converter class name to store
+      */
+     public void setWsntMessageConverterClassName(String wsntMessageConverterClassName) {
+         this.wsntMessageConverterClassName = wsntMessageConverterClassName;
+     }
+
+ }
Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/transports/jms/MessageMatcherConnection.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/transports/jms/MessageMatcherConnection.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/transports/jms/MessageMatcherConnection.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/transports/jms/MessageMatcherConnection.java Sun Jul 3 15:51:36 2011
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.transports.jms;
+
+ /**
+  * A message matcher connection that can be stopped.
+  */
+ public interface MessageMatcherConnection {
+
+     /**
+      * Stops this connection.
+      *
+      * @return an int result. NOTE(review): the meaning of the value is not defined here -
+      *         confirm against the implementations.
+      */
+     int stop();
+ }
Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/Counter.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/Counter.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/Counter.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/Counter.java Sun Jul 3 15:51:36 2011
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+ /**
+  * Counter used for stress tests, together with TimerThread. Holds a running count and one
+  * free-form string value.
+  */
+ public class Counter {
+
+     private final AtomicLong counter = new AtomicLong();
+
+     private final AtomicReference<String> otherStringValue = new AtomicReference<String>();
+
+     /**
+      * Increments the count by one.
+      */
+     public void addCounter() {
+         counter.incrementAndGet();
+     }
+
+     /**
+      * Increments the count by one and stores the given string value.
+      *
+      * @param otherValue
+      *            the string value to store
+      */
+     public synchronized void addCounter(String otherValue) {
+         counter.incrementAndGet();
+         otherStringValue.set(otherValue);
+     }
+
+     /**
+      * @return the current count
+      */
+     public long getCounterValue() {
+         final long current = counter.get();
+         return current;
+     }
+
+     /**
+      * @param counterValue
+      *            the value the count is set to
+      */
+     public void setCounterValue(long counterValue) {
+         counter.set(counterValue);
+     }
+
+     /**
+      * @return the stored string value, or null when none has been stored
+      */
+     public String getOtherValueString() {
+         final String current = otherStringValue.get();
+         return current;
+     }
+
+     /**
+      * @param otherValueString
+      *            the string value to store
+      */
+     public void setOtherValueString(String otherValueString) {
+         otherStringValue.set(otherValueString);
+     }
+
+ }
Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/CurrentDate.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/CurrentDate.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/CurrentDate.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/CurrentDate.java Sun Jul 3 15:51:36 2011
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+import java.util.TimeZone;
+
+import org.apache.axiom.om.OMElement;
+
+ /**
+  * Value whose text is the current date, computed each time getText() is called. The instance
+  * also keeps a reference to a parent element.
+  */
+ public class CurrentDate implements Cloneable {
+     private OMElement parent;
+
+     /**
+      * @param parent
+      *            the parent element; may be null
+      */
+     public CurrentDate(OMElement parent) {
+         this.parent = parent;
+
+     }
+
+     /**
+      * Returns a new CurrentDate whose parent is a clone of this instance's parent element.
+      * When this instance has no parent, the copy has none either.
+      */
+     public Object clone() throws CloneNotSupportedException {
+
+         // The constructor and setParent() both accept null, so cloning must tolerate it too.
+         OMElement element = (parent == null) ? null : parent.cloneOMElement();
+
+         return new CurrentDate(element);
+
+     }
+
+     /**
+      * @return the string form of a DcDate created for the default time zone at call time
+      */
+     public String getText() {
+         // active entity: value computed on demand
+         // TODO use static method that would format System.currentTimeMillis +
+         // TZ
+         DcDate now = new DcDate(TimeZone.getDefault());
+         return now.toString();
+     }
+
+     /**
+      * @return always null. NOTE(review): callers that unbox the result would fail with a
+      *         NullPointerException - confirm that null is the intended answer.
+      */
+     public Boolean isWhitespaceContent() {
+         return null;
+     }
+
+     /**
+      * @return the parent element, possibly null
+      */
+     public OMElement getParent() {
+         return parent;
+     }
+
+     /**
+      * @param currentDateParent
+      *            the parent element to store; may be null
+      */
+     public void setParent(OMElement currentDateParent) {
+         this.parent = currentDateParent;
+     }
+ }
Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/DcDate.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/DcDate.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/DcDate.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/DcDate.java Sun Jul 3 15:51:36 2011
@@ -0,0 +1,472 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TimeZone;
+
+//TODO: support for fractional seconds
+
+/**
+ * Represents a dc:date (the W3C date and time profile of ISO 8601) as a value object that is not modified after
+ * construction. See http://www.w3.org/TR/NOTE-datetime and http://www.sics.se/~preben/DC/date_help.html
+ *
+ * Text forms accepted by {@link #DcDate(String)}:
+ *
+ * <pre>
+ * 1997
+ * 1997-07
+ * 1997-07-16
+ * 1997-07-16T19:20+01:00
+ * 1997-07-16T19:20:30+01:00
+ * 1997-07-16T19:20:30.45+01:00
+ * 2003-05-06T23:07:04Z
+ * </pre>
+ *
+ * Components that are absent from the text are reported as 0 (month, day) or -1 (hour, minute, second) by the
+ * getters; for {@link #getTimeInMillis()} they are taken as the start of the period, and a missing zone is
+ * taken as UTC.
+ */
+public final class DcDate implements Cloneable {
+
+    public final static String LOCATION_PROPERTY = "http://xmlpull.org/v1/doc/properties.html#location";
+
+    /** Powers of ten indexed by the number of digits, used by {@link #fill(StringBuffer, int, int)}. */
+    private static final int LOOKUP_10S[] = { 1, 10, 10 * 10, 10 * 100, 100 * 100 };
+
+    private static final int MILLIS_PER_MINUTE = 60 * 1000;
+
+    /** Text returned by {@link #toString()}: the parsed input, or the computed form for calendar based values. */
+    private String canonical;
+
+    private int year;
+
+    private int month; // 1-12, 0 when absent
+
+    private int day; // 1-31, 0 when absent
+
+    private int hh = -1; // 00-23, -1 when absent
+
+    private int mm = -1; // 00-59, -1 when absent
+
+    private int ss = -1; // 00-59, -1 when absent
+
+    private String decimalFraction = null; // digits after the decimal point of the seconds, null when absent
+
+    private int zoneOffset; // offset from UTC in minutes, daylight saving included
+
+    private Calendar instant; // private copy, never handed out
+
+    private int millisOffset = -1; // milliseconds derived from decimalFraction, -1 when absent
+
+    /**
+     * Creates a value for the current time expressed in the given time zone.
+     *
+     * @param tz
+     *            time zone used to break the current time into fields
+     */
+    public DcDate(TimeZone tz) throws RuntimeException {
+        init(new GregorianCalendar(tz));
+    }
+
+    /**
+     * Creates a value for the instant and time zone of the given calendar.
+     *
+     * @param cal
+     *            calendar to read; it is copied, so later changes to it do not affect this object
+     */
+    public DcDate(Calendar cal) throws RuntimeException {
+        init(cal);
+    }
+
+    /**
+     * Parses a dc:date. The text is kept verbatim and returned by {@link #toString()}.
+     *
+     * @param dcDate
+     *            text in one of the forms listed in the class comment
+     * @throws RuntimeException
+     *             if the text is null, too short, or does not follow the expected layout
+     */
+    public DcDate(String dcDate) throws RuntimeException {
+        if (dcDate == null) {
+            throw new IllegalArgumentException("dcDate must not be null");
+        }
+        canonical = dcDate;
+        parseFields(dcDate);
+        instant = computeInstant();
+    }
+
+    /** Copies the calendar and reads all fields and the effective UTC offset from the copy. */
+    private void init(Calendar cal) throws RuntimeException {
+        // defensive copy: the caller keeps a mutable reference to cal
+        instant = (Calendar) cal.clone();
+        // the displayed fields are wall-clock values, so the offset must include daylight saving
+        int offsetMillis = instant.get(Calendar.ZONE_OFFSET) + instant.get(Calendar.DST_OFFSET);
+        zoneOffset = offsetMillis / MILLIS_PER_MINUTE;
+
+        year = instant.get(Calendar.YEAR);
+        month = instant.get(Calendar.MONTH) + 1; // 0=Jan, 1=Feb, ...
+        day = instant.get(Calendar.DAY_OF_MONTH);
+        hh = instant.get(Calendar.HOUR_OF_DAY); // 0..23
+        mm = instant.get(Calendar.MINUTE); // 0..59
+        ss = instant.get(Calendar.SECOND); // 0..59
+        canonical = computeCanonical();
+    }
+
+    /**
+     * Fills the date, time, fraction and zone fields from the text. Positions are 1-based, as in
+     * {@link #check(String, int, char)} and {@link #getInt(String, int, int)}.
+     */
+    private void parseFields(String dcDate) throws RuntimeException {
+        // 2003-08-09T18:36:00-05:00
+        // 1234567890123456789012345
+        year = getInt(dcDate, 1, 4); // TODO add min, max check for all getInt
+        if (dcDate.length() == 4) {
+            return;
+        }
+        check(dcDate, 5, '-');
+        month = getInt(dcDate, 6, 2);
+        if (dcDate.length() == 7) {
+            return;
+        }
+        check(dcDate, 8, '-');
+        day = getInt(dcDate, 9, 2);
+        if (dcDate.length() == 10) {
+            return;
+        }
+        check(dcDate, 11, 'T');
+        hh = getInt(dcDate, 12, 2);
+        check(dcDate, 14, ':');
+        mm = getInt(dcDate, 15, 2);
+        if (dcDate.length() == 16) {
+            throw new RuntimeException("expected date formatted as YYYY-MM-DDThh:mm[:ss[.mmm]]TZD and not " + dcDate);
+        }
+        int pos = 17;
+        if (dcDate.charAt(pos - 1) == ':') {
+            ss = getInt(dcDate, 18, 2);
+            pos = 20;
+        }
+        char zoneIndicator = zoneIndicatorAt(dcDate, pos);
+        if (zoneIndicator == '.') {
+            // Ex: 2004-04-13T11:53:15.4362784-04:00
+            // pos is the 1-based position of '.', i.e. the 0-based index of the first fraction digit
+            int fractionStart = pos;
+            int fractionEnd = fractionStart;
+            while (fractionEnd < dcDate.length() && isAsciiDigit(dcDate.charAt(fractionEnd))) {
+                ++fractionEnd;
+            }
+            if (fractionEnd == fractionStart) {
+                throw new RuntimeException("expected date formtted as YYYY-MM-DDThh:mm[:ss[.s]]TZD and not " + dcDate);
+            }
+            decimalFraction = dcDate.substring(fractionStart, fractionEnd);
+            millisOffset = fractionToMillis(decimalFraction);
+            pos = fractionEnd + 1;
+            zoneIndicator = zoneIndicatorAt(dcDate, pos);
+        }
+        if (zoneIndicator == 'Z') {
+            // UTC, zoneOffset stays 0
+        } else if (zoneIndicator == '-' || zoneIndicator == '+') {
+            int zoneHH = getInt(dcDate, pos + 1, 2);
+            check(dcDate, pos + 3, ':');
+            int zoneMM = getInt(dcDate, pos + 4, 2);
+            zoneOffset = 60 * zoneHH + zoneMM;
+            if (zoneIndicator == '-') {
+                zoneOffset *= -1;
+            }
+        } else {
+            throw new RuntimeException("unknown zone indicator " + zoneIndicator + " in " + dcDate);
+        }
+    }
+
+    /** Returns the character at the 1-based position where a zone designator (or '.') is required. */
+    private static char zoneIndicatorAt(String dcDate, int pos) {
+        if (pos > dcDate.length()) {
+            throw new RuntimeException("expected time zone designator at position " + pos + " but " + dcDate
+                    + " is too short");
+        }
+        return dcDate.charAt(pos - 1);
+    }
+
+    private static boolean isAsciiDigit(char c) {
+        return c >= '0' && c <= '9';
+    }
+
+    /** Converts fraction digits to milliseconds; digits beyond the third are dropped, not rounded. */
+    private static int fractionToMillis(String fraction) {
+        int used = Math.min(fraction.length(), 3);
+        int millis = Integer.parseInt(fraction.substring(0, used));
+        for (int i = used; i < 3; ++i) {
+            millis *= 10;
+        }
+        return millis;
+    }
+
+    /**
+     * Builds the instant described by the parsed fields. The wall-clock fields are interpreted in UTC and then
+     * shifted by the parsed offset, so the JVM default time zone and its daylight saving rules play no part.
+     * Absent components count as the start of the period (the -1 and 0 markers are never fed to the calendar).
+     */
+    private Calendar computeInstant() {
+        Calendar utc = new GregorianCalendar(TimeZone.getTimeZone("GMT"));
+        utc.clear();
+        utc.set(year, month > 0 ? month - 1 : 0, day > 0 ? day : 1, hh > -1 ? hh : 0, mm > -1 ? mm : 0,
+                ss > -1 ? ss : 0);
+        utc.set(Calendar.MILLISECOND, millisOffset > -1 ? millisOffset : 0);
+        long wallClockAsUtc = utc.getTimeInMillis();
+        utc.setTimeInMillis(wallClockAsUtc - (long) zoneOffset * MILLIS_PER_MINUTE);
+        return utc;
+    }
+
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+        // a shallow copy is enough: no field is modified after construction
+        return super.clone();
+    }
+
+    /**
+     * @return milliseconds since the epoch for this value
+     */
+    public long getTimeInMillis() {
+        return instant.getTimeInMillis();
+    }
+
+    public int getYear() {
+        return year;
+    }
+
+    /** @return month 1-12, or 0 when the text had no month */
+    public int getMonth() {
+        return month;
+    }
+
+    /** @return day of month 1-31, or 0 when the text had no day */
+    public int getDay() {
+        return day;
+    }
+
+    /** @return hour 0-23, or -1 when the text had no time */
+    public int getHour() {
+        return hh;
+    }
+
+    /** @return minute 0-59, or -1 when the text had no time */
+    public int getMinute() {
+        return mm;
+    }
+
+    /** @return second 0-59, or -1 when the text had no seconds */
+    public int getSecond() {
+        return ss;
+    }
+
+    /** @return offset from UTC in minutes (negative west of Greenwich) */
+    public int getTimeZoneOffset() {
+        return zoneOffset;
+    }
+
+    public String getDcDate() {
+        return canonical;
+    }
+
+    /** @return the digits after the decimal point of the seconds, or null when there were none */
+    public String getDecimalFraction() {
+        return decimalFraction;
+    }
+
+    /** Formats the fields as YYYY[-MM[-DD[Thh:mm[:ss][.fraction]TZD]]], stopping at the first absent part. */
+    private String computeCanonical() {
+        // 2003-08-09T18:36:00-05:00
+        // 1234567890123456789012345
+        StringBuffer sb = new StringBuffer();
+        fill(sb, year, 4);
+        if (month <= 0) {
+            return sb.toString();
+        }
+        sb.append('-');
+        fill(sb, month, 2);
+        if (day <= 0) {
+            return sb.toString();
+        }
+        sb.append('-');
+        fill(sb, day, 2);
+        if (hh < 0) {
+            return sb.toString();
+        }
+        sb.append('T');
+        fill(sb, hh, 2);
+        sb.append(':');
+        fill(sb, mm, 2);
+        if (ss > -1) {
+            sb.append(':');
+            fill(sb, ss, 2);
+        }
+        if (decimalFraction != null) {
+            sb.append('.');
+            sb.append(decimalFraction);
+        }
+        if (zoneOffset == 0) {
+            sb.append('Z');
+        } else {
+            int off = zoneOffset;
+            if (zoneOffset > 0) {
+                sb.append('+');
+            } else {
+                sb.append('-');
+                off *= -1;
+            }
+            fill(sb, off / 60, 2);
+            sb.append(':');
+            fill(sb, off % 60, 2);
+        }
+        return sb.toString();
+    }
+
+    @Override
+    public String toString() {
+        return canonical;
+    }
+
+    /** @return the escaped character wrapped in single quotes, for use in messages */
+    public static String printable(char ch) {
+        return "'" + escape(ch) + "'";
+    }
+
+    /** @return the escaped string wrapped in double quotes, for use in messages */
+    public static String printable(String s) {
+        return "\"" + escape(s) + "\"";
+    }
+
+    /**
+     * Escapes a character for display: newline, carriage return, tab and single quote get a backslash form,
+     * anything outside 32..127 becomes a unicode escape padded to four hex digits, everything else is returned
+     * unchanged.
+     */
+    public static String escape(char ch) {
+        if (ch == '\n') {
+            return "\\n";
+        } else if (ch == '\r') {
+            return "\\r";
+        } else if (ch == '\t') {
+            return "\\t";
+        } else if (ch == '\'') {
+            return "\\'";
+        }
+        if (ch > 127 || ch < 32) {
+            String hex = Integer.toHexString((int) ch);
+            StringBuffer sb = new StringBuffer("\\u");
+            // a unicode escape needs exactly four hex digits
+            for (int i = hex.length(); i < 4; ++i) {
+                sb.append('0');
+            }
+            return sb.append(hex).toString();
+        }
+        return "" + ch;
+    }
+
+    /** Applies {@link #escape(char)} to every character; null is returned as null. */
+    public static String escape(String s) {
+        if (s == null) {
+            return null;
+        }
+        final int sLen = s.length();
+        StringBuffer buf = new StringBuffer(sLen + 10);
+        for (int i = 0; i < sLen; ++i) {
+            buf.append(escape(s.charAt(i)));
+        }
+        return buf.toString();
+    }
+
+    /**
+     * Appends value as exactly {@code fields} decimal digits, left padded with zeros.
+     *
+     * @param sb
+     *            buffer to append to
+     * @param value
+     *            non-negative number with at most {@code fields} digits
+     * @param fields
+     *            number of digits to write, 1 to 4
+     */
+    public static void fill(StringBuffer sb, int value, int fields) {
+        assert fields > 0;
+        assert fields <= 4;
+        assert value >= 0;
+        int mm = LOOKUP_10S[fields];
+        assert value < mm;
+        while (fields-- > 0) {
+            mm /= 10;
+            if (value >= mm) {
+                sb.append((value / mm) % 10);
+            } else {
+                sb.append('0');
+            }
+        }
+    }
+
+    /**
+     * Verifies that the character at the 1-based position {@code pos} is {@code ch}.
+     *
+     * @throws RuntimeException
+     *             if the text is shorter than {@code pos} or holds a different character there
+     */
+    public static void check(String dcDate, int pos, char ch) {
+        if (pos > dcDate.length()) {
+            throw new RuntimeException("expected " + printable(ch) + " at position " + pos + " but " + dcDate
+                    + " is too short");
+        }
+        char c = dcDate.charAt(pos - 1);
+        if (c != ch) {
+            throw new RuntimeException("expected " + printable(ch) + " but got " + printable(c) + " in " + dcDate);
+        }
+    }
+
+    /**
+     * Parses {@code len} characters starting at the 1-based position {@code pos} as an integer.
+     *
+     * @throws RuntimeException
+     *             if the text ends before the requested range or the range is not a number
+     */
+    public static int getInt(String dcDate, int pos, int len) throws RuntimeException {
+        assert len > 0;
+        int end = pos + len - 1;
+        if (end > dcDate.length()) {
+            throw new RuntimeException("expected " + len + " digit(s) at position " + pos + " but " + dcDate
+                    + " is too short");
+        }
+        String s = dcDate.substring(pos - 1, end);
+        try {
+            return Integer.parseInt(s);
+        } catch (NumberFormatException e) {
+            throw new RuntimeException("expected number for " + printable(s) + " in " + dcDate, e);
+        }
+    }
+
+    /**
+     * Takes a string and returns it with everything except letters and digits removed. Words are found with
+     * {@link #breakIntoWords(String)}; the first word is copied unchanged and the first letter or digit of each
+     * following word is upper-cased (camelCase style).
+     */
+    public static String getWikiTitle(String title) {
+        StringBuffer sb = new StringBuffer();
+        boolean firstWord = true;
+        for (String word : breakIntoWords(title)) {
+            boolean wordStart = true;
+            for (int i = 0; i < word.length(); i++) {
+                char c = word.charAt(i);
+                if (!Character.isLetterOrDigit(c)) {
+                    continue;
+                }
+                if (wordStart && !firstWord) {
+                    sb.append(Character.toUpperCase(c));
+                } else {
+                    sb.append(c);
+                }
+                wordStart = false;
+            }
+            firstWord = false;
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Splits the text at whitespace, commas and full stops. Runs of separators produce no empty words, except
+     * that a text starting with a separator yields one empty first word.
+     */
+    public static List<String> breakIntoWords(String s) {
+        List<String> words = new ArrayList<String>(s.length() / 5);
+        boolean inWord = true;
+        int wordStart = 0;
+        for (int pos = 0; pos < s.length(); ++pos) {
+            char ch = s.charAt(pos);
+            boolean isWordSeparator = Character.isWhitespace(ch) || ch == ',' || ch == '.';
+            if (isWordSeparator) {
+                if (inWord) {
+                    words.add(s.substring(wordStart, pos));
+                    inWord = false;
+                }
+            } else if (!inWord) {
+                inWord = true;
+                wordStart = pos;
+            }
+            assert inWord == !isWordSeparator;
+        }
+        if (inWord) {
+            words.add(s.substring(wordStart));
+        }
+        return words;
+    }
+
+    /**
+     * @param oneOrTwoDigits
+     *            number from 0 to 99
+     * @return the number as two decimal digits
+     * @throws IllegalArgumentException
+     *             if the number is outside 0..99
+     */
+    public static String makeTwoDigit(int oneOrTwoDigits) {
+        if (oneOrTwoDigits < 0 || oneOrTwoDigits > 99) {
+            throw new IllegalArgumentException("expected a value from 0 to 99 but got " + oneOrTwoDigits);
+        }
+        if (oneOrTwoDigits < 10) {
+            return "0" + oneOrTwoDigits;
+        }
+        return "" + oneOrTwoDigits;
+    }
+
+}
Added: incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/LoadConfigProperties.java
URL: http://svn.apache.org/viewvc/incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/LoadConfigProperties.java?rev=1142453&view=auto
==============================================================================
--- incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/LoadConfigProperties.java (added)
+++ incubator/airavata/ws-messaging/trunk/messagebroker/src/main/java/org/apache/airavata/wsmg/util/LoadConfigProperties.java Sun Jul 3 15:51:36 2011
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+import java.io.File;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ConfigurationContextFactory;
+import org.apache.log4j.Logger;
+
+/**
+ * Lazily created singleton that locates an Axis2 repository directory and, when one is found, creates an Axis2
+ * configuration context from it.
+ *
+ * The repository path comes from the system property "axis2.repository" or, when that is not set, from the
+ * "repository" directory under the AXIS2_HOME environment variable. Note that {@link #getInstance()} is not
+ * synchronized.
+ */
+public class LoadConfigProperties {
+
+    private static Logger logger = Logger.getLogger(LoadConfigProperties.class);
+
+    public static LoadConfigProperties confProp = null;
+    private String axisRepo = null;
+    private ConfigurationContext configContext = null;
+
+    /**
+     * Resolves the repository path and loads the configuration context. When no usable directory is found, or
+     * loading fails, both the repository path and the context stay null.
+     */
+    private LoadConfigProperties() {
+        String repoPath = resolveRepositoryPath();
+        if (repoPath == null) {
+            return;
+        }
+
+        File repoDir = new File(repoPath);
+        if (!repoDir.isDirectory()) {
+            return;
+        }
+
+        try {
+            String absolutePath = repoDir.getAbsolutePath();
+            configContext = ConfigurationContextFactory.createConfigurationContextFromFileSystem(absolutePath);
+            // only remember the path once the context was created successfully
+            axisRepo = repoDir.getAbsolutePath();
+        } catch (AxisFault e) {
+            logger.error("unable to load the repository", e);
+        }
+    }
+
+    /**
+     * @return the configured repository path, the system property taking precedence over AXIS2_HOME, or null
+     *         when neither is set
+     */
+    private static String resolveRepositoryPath() {
+        String fromProperty = System.getProperty("axis2.repository");
+        if (fromProperty != null) {
+            return fromProperty;
+        }
+
+        String home = System.getenv("AXIS2_HOME");
+        if (home == null) {
+            return null;
+        }
+
+        home = home.trim();
+        String separator = home.endsWith("/") ? "" : "/";
+        return home + separator + "repository";
+    }
+
+    /**
+     * @return the shared instance, created on first use
+     */
+    public static LoadConfigProperties getInstance() {
+        if (confProp != null) {
+            return confProp;
+        }
+        confProp = new LoadConfigProperties();
+        return confProp;
+    }
+
+    /**
+     * @return absolute path of the loaded repository, or null when none was loaded
+     */
+    public String getAxisRepo() {
+        return axisRepo;
+    }
+
+    /**
+     * Logs the repository path at info level and reports whether a repository was loaded.
+     *
+     * @return true when a repository was loaded
+     */
+    public boolean hasAxisRepo() {
+        logger.info("repo is : " + axisRepo);
+        boolean loaded = (axisRepo != null);
+        return loaded;
+    }
+
+    /**
+     * @return the configuration context created from the repository, or null when none was loaded
+     */
+    public ConfigurationContext getConfCtxFromFileSystem() {
+        return configContext;
+    }
+}