You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuscany.apache.org by jm...@apache.org on 2006/09/07 16:59:24 UTC
svn commit: r441106 - in /incubator/tuscany/java/sca:
containers/container.groovy/src/main/java/org/apache/tuscany/container/groovy/
containers/container.groovy/src/test/java/org/apache/tuscany/container/groovy/
kernel/core/src/main/java/org/apache/tus...
Author: jmarino
Date: Thu Sep 7 07:59:22 2006
New Revision: 441106
URL: http://svn.apache.org/viewvc?view=rev&rev=441106
Log:
[PATCH] commit patch for TUSCANY-706 from Ignacio
Added:
incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/MessageId.java (with props)
Modified:
incubator/tuscany/java/sca/containers/container.groovy/src/main/java/org/apache/tuscany/container/groovy/AsyncGroovyInvoker.java
incubator/tuscany/java/sca/containers/container.groovy/src/main/java/org/apache/tuscany/container/groovy/GroovyAtomicComponent.java
incubator/tuscany/java/sca/containers/container.groovy/src/test/java/org/apache/tuscany/container/groovy/AsyncInvokerTestCase.java
incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/builder/ConnectorImpl.java
incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/component/WorkContextImpl.java
incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/composite/AbstractCompositeComponent.java
incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/java/AsyncJavaTargetInvoker.java
incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/java/JavaAtomicComponent.java
incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/system/component/SystemSingletonAtomicComponent.java
incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/system/wire/SystemInboundWireImpl.java
incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/system/wire/SystemOutboundAutowire.java
incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/system/wire/SystemOutboundWireImpl.java
incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/injection/CallbackWireObjectFactory.java
incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/AbstractOutboundInvocationHandler.java
incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/InboundWireImpl.java
incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/OutboundWireImpl.java
incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/jdk/JDKCallbackInvocationHandler.java
incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/jdk/JDKOutboundInvocationHandler.java
incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/jdk/JDKWireService.java
incubator/tuscany/java/sca/kernel/core/src/test/java/org/apache/tuscany/core/builder/ConnectorPostProcessTestCase.java
incubator/tuscany/java/sca/kernel/core/src/test/java/org/apache/tuscany/core/implementation/java/AsyncJavaTargetInvokerTestCase.java
incubator/tuscany/java/sca/kernel/core/src/test/java/org/apache/tuscany/core/implementation/java/integration/component/OneWayWireInvocationTestCase.java
incubator/tuscany/java/sca/kernel/core/src/test/java/org/apache/tuscany/core/injection/CallbackWireObjectFactoryTestCase.java
incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/component/Component.java
incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/component/WorkContext.java
incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/extension/AtomicComponentExtension.java
incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/extension/CompositeComponentExtension.java
incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/InboundWire.java
incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/Message.java
incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/MessageImpl.java
incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/OutboundWire.java
incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/RuntimeWire.java
incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/WireService.java
Modified: incubator/tuscany/java/sca/containers/container.groovy/src/main/java/org/apache/tuscany/container/groovy/AsyncGroovyInvoker.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/containers/container.groovy/src/main/java/org/apache/tuscany/container/groovy/AsyncGroovyInvoker.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/containers/container.groovy/src/main/java/org/apache/tuscany/container/groovy/AsyncGroovyInvoker.java (original)
+++ incubator/tuscany/java/sca/containers/container.groovy/src/main/java/org/apache/tuscany/container/groovy/AsyncGroovyInvoker.java Thu Sep 7 07:59:22 2006
@@ -28,10 +28,10 @@
import org.apache.tuscany.spi.component.TargetException;
import org.apache.tuscany.spi.component.WorkContext;
import org.apache.tuscany.spi.services.work.WorkScheduler;
+import org.apache.tuscany.spi.wire.InboundWire;
import org.apache.tuscany.spi.wire.InvocationRuntimeException;
import org.apache.tuscany.spi.wire.Message;
import org.apache.tuscany.spi.wire.MessageChannel;
-import org.apache.tuscany.spi.wire.OutboundWire;
import org.apache.tuscany.spi.wire.TargetInvoker;
/**
@@ -44,11 +44,12 @@
private static final ContextBinder BINDER = new ContextBinder();
private static final Message RESPONSE = new AsyncGroovyInvoker.ImmutableMessage();
- private OutboundWire wire;
+ private InboundWire wire;
private WorkScheduler workScheduler;
private AsyncMonitor monitor;
private WorkContext workContext;
private Object target;
+ private Object messageId;
/**
* Creates a new invoker
@@ -61,7 +62,7 @@
* @param workContext
*/
public AsyncGroovyInvoker(String operation,
- OutboundWire wire,
+ InboundWire wire,
GroovyAtomicComponent component,
WorkScheduler workScheduler,
AsyncMonitor monitor,
@@ -82,8 +83,10 @@
// Schedule the invocation of the next interceptor in a new Work instance
try {
workScheduler.scheduleWork(new Runnable() {
+ private Object currentMessageId = messageId;
+
public void run() {
- workContext.setCurrentInvocationWire(wire);
+ workContext.setCurrentMessageId(currentMessageId);
CompositeContext oldContext = CurrentCompositeContext.getContext();
try {
BINDER.setContext(currentContext);
@@ -107,6 +110,8 @@
public Message invoke(Message msg) throws InvocationRuntimeException {
// can't just call overriden invoke because it would bypass async
try {
+ messageId = msg.getMessageId();
+ wire.addMapping(messageId, msg.getFromAddress());
return (Message) invokeTarget(msg.getBody());
} catch (InvocationTargetException e) {
// FIXME need to log exceptions
@@ -181,6 +186,30 @@
public Message getRelatedCallbackMessage() {
return null;
+ }
+
+ public Object getFromAddress() {
+ return null;
+ }
+
+ public void setFromAddress(Object fromAddress) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Object getMessageId() {
+ return null;
+ }
+
+ public void setMessageId(Object messageId) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Object getCorrelationId() {
+ return null;
+ }
+
+ public void setCorrelationId(Object correlationId) {
+ throw new UnsupportedOperationException();
}
}
}
Modified: incubator/tuscany/java/sca/containers/container.groovy/src/main/java/org/apache/tuscany/container/groovy/GroovyAtomicComponent.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/containers/container.groovy/src/main/java/org/apache/tuscany/container/groovy/GroovyAtomicComponent.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/containers/container.groovy/src/main/java/org/apache/tuscany/container/groovy/GroovyAtomicComponent.java (original)
+++ incubator/tuscany/java/sca/containers/container.groovy/src/main/java/org/apache/tuscany/container/groovy/GroovyAtomicComponent.java Thu Sep 7 07:59:22 2006
@@ -67,7 +67,7 @@
return new GroovyInvoker(operation.getName(), this);
}
- public TargetInvoker createAsyncTargetInvoker(OutboundWire wire, Operation operation) {
+ public TargetInvoker createAsyncTargetInvoker(InboundWire wire, Operation operation) {
return new AsyncGroovyInvoker(operation.getName(), wire, this, workScheduler, monitor, workContext);
}
Modified: incubator/tuscany/java/sca/containers/container.groovy/src/test/java/org/apache/tuscany/container/groovy/AsyncInvokerTestCase.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/containers/container.groovy/src/test/java/org/apache/tuscany/container/groovy/AsyncInvokerTestCase.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/containers/container.groovy/src/test/java/org/apache/tuscany/container/groovy/AsyncInvokerTestCase.java (original)
+++ incubator/tuscany/java/sca/containers/container.groovy/src/test/java/org/apache/tuscany/container/groovy/AsyncInvokerTestCase.java Thu Sep 7 07:59:22 2006
@@ -22,6 +22,7 @@
import org.apache.tuscany.spi.component.WorkContext;
import org.apache.tuscany.spi.services.work.WorkScheduler;
+import org.apache.tuscany.spi.wire.InboundWire;
import org.apache.tuscany.spi.wire.Message;
import org.apache.tuscany.spi.wire.MessageImpl;
@@ -66,7 +67,8 @@
WorkContext context = createMock(WorkContext.class);
Method method = AsyncTarget.class.getMethod("invoke");
method.setAccessible(true);
- AsyncGroovyInvoker invoker = new AsyncGroovyInvoker("invoke", null, component, scheduler, monitor, context);
+ InboundWire wire = createMock(InboundWire.class);
+ AsyncGroovyInvoker invoker = new AsyncGroovyInvoker("invoke", wire, component, scheduler, monitor, context);
Message msg = new MessageImpl();
invoker.invoke(msg);
verify(instance);
Modified: incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/builder/ConnectorImpl.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/builder/ConnectorImpl.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/builder/ConnectorImpl.java (original)
+++ incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/builder/ConnectorImpl.java Thu Sep 7 07:59:22 2006
@@ -52,6 +52,7 @@
import org.apache.tuscany.core.wire.MessageChannelImpl;
import org.apache.tuscany.core.wire.MessageDispatcher;
import org.apache.tuscany.core.wire.OutboundAutowire;
+import org.apache.tuscany.core.wire.OutboundInvocationChainImpl;
/**
* The default connector implmentation
@@ -192,7 +193,7 @@
throw new ComponentRuntimeException("Operation cannot be marked one-way and have a callback");
}
if (isOneWayOperation || operationHasCallback) {
- invoker = component.createAsyncTargetInvoker(sourceWire, operation);
+ invoker = component.createAsyncTargetInvoker(targetWire, operation);
} else {
Operation<?> inboundOperation = inboundChain.getOperation();
invoker = component.createTargetInvoker(null, inboundOperation);
@@ -212,6 +213,31 @@
}
}
+ // create source callback chains and connect them if target callback chains exist
+ Map<Operation<?>, OutboundInvocationChain> sourceCallbackChains =
+ targetWire.getSourceCallbackInvocationChains(source.getName());
+ for (InboundInvocationChain inboundChain : sourceWire.getTargetCallbackInvocationChains().values()) {
+ Operation<?> operation = inboundChain.getOperation();
+ if (sourceCallbackChains != null && sourceCallbackChains.get(operation) != null) {
+ BuilderConfigException e =
+ new BuilderConfigException(
+ "Source callback chain should not exist for operation [" + operation.getName() + "]");
+ e.setIdentifier(sourceWire.getReferenceName());
+ throw e;
+ }
+ OutboundInvocationChain outboundChain = new OutboundInvocationChainImpl(operation);
+ targetWire.addSourceCallbackInvocationChain(source.getName(), operation, outboundChain);
+ if (source instanceof Component) {
+ Component<?> component = (Component<?>) source;
+ TargetInvoker invoker;
+ invoker = component.createTargetInvoker(null, operation);
+ connect(outboundChain, inboundChain, invoker);
+ } else if (target instanceof Service) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /*
// connect callback wires if they exist
for (OutboundInvocationChain outboundChain : sourceWire.getSourceCallbackInvocationChains().values()) {
// match wire chains
@@ -233,7 +259,7 @@
}
TargetInvoker invoker;
if (isOneWayOperation || operationHasCallback) {
- invoker = component.createAsyncTargetInvoker(sourceWire, operation);
+ invoker = component.createAsyncTargetInvoker(operation);
} else {
Operation<?> inboundOperation = inboundChain.getOperation();
invoker = component.createTargetInvoker(null, inboundOperation);
@@ -243,6 +269,7 @@
throw new UnsupportedOperationException();
}
}
+ */
}
public void connect(OutboundInvocationChain sourceChain,
Modified: incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/component/WorkContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/component/WorkContextImpl.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/component/WorkContextImpl.java (original)
+++ incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/component/WorkContextImpl.java Thu Sep 7 07:59:22 2006
@@ -23,7 +23,6 @@
import org.apache.tuscany.spi.component.CompositeComponent;
import org.apache.tuscany.spi.component.WorkContext;
-import org.apache.tuscany.spi.wire.OutboundWire;
/**
* An implementation of an {@link org.apache.tuscany.spi.component.WorkContext} that handles event-to-thread
@@ -34,7 +33,7 @@
public class WorkContextImpl implements WorkContext {
private static final Object REMOTE_CONTEXT = new Object();
- private static final Object OUTBOUND_WIRE = new Object();
+ private static final Object MESSAGE_ID = new Object();
// TODO implement propagation strategy for creating new threads
@@ -46,21 +45,21 @@
super();
}
- public OutboundWire getCurrentInvocationWire() {
+ public Object getCurrentMessageId() {
Map<Object, Object> map = workContext.get();
if (map == null) {
return null;
}
- return (OutboundWire) map.get(OUTBOUND_WIRE);
+ return map.get(MESSAGE_ID);
}
- public void setCurrentInvocationWire(OutboundWire wire) {
+ public void setCurrentMessageId(Object messageId) {
Map<Object, Object> map = workContext.get();
if (map == null) {
map = new HashMap<Object, Object>();
workContext.set(map);
}
- map.put(OUTBOUND_WIRE, wire);
+ map.put(MESSAGE_ID, messageId);
}
public CompositeComponent getRemoteComponent() {
Modified: incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/composite/AbstractCompositeComponent.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/composite/AbstractCompositeComponent.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/composite/AbstractCompositeComponent.java (original)
+++ incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/composite/AbstractCompositeComponent.java Thu Sep 7 07:59:22 2006
@@ -39,7 +39,7 @@
import org.apache.tuscany.spi.event.Event;
import org.apache.tuscany.spi.extension.CompositeComponentExtension;
import org.apache.tuscany.spi.model.Operation;
-import org.apache.tuscany.spi.wire.OutboundWire;
+import org.apache.tuscany.spi.wire.InboundWire;
import org.apache.tuscany.spi.wire.TargetInvoker;
import org.apache.tuscany.core.component.AutowireComponent;
@@ -233,7 +233,7 @@
return null;
}
- public TargetInvoker createAsyncTargetInvoker(OutboundWire wire, Operation operation) {
+ public TargetInvoker createAsyncTargetInvoker(InboundWire wire, Operation operation) {
return null;
}
Modified: incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/java/AsyncJavaTargetInvoker.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/java/AsyncJavaTargetInvoker.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/java/AsyncJavaTargetInvoker.java (original)
+++ incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/java/AsyncJavaTargetInvoker.java Thu Sep 7 07:59:22 2006
@@ -27,11 +27,11 @@
import org.osoa.sca.ServiceRuntimeException;
import org.apache.tuscany.spi.services.work.WorkScheduler;
+import org.apache.tuscany.spi.wire.InboundWire;
import org.apache.tuscany.spi.wire.InvocationRuntimeException;
import org.apache.tuscany.spi.wire.Message;
import org.apache.tuscany.spi.wire.MessageChannel;
import org.apache.tuscany.spi.wire.TargetInvoker;
-import org.apache.tuscany.spi.wire.OutboundWire;
import org.apache.tuscany.spi.component.WorkContext;
import org.apache.tuscany.spi.component.TargetException;
@@ -48,11 +48,12 @@
private static final Message RESPONSE = new ImmutableMessage();
private JavaAtomicComponent component;
- private OutboundWire wire;
+ private InboundWire wire;
private WorkScheduler workScheduler;
private AsyncMonitor monitor;
private WorkContext workContext;
private Object target;
+ private Object messageId;
/**
* Creates a new invoker
@@ -65,7 +66,7 @@
* @param workContext
*/
public AsyncJavaTargetInvoker(Method operation,
- OutboundWire wire,
+ InboundWire wire,
JavaAtomicComponent component,
WorkScheduler workScheduler,
AsyncMonitor monitor,
@@ -87,12 +88,12 @@
// Schedule the invocation of the next interceptor in a new Work instance
try {
workScheduler.scheduleWork(new Runnable() {
+ private Object currentMessageId = messageId;
public void run() {
- workContext.setCurrentInvocationWire(wire);
+ workContext.setCurrentMessageId(currentMessageId);
CompositeContext oldContext = CurrentCompositeContext.getContext();
try {
BINDER.setContext(currentContext);
- // REVIEW response must be null for one-way and non-null for callback
AsyncJavaTargetInvoker.super.invokeTarget(payload);
} catch (Exception e) {
// REVIEW uncomment when it is available
@@ -112,6 +113,8 @@
public Message invoke(Message msg) throws InvocationRuntimeException {
// can't just call overriden invoke because it would bypass async
try {
+ messageId = msg.getMessageId();
+ wire.addMapping(messageId, msg.getFromAddress());
Object resp = invokeTarget(msg.getBody());
return (Message) resp;
} catch (InvocationTargetException e) {
@@ -188,6 +191,30 @@
public Message getRelatedCallbackMessage() {
return null;
+ }
+
+ public Object getFromAddress() {
+ return null;
+ }
+
+ public void setFromAddress(Object fromAddress) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Object getMessageId() {
+ return null;
+ }
+
+ public void setMessageId(Object messageId) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Object getCorrelationId() {
+ return null;
+ }
+
+ public void setCorrelationId(Object correlationId) {
+ throw new UnsupportedOperationException();
}
}
}
Modified: incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/java/JavaAtomicComponent.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/java/JavaAtomicComponent.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/java/JavaAtomicComponent.java (original)
+++ incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/java/JavaAtomicComponent.java Thu Sep 7 07:59:22 2006
@@ -29,7 +29,6 @@
import org.apache.tuscany.spi.model.Operation;
import org.apache.tuscany.spi.model.ServiceContract;
import org.apache.tuscany.spi.wire.InboundWire;
-import org.apache.tuscany.spi.wire.OutboundWire;
import org.apache.tuscany.spi.wire.RuntimeWire;
import org.apache.tuscany.spi.wire.TargetInvoker;
@@ -80,12 +79,18 @@
}
public TargetInvoker createTargetInvoker(String targetName, Operation operation) {
- Method[] methods = operation.getServiceContract().getInterfaceClass().getMethods();
+ Method[] methods;
+ if (operation.isCallback()) {
+ methods = operation.getServiceContract().getCallbackClass().getMethods();
+
+ } else {
+ methods = operation.getServiceContract().getInterfaceClass().getMethods();
+ }
Method method = findMethod(operation, methods);
return new JavaTargetInvoker(method, this);
}
- public TargetInvoker createAsyncTargetInvoker(OutboundWire wire, Operation operation) {
+ public TargetInvoker createAsyncTargetInvoker(InboundWire wire, Operation operation) {
Method method;
if (operation.isCallback()) {
method = findMethod(operation, operation.getServiceContract().getCallbackClass().getMethods());
@@ -104,18 +109,19 @@
}
Member member = callbackSites.get(name);
if (member != null) {
- injectors.add(createCallbackInjector(member, wire.getServiceContract()));
+ injectors.add(createCallbackInjector(member, wire.getServiceContract(), wire));
}
}
- protected Injector<Object> createCallbackInjector(Member member, ServiceContract<?> contract) {
+ protected Injector<Object> createCallbackInjector(Member member, ServiceContract<?> contract,
+ InboundWire inboundWire) {
if (member instanceof Field) {
Field field = (Field) member;
- ObjectFactory<?> factory = new CallbackWireObjectFactory(contract, wireService);
+ ObjectFactory<?> factory = new CallbackWireObjectFactory(contract, wireService, inboundWire);
return new FieldInjector<Object>(field, factory);
} else if (member instanceof Method) {
Method method = (Method) member;
- ObjectFactory<?> factory = new CallbackWireObjectFactory(contract, wireService);
+ ObjectFactory<?> factory = new CallbackWireObjectFactory(contract, wireService, inboundWire);
return new MethodInjector<Object>(method, factory);
} else {
InvalidAccessorException e = new InvalidAccessorException("Member must be a field or method");
Modified: incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/system/component/SystemSingletonAtomicComponent.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/system/component/SystemSingletonAtomicComponent.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/system/component/SystemSingletonAtomicComponent.java (original)
+++ incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/system/component/SystemSingletonAtomicComponent.java Thu Sep 7 07:59:22 2006
@@ -120,7 +120,7 @@
return null;
}
- public TargetInvoker createAsyncTargetInvoker(OutboundWire wire, Operation operation) {
+ public TargetInvoker createAsyncTargetInvoker(InboundWire wire, Operation operation) {
return null;
}
}
Modified: incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/system/wire/SystemInboundWireImpl.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/system/wire/SystemInboundWireImpl.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/system/wire/SystemInboundWireImpl.java (original)
+++ incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/system/wire/SystemInboundWireImpl.java Thu Sep 7 07:59:22 2006
@@ -27,6 +27,7 @@
import org.apache.tuscany.spi.model.Operation;
import org.apache.tuscany.spi.model.ServiceContract;
import org.apache.tuscany.spi.wire.InboundInvocationChain;
+import org.apache.tuscany.spi.wire.OutboundInvocationChain;
import org.apache.tuscany.spi.wire.OutboundWire;
/**
@@ -39,6 +40,7 @@
private ServiceContract serviceContract;
private Component<?> component;
private SystemOutboundWire<T> wire;
+ private String containerName;
/**
* Constructs a new inbound wire
@@ -97,6 +99,21 @@
throw new UnsupportedOperationException();
}
+ public Map<Operation<?>, OutboundInvocationChain> getSourceCallbackInvocationChains(Object targetAddr) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void addSourceCallbackInvocationChains(Object targetAddr,
+ Map<Operation<?>,
+ OutboundInvocationChain> chains) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void addSourceCallbackInvocationChain(Object targetAddr, Operation operation,
+ OutboundInvocationChain chain) {
+ throw new UnsupportedOperationException();
+ }
+
public void addInterface(Class claz) {
throw new UnsupportedOperationException();
}
@@ -118,4 +135,23 @@
this.wire = (SystemOutboundWire<T>) wire;
}
+ public String getContainerName() {
+ return containerName;
+ }
+
+ public void setContainerName(String name) {
+ this.containerName = name;
+ }
+
+ public void addMapping(Object messageId, Object fromAddress) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Object retrieveMapping(Object messageId) {
+ return null;
+ }
+
+ public void removeMapping(Object messageId) {
+ throw new UnsupportedOperationException();
+ }
}
Modified: incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/system/wire/SystemOutboundAutowire.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/system/wire/SystemOutboundAutowire.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/system/wire/SystemOutboundAutowire.java (original)
+++ incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/system/wire/SystemOutboundAutowire.java Thu Sep 7 07:59:22 2006
@@ -44,6 +44,7 @@
private ServiceContract serviceContract;
private AutowireComponent<?> component;
private final boolean required;
+ private String containerName;
public SystemOutboundAutowire(String referenceName, Class<T> businessInterface, AutowireComponent<?> component,
boolean required) {
@@ -133,23 +134,19 @@
throw new UnsupportedOperationException();
}
- public Map<Operation<?>, OutboundInvocationChain> getSourceCallbackInvocationChains() {
- return null;
- }
-
- public void addSourceCallbackInvocationChains(Map<Operation<?>, OutboundInvocationChain> chains) {
-
- }
-
- public void addSourceCallbackInvocationChain(Operation operation, OutboundInvocationChain chain) {
-
- }
-
public void addInterface(Class claz) {
throw new UnsupportedOperationException();
}
public boolean isOptimizable() {
return true;
+ }
+
+ public String getContainerName() {
+ return containerName;
+ }
+
+ public void setContainerName(String name) {
+ this.containerName = name;
}
}
Modified: incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/system/wire/SystemOutboundWireImpl.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/system/wire/SystemOutboundWireImpl.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/system/wire/SystemOutboundWireImpl.java (original)
+++ incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/implementation/system/wire/SystemOutboundWireImpl.java Thu Sep 7 07:59:22 2006
@@ -40,6 +40,7 @@
private QualifiedName targetName;
private ServiceContract serviceContract;
private SystemInboundWire<T> targetWire;
+ private String containerName;
public SystemOutboundWireImpl(String referenceName, QualifiedName targetName, Class<T> businessInterface) {
this.referenceName = referenceName;
@@ -119,18 +120,6 @@
throw new UnsupportedOperationException();
}
- public Map<Operation<?>, OutboundInvocationChain> getSourceCallbackInvocationChains() {
- return null;
- }
-
- public void addSourceCallbackInvocationChains(Map<Operation<?>, OutboundInvocationChain> chains) {
- throw new UnsupportedOperationException();
- }
-
- public void addSourceCallbackInvocationChain(Operation operation, OutboundInvocationChain chain) {
- throw new UnsupportedOperationException();
- }
-
public void addInterface(Class claz) {
throw new UnsupportedOperationException();
}
@@ -142,6 +131,14 @@
public boolean isOptimizable() {
return true;
+ }
+
+ public String getContainerName() {
+ return containerName;
+ }
+
+ public void setContainerName(String name) {
+ this.containerName = name;
}
}
Modified: incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/injection/CallbackWireObjectFactory.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/injection/CallbackWireObjectFactory.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/injection/CallbackWireObjectFactory.java (original)
+++ incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/injection/CallbackWireObjectFactory.java Thu Sep 7 07:59:22 2006
@@ -21,6 +21,7 @@
import org.apache.tuscany.spi.ObjectCreationException;
import org.apache.tuscany.spi.ObjectFactory;
import org.apache.tuscany.spi.model.ServiceContract;
+import org.apache.tuscany.spi.wire.InboundWire;
import org.apache.tuscany.spi.wire.WireService;
/**
@@ -32,14 +33,16 @@
private WireService wireService;
private ServiceContract<?> contract;
+ private InboundWire wire;
- public CallbackWireObjectFactory(ServiceContract<?> contract, WireService wireService) {
+ public CallbackWireObjectFactory(ServiceContract<?> contract, WireService wireService, InboundWire wire) {
this.contract = contract;
this.wireService = wireService;
+ this.wire = wire;
}
public Object getInstance() throws ObjectCreationException {
- return wireService.createCallbackProxy(contract);
+ return wireService.createCallbackProxy(contract, wire);
}
}
Modified: incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/AbstractOutboundInvocationHandler.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/AbstractOutboundInvocationHandler.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/AbstractOutboundInvocationHandler.java (original)
+++ incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/AbstractOutboundInvocationHandler.java Thu Sep 7 07:59:22 2006
@@ -22,6 +22,7 @@
import org.apache.tuscany.spi.wire.Interceptor;
import org.apache.tuscany.spi.wire.Message;
+import org.apache.tuscany.spi.wire.MessageId;
import org.apache.tuscany.spi.wire.MessageImpl;
import org.apache.tuscany.spi.wire.OutboundInvocationChain;
import org.apache.tuscany.spi.wire.TargetInvoker;
@@ -52,6 +53,10 @@
} else {
Message msg = new MessageImpl();
msg.setTargetInvoker(invoker);
+ msg.setFromAddress(getFromAddress());
+ if (msg.getMessageId() == null) {
+ msg.setMessageId(new MessageId());
+ }
msg.setBody(args);
// dispatch the wire down the chain and get the response
if (chain.getTargetRequestChannel() != null) {
@@ -81,5 +86,5 @@
}
}
-
+ protected abstract Object getFromAddress();
}
Modified: incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/InboundWireImpl.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/InboundWireImpl.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/InboundWireImpl.java (original)
+++ incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/InboundWireImpl.java Thu Sep 7 07:59:22 2006
@@ -28,6 +28,7 @@
import org.apache.tuscany.spi.wire.InboundWire;
import org.apache.tuscany.spi.wire.Interceptor;
import org.apache.tuscany.spi.wire.MessageHandler;
+import org.apache.tuscany.spi.wire.OutboundInvocationChain;
import org.apache.tuscany.spi.wire.OutboundWire;
/**
@@ -42,6 +43,10 @@
private OutboundWire<T> targetWire;
private String callbackReferenceName;
private Map<Operation<?>, InboundInvocationChain> chains = new HashMap<Operation<?>, InboundInvocationChain>();
+ private Map<Object, Map<Operation<?>, OutboundInvocationChain>> callbackSourceChainMaps =
+ new HashMap<Object, Map<Operation<?>, OutboundInvocationChain>>();
+ private String containerName;
+ private Map<Object, Object> msgIdsToAddrs = new HashMap<Object, Object>();
@SuppressWarnings("unchecked")
public T getTargetService() throws TargetException {
@@ -84,6 +89,25 @@
chains.put(operation, chain);
}
+ public Map<Operation<?>, OutboundInvocationChain> getSourceCallbackInvocationChains(Object targetAddr) {
+ return callbackSourceChainMaps.get(targetAddr);
+ }
+
+ public void addSourceCallbackInvocationChains(Object targetAddr,
+ Map<Operation<?>, OutboundInvocationChain> chains) {
+ callbackSourceChainMaps.put(targetAddr, chains);
+ }
+
+ public void addSourceCallbackInvocationChain(Object targetAddr, Operation operation,
+ OutboundInvocationChain chain) {
+ Map<Operation<?>, OutboundInvocationChain> chains = callbackSourceChainMaps.get(targetAddr);
+ if (chains == null) {
+ chains = new HashMap<Operation<?>, OutboundInvocationChain>();
+ callbackSourceChainMaps.put(targetAddr, chains);
+ }
+ chains.put(operation, chain);
+ }
+
public void setTargetWire(OutboundWire<T> wire) {
targetWire = wire;
}
@@ -128,4 +152,23 @@
return true;
}
+ public String getContainerName() {
+ return containerName;
+ }
+
+ public void setContainerName(String name) {
+ this.containerName = name;
+ }
+
+ public void addMapping(Object messageId, Object fromAddress) {
+ this.msgIdsToAddrs.put(messageId, fromAddress);
+ }
+
+ public Object retrieveMapping(Object messageId) {
+ return this.msgIdsToAddrs.get(messageId);
+ }
+
+ public void removeMapping(Object messageId) {
+ this.msgIdsToAddrs.remove(messageId);
+ }
}
Modified: incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/OutboundWireImpl.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/OutboundWireImpl.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/OutboundWireImpl.java (original)
+++ incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/OutboundWireImpl.java Thu Sep 7 07:59:22 2006
@@ -44,11 +44,10 @@
private Map<Operation<?>, OutboundInvocationChain> chains = new HashMap<Operation<?>, OutboundInvocationChain>();
private Map<Operation<?>, InboundInvocationChain> callbackTargetChains =
new HashMap<Operation<?>, InboundInvocationChain>();
- private Map<Operation<?>, OutboundInvocationChain> callbackSourceChains =
- new HashMap<Operation<?>, OutboundInvocationChain>();
private String referenceName;
private QualifiedName targetName;
private InboundWire<T> targetWire;
+ private String containerName;
public T getTargetService() throws TargetException {
if (targetWire != null) {
@@ -115,18 +114,6 @@
callbackTargetChains.put(operation, chain);
}
- public Map<Operation<?>, OutboundInvocationChain> getSourceCallbackInvocationChains() {
- return callbackSourceChains;
- }
-
- public void addSourceCallbackInvocationChains(Map<Operation<?>, OutboundInvocationChain> chains) {
- callbackSourceChains.putAll(chains);
- }
-
- public void addSourceCallbackInvocationChain(Operation opeation, OutboundInvocationChain chain) {
- callbackSourceChains.put(opeation, chain);
- }
-
public String getReferenceName() {
return referenceName;
}
@@ -201,5 +188,13 @@
}
return true;
+ }
+
+ public String getContainerName() {
+ return containerName;
+ }
+
+ public void setContainerName(String name) {
+ this.containerName = name;
}
}
Modified: incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/jdk/JDKCallbackInvocationHandler.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/jdk/JDKCallbackInvocationHandler.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/jdk/JDKCallbackInvocationHandler.java (original)
+++ incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/jdk/JDKCallbackInvocationHandler.java Thu Sep 7 07:59:22 2006
@@ -20,12 +20,13 @@
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
+import java.util.Map;
import org.apache.tuscany.spi.component.WorkContext;
import static org.apache.tuscany.spi.idl.java.JavaIDLUtils.findOperation;
import org.apache.tuscany.spi.model.Operation;
+import org.apache.tuscany.spi.wire.InboundWire;
import org.apache.tuscany.spi.wire.OutboundInvocationChain;
-import org.apache.tuscany.spi.wire.OutboundWire;
import org.apache.tuscany.spi.wire.TargetInvoker;
import org.apache.tuscany.spi.wire.WireInvocationHandler;
@@ -43,17 +44,25 @@
implements WireInvocationHandler, InvocationHandler {
private WorkContext context;
+ private InboundWire<?> inboundWire;
- public JDKCallbackInvocationHandler(WorkContext context) {
+ public JDKCallbackInvocationHandler(WorkContext context, InboundWire inboundWire) {
this.context = context;
+ this.inboundWire = inboundWire;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- OutboundWire<?> wire = context.getCurrentInvocationWire();
- context.setCurrentInvocationWire(null);
+ Object correlationId = context.getCurrentMessageId();
+ context.setCurrentMessageId(null);
+ Object targetAddress = inboundWire.retrieveMapping(correlationId);
+ if (targetAddress == null) {
+ throw new AssertionError("No from address associated with message id [" + correlationId + "]");
+ }
//TODO optimize as this is slow in local invocations
- Operation operation = findOperation(method, wire.getSourceCallbackInvocationChains().keySet());
- OutboundInvocationChain chain = wire.getSourceCallbackInvocationChains().get(operation);
+ Map<Operation<?>, OutboundInvocationChain> sourceCallbackInvocationChains =
+ inboundWire.getSourceCallbackInvocationChains(targetAddress);
+ Operation operation = findOperation(method, sourceCallbackInvocationChains.keySet());
+ OutboundInvocationChain chain = sourceCallbackInvocationChains.get(operation);
TargetInvoker invoker = chain.getTargetInvoker();
return invoke(chain, invoker, args);
}
@@ -61,5 +70,9 @@
public Object invoke(Method method, Object[] args) throws Throwable {
return invoke(null, method, args);
+ }
+
+ protected Object getFromAddress() {
+ return inboundWire.getContainerName();
}
}
Modified: incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/jdk/JDKOutboundInvocationHandler.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/jdk/JDKOutboundInvocationHandler.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/jdk/JDKOutboundInvocationHandler.java (original)
+++ incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/jdk/JDKOutboundInvocationHandler.java Thu Sep 7 07:59:22 2006
@@ -50,10 +50,12 @@
* is not cacheable, the master associated with the wire chains will be used.
*/
private Map<Method, ChainHolder> chains;
+ private Object fromAddress;
public JDKOutboundInvocationHandler(OutboundWire<?> wire) throws NoMethodForOperationException {
Map<Operation<?>, OutboundInvocationChain> invocationChains = wire.getInvocationChains();
this.chains = new HashMap<Method, ChainHolder>(invocationChains.size());
+ this.fromAddress = wire.getContainerName();
Method[] methods = wire.getServiceContract().getInterfaceClass().getMethods();
// TODO optimize this
for (Map.Entry<Operation<?>, OutboundInvocationChain> entry : invocationChains.entrySet()) {
@@ -110,6 +112,10 @@
public Object invoke(Method method, Object[] args) throws Throwable {
return invoke(null, method, args);
+ }
+
+ protected Object getFromAddress() {
+ return fromAddress;
}
/**
Modified: incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/jdk/JDKWireService.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/jdk/JDKWireService.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/jdk/JDKWireService.java (original)
+++ incubator/tuscany/java/sca/kernel/core/src/main/java/org/apache/tuscany/core/wire/jdk/JDKWireService.java Thu Sep 7 07:59:22 2006
@@ -126,10 +126,10 @@
}
@SuppressWarnings("unchecked")
- public <T> T createCallbackProxy(ServiceContract<?> contract) throws ProxyCreationException {
+ public <T> T createCallbackProxy(ServiceContract<?> contract, InboundWire<?> wire) throws ProxyCreationException {
Class<T> interfaze = (Class<T>) contract.getCallbackClass();
ClassLoader cl = interfaze.getClassLoader();
- JDKCallbackInvocationHandler handler = new JDKCallbackInvocationHandler(context);
+ JDKCallbackInvocationHandler handler = new JDKCallbackInvocationHandler(context, wire);
return interfaze.cast(Proxy.newProxyInstance(cl, new Class[]{interfaze}, handler));
}
@@ -150,8 +150,8 @@
}
}
- public WireInvocationHandler createCallbackHandler() {
- return new JDKCallbackInvocationHandler(context);
+ public WireInvocationHandler createCallbackHandler(InboundWire<?> wire) {
+ return new JDKCallbackInvocationHandler(context, wire);
}
public OutboundInvocationChain createOutboundChain(Operation<?> operation) {
@@ -167,13 +167,16 @@
Implementation<?> implementation = definition.getImplementation();
ComponentType<?, ?, ?> componentType = implementation.getComponentType();
for (ServiceDefinition service : componentType.getServices().values()) {
- component.addInboundWire(createWire(service));
+ InboundWire inboundWire = createWire(service);
+ inboundWire.setContainerName(component.getName());
+ component.addInboundWire(inboundWire);
}
for (ReferenceTarget reference : definition.getReferenceTargets().values()) {
Map<String, ? extends ReferenceDefinition> references = componentType.getReferences();
ReferenceDefinition mappedReference = references.get(reference.getReferenceName());
OutboundWire wire = createWire(reference, mappedReference);
+ wire.setContainerName(component.getName());
component.addOutboundWire(wire);
if (componentType instanceof CompositeComponentType<?, ?, ?>) {
// If this is the case, then it means that component has already been returned
@@ -190,6 +193,7 @@
public <T> void createWires(Reference<T> reference, ServiceContract<?> contract) {
InboundWire<T> wire = new InboundWireImpl<T>();
wire.setServiceContract(contract);
+ wire.setContainerName(reference.getName());
for (Operation<?> operation : contract.getOperations().values()) {
InboundInvocationChain chain = createInboundChain(operation);
chain.addInterceptor(new InvokerInterceptor());
@@ -229,12 +233,10 @@
wire.setCallbackInterface(callbackInterface);
for (Operation<?> operation : contract.getCallbacksOperations().values()) {
InboundInvocationChain callbackTargetChain = createInboundChain(operation);
- OutboundInvocationChain callbackSourceChain = createOutboundChain(operation);
// TODO handle policy
//TODO statement below could be cleaner
callbackTargetChain.addInterceptor(new InvokerInterceptor());
wire.addTargetCallbackInvocationChain(operation, callbackTargetChain);
- wire.addSourceCallbackInvocationChain(operation, callbackSourceChain);
}
}
return wire;
@@ -263,8 +265,10 @@
InboundWire<T> inboundWire = new InboundWireImpl<T>();
OutboundWire<T> outboundWire = new OutboundWireImpl<T>();
inboundWire.setServiceContract(contract);
+ inboundWire.setContainerName(service.getName());
outboundWire.setServiceContract(contract);
outboundWire.setTargetName(new QualifiedName(targetName));
+ outboundWire.setContainerName(service.getName());
for (Operation<?> operation : contract.getOperations().values()) {
InboundInvocationChain inboundChain = createInboundChain(operation);
inboundWire.addInvocationChain(operation, inboundChain);
Modified: incubator/tuscany/java/sca/kernel/core/src/test/java/org/apache/tuscany/core/builder/ConnectorPostProcessTestCase.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/core/src/test/java/org/apache/tuscany/core/builder/ConnectorPostProcessTestCase.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/core/src/test/java/org/apache/tuscany/core/builder/ConnectorPostProcessTestCase.java (original)
+++ incubator/tuscany/java/sca/kernel/core/src/test/java/org/apache/tuscany/core/builder/ConnectorPostProcessTestCase.java Thu Sep 7 07:59:22 2006
@@ -23,6 +23,7 @@
import java.util.Collections;
import org.apache.tuscany.spi.builder.WirePostProcessorRegistry;
+import org.apache.tuscany.spi.component.Component;
import org.apache.tuscany.spi.wire.InboundWire;
import org.apache.tuscany.spi.wire.OutboundWire;
@@ -56,16 +57,20 @@
@SuppressWarnings("unchecked")
public void testOutboundToInboundPostProcessCalled() throws Exception {
OutboundWire owire = createNiceMock(OutboundWire.class);
- expect(owire.getSourceCallbackInvocationChains()).andReturn(Collections.emptyMap());
expect(owire.getInvocationChains()).andReturn(Collections.emptyMap());
+ expect(owire.getTargetCallbackInvocationChains()).andReturn(Collections.emptyMap());
replay(owire);
InboundWire iwire = createNiceMock(InboundWire.class);
+ expect(iwire.getSourceCallbackInvocationChains("Component")).andReturn(Collections.emptyMap());
replay(iwire);
WirePostProcessorRegistry registry = createMock(WirePostProcessorRegistry.class);
registry.process(EasyMock.eq(owire), EasyMock.eq(iwire));
replay(registry);
+ Component source = createNiceMock(Component.class);
+ expect(source.getName()).andReturn("Component");
+ replay(source);
ConnectorImpl connector = new ConnectorImpl(registry);
- connector.connect(null, null, owire, iwire, false);
+ connector.connect(source, null, owire, iwire, false);
verify(registry);
}
Modified: incubator/tuscany/java/sca/kernel/core/src/test/java/org/apache/tuscany/core/implementation/java/AsyncJavaTargetInvokerTestCase.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/core/src/test/java/org/apache/tuscany/core/implementation/java/AsyncJavaTargetInvokerTestCase.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/core/src/test/java/org/apache/tuscany/core/implementation/java/AsyncJavaTargetInvokerTestCase.java (original)
+++ incubator/tuscany/java/sca/kernel/core/src/test/java/org/apache/tuscany/core/implementation/java/AsyncJavaTargetInvokerTestCase.java Thu Sep 7 07:59:22 2006
@@ -22,9 +22,9 @@
import org.apache.tuscany.spi.component.WorkContext;
import org.apache.tuscany.spi.services.work.WorkScheduler;
+import org.apache.tuscany.spi.wire.InboundWire;
import org.apache.tuscany.spi.wire.Message;
import org.apache.tuscany.spi.wire.MessageImpl;
-import org.apache.tuscany.spi.wire.OutboundWire;
import junit.framework.TestCase;
import static org.apache.tuscany.core.implementation.java.mock.MockFactory.createJavaComponent;
@@ -64,8 +64,9 @@
WorkContext context = createMock(WorkContext.class);
Method method = AsyncTarget.class.getMethod("invoke");
method.setAccessible(true);
+ InboundWire wire = createMock(InboundWire.class);
AsyncJavaTargetInvoker invoker =
- new AsyncJavaTargetInvoker(method, null, component, scheduler, monitor, context);
+ new AsyncJavaTargetInvoker(method, wire, component, scheduler, monitor, context);
Message msg = new MessageImpl();
invoker.invoke(msg);
verify(target);
@@ -90,10 +91,7 @@
});
replay(scheduler);
WorkContext context = createMock(WorkContext.class);
- context.setCurrentInvocationWire(isA(OutboundWire.class));
- expectLastCall().once();
- replay(context);
- OutboundWire wire = createMock(OutboundWire.class);
+ InboundWire wire = createMock(InboundWire.class);
Method method = AsyncTarget.class.getMethod("invoke");
method.setAccessible(true);
AsyncJavaTargetInvoker invoker =
Modified: incubator/tuscany/java/sca/kernel/core/src/test/java/org/apache/tuscany/core/implementation/java/integration/component/OneWayWireInvocationTestCase.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/core/src/test/java/org/apache/tuscany/core/implementation/java/integration/component/OneWayWireInvocationTestCase.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/core/src/test/java/org/apache/tuscany/core/implementation/java/integration/component/OneWayWireInvocationTestCase.java (original)
+++ incubator/tuscany/java/sca/kernel/core/src/test/java/org/apache/tuscany/core/implementation/java/integration/component/OneWayWireInvocationTestCase.java Thu Sep 7 07:59:22 2006
@@ -72,7 +72,9 @@
WorkContext context = createMock(WorkContext.class);
Method method = AsyncTarget.class.getMethod("invoke");
method.setAccessible(true);
- AsyncJavaTargetInvoker invoker = new AsyncJavaTargetInvoker(method, null, component, scheduler, null, context);
+ InboundWire inboundWire = createMock(InboundWire.class);
+ AsyncJavaTargetInvoker invoker =
+ new AsyncJavaTargetInvoker(method, inboundWire, component, scheduler, null, context);
InboundWire<AsyncTarget> wire =
createServiceWire("foo", AsyncTarget.class, null, null, null);
Map<Operation<?>, InboundInvocationChain> chains = wire.getInvocationChains();
Modified: incubator/tuscany/java/sca/kernel/core/src/test/java/org/apache/tuscany/core/injection/CallbackWireObjectFactoryTestCase.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/core/src/test/java/org/apache/tuscany/core/injection/CallbackWireObjectFactoryTestCase.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/core/src/test/java/org/apache/tuscany/core/injection/CallbackWireObjectFactoryTestCase.java (original)
+++ incubator/tuscany/java/sca/kernel/core/src/test/java/org/apache/tuscany/core/injection/CallbackWireObjectFactoryTestCase.java Thu Sep 7 07:59:22 2006
@@ -36,10 +36,10 @@
JavaServiceContract contract = new JavaServiceContract();
contract.setCallbackClass(Foo.class);
WireService service = createMock(WireService.class);
- service.createCallbackProxy(contract);
+ service.createCallbackProxy(contract, null);
expectLastCall().andReturn(null);
replay(service);
- CallbackWireObjectFactory factory = new CallbackWireObjectFactory(contract, service);
+ CallbackWireObjectFactory factory = new CallbackWireObjectFactory(contract, service, null);
factory.getInstance();
verify(service);
}
Modified: incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/component/Component.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/component/Component.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/component/Component.java (original)
+++ incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/component/Component.java Thu Sep 7 07:59:22 2006
@@ -91,9 +91,8 @@
* Callback to create a {@link org.apache.tuscany.spi.wire.TargetInvoker} which dispatches to a service offered by
* the component
*
- * @param wire the wire associated with the callback
* @param operation the operation to invoke
*/
- TargetInvoker createAsyncTargetInvoker(OutboundWire wire, Operation operation);
+ TargetInvoker createAsyncTargetInvoker(InboundWire wire, Operation operation);
}
Modified: incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/component/WorkContext.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/component/WorkContext.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/component/WorkContext.java (original)
+++ incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/component/WorkContext.java Thu Sep 7 07:59:22 2006
@@ -18,8 +18,6 @@
*/
package org.apache.tuscany.spi.component;
-import org.apache.tuscany.spi.wire.OutboundWire;
-
/**
* Implementations track information associated with a request as it is processed by the runtime
*
@@ -27,10 +25,10 @@
*/
public interface WorkContext {
- OutboundWire getCurrentInvocationWire();
+ Object getCurrentMessageId();
+
+ void setCurrentMessageId(Object messageId);
- void setCurrentInvocationWire(OutboundWire wire);
-
/**
* Returns the composite where a remote request came in
*/
Modified: incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/extension/AtomicComponentExtension.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/extension/AtomicComponentExtension.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/extension/AtomicComponentExtension.java (original)
+++ incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/extension/AtomicComponentExtension.java Thu Sep 7 07:59:22 2006
@@ -132,7 +132,7 @@
onReferenceWires(multiplicityClass, wires);
}
- public TargetInvoker createAsyncTargetInvoker(OutboundWire wire, Operation operation) {
+ public TargetInvoker createAsyncTargetInvoker(InboundWire wire, Operation operation) {
throw new UnsupportedOperationException();
}
Modified: incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/extension/CompositeComponentExtension.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/extension/CompositeComponentExtension.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/extension/CompositeComponentExtension.java (original)
+++ incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/extension/CompositeComponentExtension.java Thu Sep 7 07:59:22 2006
@@ -209,7 +209,7 @@
}
}
- public TargetInvoker createAsyncTargetInvoker(OutboundWire wire, Operation operation) {
+ public TargetInvoker createAsyncTargetInvoker(InboundWire wire, Operation operation) {
throw new UnsupportedOperationException();
}
Modified: incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/InboundWire.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/InboundWire.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/InboundWire.java (original)
+++ incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/InboundWire.java Thu Sep 7 07:59:22 2006
@@ -59,6 +59,22 @@
void addInvocationChain(Operation<?> operation, InboundInvocationChain chain);
/**
+ * Returns the callback invocation configuration for each operation on a service specified by a reference or a
+ * target service.
+ */
+ Map<Operation<?>, OutboundInvocationChain> getSourceCallbackInvocationChains(Object targetAddr);
+
+ /**
+ * Adds the collection of callback invocation chains keyed by operation for a given target addr
+ */
+ void addSourceCallbackInvocationChains(Object targetAddr, Map<Operation<?>, OutboundInvocationChain> chains);
+
+ /**
+ * Adds the callback invocation chain associated with the given operation for a given target addr
+ */
+ void addSourceCallbackInvocationChain(Object targetAddr, Operation<?> operation, OutboundInvocationChain chain);
+
+ /**
* Returns the name of the callback associated with the service of the wire
*/
String getCallbackReferenceName();
@@ -73,5 +89,20 @@
*/
void setTargetWire(OutboundWire<T> wire);
+ /**
+ * Creates an association between a message id and the address of the SCAObject that the corresponding
+ * message originates from
+ */
+ void addMapping(Object messageId, Object fromAddress);
+
+ /**
+ * Retrieves the SCAObject address that is associated with a message id
+ */
+ Object retrieveMapping(Object messageId);
+ /**
+ * Removes an association between a message id and the address of the SCAObject that the corresponding
+ * message originates from
+ */
+ void removeMapping(Object messageId);
}
Modified: incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/Message.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/Message.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/Message.java (original)
+++ incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/Message.java Thu Sep 7 07:59:22 2006
@@ -54,4 +54,34 @@
*
*/
Message getRelatedCallbackMessage();
+
+ /**
+ * Returns the 'address' of the SCAObject where this message originates
+ */
+ Object getFromAddress();
+
+ /**
+ * Sets the 'address' of the SCAObject where this message originates
+ */
+ void setFromAddress(Object fromAddress);
+
+ /**
+ * Returns the id of the message
+ */
+ Object getMessageId();
+
+ /**
+ * Sets the id of the message
+ */
+ void setMessageId(Object messageId);
+
+ /**
+ * Returns the correlation id of the message
+ */
+ Object getCorrelationId();
+
+ /**
+ * Sets the correlation id of the message
+ */
+ void setCorrelationId(Object correlationId);
}
Added: incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/MessageId.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/MessageId.java?view=auto&rev=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/MessageId.java (added)
+++ incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/MessageId.java Thu Sep 7 07:59:22 2006
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.spi.wire;
+
+/**
+ * A unique identifier for a message flowing on a wire, potentially end-to-end (ie, through more than one SCAObject to
+ * SCAObject hop).
+ */
+public class MessageId {
+
+ private long timestamp;
+
+ public MessageId() {
+ this.timestamp = System.currentTimeMillis();
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+}
Propchange: incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/MessageId.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/MessageId.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/MessageImpl.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/MessageImpl.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/MessageImpl.java (original)
+++ incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/MessageImpl.java Thu Sep 7 07:59:22 2006
@@ -28,6 +28,9 @@
private Object body;
private Message relatedCallbackMessage;
private TargetInvoker invoker;
+ private Object fromAddress;
+ private Object messageId;
+ private Object correlationId;
public MessageImpl() {
}
@@ -58,5 +61,29 @@
public TargetInvoker getTargetInvoker() {
return invoker;
+ }
+
+ public Object getFromAddress() {
+ return fromAddress;
+ }
+
+ public void setFromAddress(Object fromAddress) {
+ this.fromAddress = fromAddress;
+ }
+
+ public Object getMessageId() {
+ return messageId;
+ }
+
+ public void setMessageId(Object messageId) {
+ this.messageId = messageId;
+ }
+
+ public Object getCorrelationId() {
+ return correlationId;
+ }
+
+ public void setCorrelationId(Object correlationId) {
+ this.correlationId = correlationId;
}
}
Modified: incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/OutboundWire.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/OutboundWire.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/OutboundWire.java (original)
+++ incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/OutboundWire.java Thu Sep 7 07:59:22 2006
@@ -95,22 +95,6 @@
void addTargetCallbackInvocationChain(Operation<?> operation, InboundInvocationChain chain);
/**
- * Returns the callback invocation configuration for each operation on a service specified by a reference or a
- * target service.
- */
- Map<Operation<?>, OutboundInvocationChain> getSourceCallbackInvocationChains();
-
- /**
- * Adds the collection of callback invocation chains keyed by operation
- */
- void addSourceCallbackInvocationChains(Map<Operation<?>, OutboundInvocationChain> chains);
-
- /**
- * Adds the callback invocation chain associated with the given operation
- */
- void addSourceCallbackInvocationChain(Operation<?> operation, OutboundInvocationChain chain);
-
- /**
* Set when a wire can be optimized; that is when no handlers or interceptors exist on either end
*/
void setTargetWire(InboundWire<T> wire);
Modified: incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/RuntimeWire.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/RuntimeWire.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/RuntimeWire.java (original)
+++ incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/RuntimeWire.java Thu Sep 7 07:59:22 2006
@@ -57,4 +57,13 @@
*/
boolean isOptimizable();
+ /**
+ * Returns the name of the SCAObject that contains this wire
+ */
+ String getContainerName();
+
+ /**
+ * Sets the name of the SCAObject that contains this wire
+ */
+ void setContainerName(String name);
}
Modified: incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/WireService.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/WireService.java?view=diff&rev=441106&r1=441105&r2=441106
==============================================================================
--- incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/WireService.java (original)
+++ incubator/tuscany/java/sca/kernel/spi/src/main/java/org/apache/tuscany/spi/wire/WireService.java Thu Sep 7 07:59:22 2006
@@ -54,7 +54,7 @@
* @return the proxy
* @throws ProxyCreationException
*/
- <T> T createCallbackProxy(ServiceContract<?> contract) throws ProxyCreationException;
+ <T> T createCallbackProxy(ServiceContract<?> contract, InboundWire<?> wire) throws ProxyCreationException;
/**
@@ -70,7 +70,7 @@
*
* @return the invocation handler for flowing invocations through a callback
*/
- WireInvocationHandler createCallbackHandler();
+ WireInvocationHandler createCallbackHandler(InboundWire<?> wire);
/**
* Creates an outbound invocation chain for a given operation
---------------------------------------------------------------------
To unsubscribe, e-mail: tuscany-commits-unsubscribe@ws.apache.org
For additional commands, e-mail: tuscany-commits-help@ws.apache.org