You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2008/04/23 01:08:42 UTC
svn commit: r650693 - in /ode/branches/APACHE_ODE_1.1:
axis2/src/main/java/org/apache/ode/axis2/
utils/src/main/java/org/apache/ode/utils/fs/
Author: mriou
Date: Tue Apr 22 16:08:36 2008
New Revision: 650693
URL: http://svn.apache.org/viewvc?rev=650693&view=rev
Log:
ODE-272 Dynamic config for Axis2 external services.
Added:
ode/branches/APACHE_ODE_1.1/utils/src/main/java/org/apache/ode/utils/fs/FileWatchDog.java
Modified:
ode/branches/APACHE_ODE_1.1/axis2/src/main/java/org/apache/ode/axis2/BindingContextImpl.java
ode/branches/APACHE_ODE_1.1/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
ode/branches/APACHE_ODE_1.1/axis2/src/main/java/org/apache/ode/axis2/SoapExternalService.java
Modified: ode/branches/APACHE_ODE_1.1/axis2/src/main/java/org/apache/ode/axis2/BindingContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.1/axis2/src/main/java/org/apache/ode/axis2/BindingContextImpl.java?rev=650693&r1=650692&r2=650693&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.1/axis2/src/main/java/org/apache/ode/axis2/BindingContextImpl.java (original)
+++ ode/branches/APACHE_ODE_1.1/axis2/src/main/java/org/apache/ode/axis2/BindingContextImpl.java Tue Apr 22 16:08:36 2008
@@ -78,7 +78,7 @@
throw new ContextException("Cannot find definition for service " + initialPartnerEndpoint.serviceName
+ " in the context of process "+processId);
}
- return _server.createExternalService(wsdl, initialPartnerEndpoint.serviceName, initialPartnerEndpoint.portName);
+ return _server.createExternalService(wsdl, initialPartnerEndpoint.serviceName, initialPartnerEndpoint.portName, pconf);
}
}
Modified: ode/branches/APACHE_ODE_1.1/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.1/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java?rev=650693&r1=650692&r2=650693&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.1/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java (original)
+++ ode/branches/APACHE_ODE_1.1/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java Tue Apr 22 16:08:36 2008
@@ -317,7 +317,7 @@
return odeService;
}
- public ExternalService createExternalService(Definition def, QName serviceName, String portName) throws ContextException {
+ public ExternalService createExternalService(Definition def, QName serviceName, String portName, ProcessConf pconf) throws ContextException {
ExternalService extService = (ExternalService) _externalServices.get(serviceName);
if (extService != null)
return extService;
@@ -326,7 +326,7 @@
if (WsdlUtils.useHTTPBinding(def, serviceName, portName)) {
extService = new HttpExternalService(def, serviceName, portName, _executorService, _scheduler, _server);
} else if (WsdlUtils.useSOAPBinding(def, serviceName, portName)) {
- extService = new SoapExternalService(def, serviceName, portName, _executorService, _axisConfig, _scheduler, _server);
+ extService = new SoapExternalService(def, serviceName, portName, _executorService, _axisConfig, _scheduler, _server, pconf);
}
} catch (Exception ex) {
__log.error("Could not create external service.", ex);
Modified: ode/branches/APACHE_ODE_1.1/axis2/src/main/java/org/apache/ode/axis2/SoapExternalService.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.1/axis2/src/main/java/org/apache/ode/axis2/SoapExternalService.java?rev=650693&r1=650692&r2=650693&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.1/axis2/src/main/java/org/apache/ode/axis2/SoapExternalService.java (original)
+++ ode/branches/APACHE_ODE_1.1/axis2/src/main/java/org/apache/ode/axis2/SoapExternalService.java Tue Apr 22 16:08:36 2008
@@ -21,6 +21,8 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.io.File;
+import java.io.InputStream;
import javax.wsdl.Definition;
import javax.wsdl.Fault;
@@ -29,6 +31,7 @@
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
+import org.apache.axis2.deployment.ServiceBuilder;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.client.OperationClient;
import org.apache.axis2.client.Options;
@@ -48,11 +51,12 @@
import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.ProcessConf;
import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
import org.apache.ode.il.OMUtils;
import org.apache.ode.utils.DOMUtils;
import org.apache.ode.utils.Namespaces;
-import org.apache.ode.utils.wsdl.*;
+import org.apache.ode.utils.fs.FileWatchDog;
import org.apache.ode.utils.wsdl.Messages;
import org.apache.ode.utils.uuid.UUID;
import org.w3c.dom.Document;
@@ -83,9 +87,10 @@
private SoapMessageConverter _converter;
private Scheduler _sched;
private BpelServer _server;
+ private ProcessConf _pconf;
public SoapExternalService(Definition definition, QName serviceName, String portName, ExecutorService executorService,
- AxisConfiguration axisConfig, Scheduler sched, BpelServer server) throws AxisFault {
+ AxisConfiguration axisConfig, Scheduler sched, BpelServer server, ProcessConf pconf) throws AxisFault {
_definition = definition;
_serviceName = serviceName;
_portName = portName;
@@ -94,6 +99,8 @@
_sched = sched;
_converter = new SoapMessageConverter(definition, serviceName, portName);
_server = server;
+ _pconf = pconf;
+
}
public void invoke(final PartnerRoleMessageExchange odeMex) {
@@ -122,15 +129,8 @@
AuthenticationHelper.setHttpAuthentication(odeMex, options);
- CachedServiceClient cached = _cachedClients.get();
- long now = System.currentTimeMillis();
- if (cached == null || cached._expire < now) {
- cached = new CachedServiceClient();
- ConfigurationContext ctx = new ConfigurationContext(_axisConfig);
- cached._client = new ServiceClient(ctx, null);
- cached._expire = now+EXPIRE_SERVICE_CLIENT;
- _cachedClients.set(cached);
- }
+ CachedServiceClient cached = getCachedServiceClient();
+
final OperationClient operationClient = cached._client.createClient(isTwoWay ? ServiceClient.ANON_OUT_IN_OP
: ServiceClient.ANON_OUT_ONLY_OP);
operationClient.setOptions(options);
@@ -171,6 +171,7 @@
}
});
}
+
public void beforeCompletion() {
}
});
@@ -192,24 +193,39 @@
}
}
+ private CachedServiceClient getCachedServiceClient() throws AxisFault {
+ CachedServiceClient cached = _cachedClients.get(); // reuse the client already held by _cachedClients, if any
+ if (cached == null) {
+ // none yet: watch "<service local name>.axis2", resolved against the process base URI
+ cached = new CachedServiceClient(new File(_pconf.getBaseURI().resolve(_serviceName.getLocalPart() + ".axis2")), EXPIRE_SERVICE_CLIENT);
+ _cachedClients.set(cached);
+ }
+ try {
+ // run the watchdog check inline on each call rather than from a dedicated thread
+ cached.checkAndConfigure();
+ } catch (Exception e) {
+ throw AxisFault.makeFault(e); // any failure of the check is reported to the caller as an AxisFault
+ }
+ return cached;
+ }
+
/**
* Extracts the action to be used for the given operation. It first checks to see
- * if a value is specified using WS-Addressing in the portType, it then falls back onto
+ * if a value is specified using WS-Addressing in the portType, it then falls back onto
* getting it from the SOAP Binding.
+ *
* @param operation the name of the operation to get the Action for
* @return The action value for the specified operation
*/
- private String getAction(String operation)
- {
- String action = _converter.getWSAInputAction(operation);
- if (action == null || "".equals(action))
- {
- action = _converter.getSoapAction(operation);
+ private String getAction(String operation) {
+ String action = _converter.getWSAInputAction(operation);
+ if (action == null || "".equals(action)) {
+ action = _converter.getSoapAction(operation);
}
- return action;
- }
+ return action;
+ }
- /**
+ /**
* Extracts endpoint information from ODE message exchange to stuff them into Axis MessageContext.
*/
private void writeHeader(MessageContext ctxt, PartnerRoleMessageExchange odeMex) {
@@ -219,7 +235,7 @@
String partnerSessionId = odeMex.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
String myRoleSessionId = odeMex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
-
+
if (partnerSessionId != null) {
if (__log.isDebugEnabled()) {
__log.debug("Partner session identifier found for WSA endpoint: " + partnerSessionId);
@@ -227,7 +243,7 @@
targetEPR.setSessionId(partnerSessionId);
}
options.setProperty("targetSessionEndpoint", targetEPR);
-
+
if (myRoleEPR != null) {
if (myRoleSessionId != null) {
if (__log.isDebugEnabled()) {
@@ -243,13 +259,13 @@
String action = getAction(odeMex.getOperationName());
ctxt.setSoapAction(action);
-
- if (MessageExchange.MessageExchangePattern.REQUEST_RESPONSE == odeMex.getMessageExchangePattern()) {
- EndpointReference annonEpr =
- new EndpointReference(Namespaces.WS_ADDRESSING_ANON_URI);
- ctxt.setReplyTo(annonEpr);
- ctxt.setMessageID("uuid:" + new UUID().toString());
- }
+
+ if (MessageExchange.MessageExchangePattern.REQUEST_RESPONSE == odeMex.getMessageExchangePattern()) {
+ EndpointReference annonEpr =
+ new EndpointReference(Namespaces.WS_ADDRESSING_ANON_URI);
+ ctxt.setReplyTo(annonEpr);
+ ctxt.setMessageID("uuid:" + new UUID().toString());
+ }
}
public org.apache.ode.bpel.iapi.EndpointReference getInitialEndpointReference() {
@@ -272,12 +288,12 @@
}
private void replyWithFailure(final String odeMexId, final FailureType error, final String errmsg,
- final Element details) {
+ final Element details) {
// ODE MEX needs to be invoked in a TX.
try {
_sched.execIsolatedTransaction(new Callable<Void>() {
public Void call() throws Exception {
- PartnerRoleMessageExchange odeMex = (PartnerRoleMessageExchange) _server.getEngine().getMessageExchange(odeMexId);
+ PartnerRoleMessageExchange odeMex = (PartnerRoleMessageExchange) _server.getEngine().getMessageExchange(odeMexId);
odeMex.replyWithFailure(error, errmsg, details);
return null;
}
@@ -296,7 +312,7 @@
try {
_sched.execIsolatedTransaction(new Callable<Void>() {
public Void call() throws Exception {
- PartnerRoleMessageExchange odeMex = (PartnerRoleMessageExchange) _server.getEngine().getMessageExchange(odeMexId);
+ PartnerRoleMessageExchange odeMex = (PartnerRoleMessageExchange) _server.getEngine().getMessageExchange(odeMexId);
// Setting the response
try {
if (__log.isDebugEnabled()) __log.debug("Received response for MEX " + odeMex);
@@ -312,8 +328,10 @@
__log.warn("Fault response: faultType=" + faultType + "\n" + DOMUtils.domToString(odeMsgEl));
QName nonNullFT = new QName(Namespaces.ODE_EXTENSION_NS, "unknownFault");
Fault f = odeMex.getOperation().getFault(faultType.getLocalPart());
- if (f != null && f.getMessage().getQName() != null) nonNullFT = f.getMessage().getQName();
- else __log.debug("Fault " + faultType + " isn't referenced in the service definition, unknown fault.");
+ if (f != null && f.getMessage().getQName() != null)
+ nonNullFT = f.getMessage().getQName();
+ else
+ __log.debug("Fault " + faultType + " isn't referenced in the service definition, unknown fault.");
Message response = odeMex.createMessage(nonNullFT);
response.setMessage(odeMsgEl);
@@ -345,12 +363,47 @@
String errmsg = "Error executing reply transaction; reply will be lost.";
__log.error(errmsg, e);
}
+
}
- // INNER CLASS
- static class CachedServiceClient {
+
+ /**
+ * This class wraps a {@link org.apache.axis2.client.ServiceClient} and watches changes (deletions,creations,updates)
+ * on an Axis2 service config file named {service-name}.axis2.<p/>
+ * The {@link org.apache.axis2.client.ServiceClient} instance is created from the main Axis2 config instance and
+ * this service-specific config file.
+ */
+    class CachedServiceClient extends FileWatchDog {
ServiceClient _client;
- long _expire;
+
+        /** Watches {@code file}, re-checking it at most once every {@code delay} milliseconds. */
+        protected CachedServiceClient(File file, long delay) {
+            super(file, delay);
+        }
+
+        /** The client counts as initialized once {@link #init()} has assigned it. */
+        protected boolean isInitialized() throws Exception { return _client != null; }
+
+        /** (Re)creates the client from the main Axis2 configuration only. */
+        protected void init() throws Exception { _client = new ServiceClient(new ConfigurationContext(_axisConfig), null); }
+
+        /** Called when the config file is added or updated: rebuilds the client, then loads the file into it. */
+        protected void doOnUpdate() throws Exception {
+            init(); //reset the ServiceClient instance
+            InputStream ais = null;
+            try {
+                ais = file.toURI().toURL().openStream();
+                if (__log.isDebugEnabled()) __log.debug("Configuring service " + _serviceName + " using: " + file);
+                ServiceBuilder builder = new ServiceBuilder(ais, new ConfigurationContext(_client.getAxisConfiguration()), _client.getAxisService());
+                builder.populateService(builder.buildOM());
+            } catch (Exception e) {
+                // best effort: the failure is only logged and the client created by init() above stays in place
+                if (__log.isWarnEnabled()) __log.warn("Exception while configuring service: " + _serviceName, e);
+            } finally {
+                // always release the file handle, even when reading the config fails
+                if (ais != null) try { ais.close(); } catch (Exception ignored) { /* nothing to recover on close */ }
+            }
+        }
}
}
Added: ode/branches/APACHE_ODE_1.1/utils/src/main/java/org/apache/ode/utils/fs/FileWatchDog.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.1/utils/src/main/java/org/apache/ode/utils/fs/FileWatchDog.java?rev=650693&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.1/utils/src/main/java/org/apache/ode/utils/fs/FileWatchDog.java (added)
+++ ode/branches/APACHE_ODE_1.1/utils/src/main/java/org/apache/ode/utils/fs/FileWatchDog.java Tue Apr 22 16:08:36 2008
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.utils.fs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
+
+/**
+ * This class is based on {@link org.apache.log4j.helpers.FileWatchdog}.<p/>
+ * Modifications have been made to support additional file events (creation, deletion and updates), and to allow "manual"
+ * invocations of {@link #checkAndConfigure()} (i.e. without having to use a thread) while preserving time checking.<p/>
+ * Now two use cases coexist:
+ * <ol>
+ * <li>Pass an instance of {@link FileWatchDog} to a new thread ({@link FileWatchDog} is a {@link Runnable}).
+ * So that {@link FileWatchDog#checkAndConfigure()} will be called automatically every {@code delay} milliseconds.</li>
+ * <li>Invoke {@link FileWatchDog#checkAndConfigure()} only when you feel like it. If the expiration date previously set is lower than NOW then event
+ * callback methods will be invoked accordingly.</li>
+ * </ol>
+ *
+ * @author Ceki Gülcü
+ * @author <a href="mailto:midon@intalio.com">Alexis Midon</a>
+ */
+public class FileWatchDog implements Runnable {
+    static final public long DEFAULT_DELAY = 60000;
+    private final Log log;
+
+    long expire;    // time (ms since epoch) before which checkAndConfigure() does nothing
+    long lastModif; // modification time seen at the last update; 0 while no file is known
+    long delay = DEFAULT_DELAY;
+    boolean fileExistedBefore, warnedAlready, interrupted;
+    protected final File file;
+
+    /** Watches {@code file}, checking it at most once every {@code delay} milliseconds. */
+    protected FileWatchDog(File file, long delay) {
+        this(file);
+        this.delay = delay;
+    }
+
+    /** Watches {@code file} using {@link #DEFAULT_DELAY}. */
+    protected FileWatchDog(File file) {
+        this.file = file;
+        log = LogFactory.getLog(FileWatchDog.class);
+    }
+
+    /** Whether the watched state is set up; when {@code false} and no file exists, the next check calls {@link #init()}. */
+    protected boolean isInitialized() throws Exception {
+        return true;
+    }
+
+    /** Hook that (re)sets the default state; does nothing unless overridden. */
+    protected void init() throws Exception { }
+
+    /** Called when a file seen by an earlier check no longer exists; delegates to {@link #init()}. */
+    protected void doOnDelete() throws Exception {
+        init();
+    }
+
+    /** Called when the file has a newer modification time than last seen (first sighting included); delegates to {@link #init()}. */
+    protected void doOnUpdate() throws Exception {
+        init();
+    }
+
+    public long getDelay() { return delay; }
+
+    public void setDelay(long delay) { this.delay = delay; }
+
+    /** Sleeps {@code delay} ms between calls to {@link #checkAndConfigure()} until interrupted, abandoned, or a check throws. */
+    public void run() {
+        try {
+            while (!interrupted) {
+                try {
+                    Thread.sleep(delay);
+                } catch (InterruptedException e) {
+                    // restore the flag for whoever owns this thread and stop watching
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+                checkAndConfigure();
+            }
+        } catch (Exception e) {
+            log.warn("Exception occured. Thread will stop", e);
+        }
+    }
+
+    /** Checks the file, at most once per {@code delay}, and fires the matching callback; callback exceptions propagate. */
+    public final void checkAndConfigure() throws Exception {
+        long now = System.currentTimeMillis();
+        if (expire > now) return; // checked recently enough
+        expire = now + delay;
+        boolean fileExists;
+        try {
+            fileExists = file.exists();
+        } catch (SecurityException e) {
+            log.warn("Was not allowed to read check file existance, file:[" + file.getPath() + "].");
+            interrupted = true; // there is no point in continuing
+            return;
+        }
+
+        if (fileExists) {
+            fileExistedBefore = true;
+            long l = file.lastModified();
+            if (l > lastModif) {
+                lastModif = l;
+                if (log.isDebugEnabled()) log.debug("File [" + file + "] has been modified");
+                doOnUpdate();
+                warnedAlready = false;
+            }
+        } else if (!isInitialized()) {
+            init(); // first time and no file
+        } else {
+            if (fileExistedBefore) {
+                fileExistedBefore = false;
+                lastModif = 0; // forget the old timestamp so a re-created file is reported even if it is older
+                doOnDelete();
+            }
+            if (!warnedAlready) {
+                warnedAlready = true;
+                if (log.isDebugEnabled()) log.debug("[" + file + "] does not exist.");
+            }
+        }
+    }
+}