You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuscany.apache.org by lr...@apache.org on 2007/04/15 07:53:10 UTC
svn commit: r528930 - in
/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core:
bootstrap/ deployer/ runtime/ work/
Author: lresende
Date: Sat Apr 14 22:53:09 2007
New Revision: 528930
URL: http://svn.apache.org/viewvc?view=rev&rev=528930
Log:
Adding support for nonBlockingInterceptor
Added:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkEvent.java (with props)
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkItem.java (with props)
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237Work.java (with props)
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237WorkScheduler.java (with props)
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/ThreadPoolWorkManager.java (with props)
Modified:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/bootstrap/DefaultBootstrapper.java
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/deployer/DeployerImpl.java
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/runtime/AbstractRuntime.java
Modified: incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/bootstrap/DefaultBootstrapper.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/bootstrap/DefaultBootstrapper.java?view=diff&rev=528930&r1=528929&r2=528930
==============================================================================
--- incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/bootstrap/DefaultBootstrapper.java (original)
+++ incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/bootstrap/DefaultBootstrapper.java Sat Apr 14 22:53:09 2007
@@ -23,6 +23,7 @@
import org.apache.tuscany.core.builder.BuilderRegistryImpl;
import org.apache.tuscany.core.builder.WirePostProcessorRegistryImpl;
import org.apache.tuscany.core.component.ComponentManagerImpl;
+import org.apache.tuscany.core.component.WorkContextImpl;
import org.apache.tuscany.core.component.scope.AbstractScopeContainer;
import org.apache.tuscany.core.component.scope.CompositeScopeContainer;
import org.apache.tuscany.core.component.scope.RequestScopeContainer;
@@ -30,15 +31,21 @@
import org.apache.tuscany.core.component.scope.StatelessScopeContainer;
import org.apache.tuscany.core.deployer.DeployerImpl;
import org.apache.tuscany.core.implementation.composite.CompositeBuilder;
+import org.apache.tuscany.core.work.Jsr237WorkScheduler;
+import org.apache.tuscany.core.work.ThreadPoolWorkManager;
import org.apache.tuscany.host.MonitorFactory;
import org.apache.tuscany.spi.bootstrap.ExtensionPointRegistry;
import org.apache.tuscany.spi.builder.BuilderRegistry;
import org.apache.tuscany.spi.component.ComponentManager;
import org.apache.tuscany.spi.component.ScopeContainerMonitor;
import org.apache.tuscany.spi.component.ScopeRegistry;
+import org.apache.tuscany.spi.component.WorkContext;
import org.apache.tuscany.spi.deployer.Deployer;
+import org.apache.tuscany.spi.services.work.WorkScheduler;
import org.apache.tuscany.spi.wire.WirePostProcessorRegistry;
+import commonj.work.WorkManager;
+
/**
* A default implementation of a Bootstrapper. Please see the documentation on
* the individual methods for how the primordial components are created.
@@ -95,7 +102,10 @@
public Deployer createDeployer(ExtensionPointRegistry extensionRegistry) {
ScopeRegistry scopeRegistry = getScopeRegistry();
BuilderRegistry builder = createBuilder(scopeRegistry);
- DeployerImpl deployer = new DeployerImpl(xmlFactory, builder, componentManager);
+ WorkContext workContext = new WorkContextImpl();
+ WorkManager workManager = new ThreadPoolWorkManager(10);
+ WorkScheduler workScheduler = new Jsr237WorkScheduler(workManager);
+ DeployerImpl deployer = new DeployerImpl(xmlFactory, builder, componentManager, workScheduler, workContext);
deployer.setScopeRegistry(getScopeRegistry());
WirePostProcessorRegistry wirePostProcessorRegistry = new WirePostProcessorRegistryImpl();
deployer.setWirePostProcessorRegistry(wirePostProcessorRegistry);
Modified: incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/deployer/DeployerImpl.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/deployer/DeployerImpl.java?view=diff&rev=528930&r1=528929&r2=528930
==============================================================================
--- incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/deployer/DeployerImpl.java (original)
+++ incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/deployer/DeployerImpl.java Sat Apr 14 22:53:09 2007
@@ -31,7 +31,6 @@
import org.apache.tuscany.assembly.Composite;
import org.apache.tuscany.assembly.CompositeReference;
import org.apache.tuscany.assembly.CompositeService;
-import org.apache.tuscany.assembly.Implementation;
import org.apache.tuscany.assembly.Multiplicity;
import org.apache.tuscany.assembly.SCABinding;
import org.apache.tuscany.assembly.impl.DefaultAssemblyFactory;
@@ -40,6 +39,7 @@
import org.apache.tuscany.core.builder.WireCreationException;
import org.apache.tuscany.core.wire.InvocationChainImpl;
import org.apache.tuscany.core.wire.InvokerInterceptor;
+import org.apache.tuscany.core.wire.NonBlockingInterceptor;
import org.apache.tuscany.core.wire.WireImpl;
import org.apache.tuscany.core.wire.WireUtils;
import org.apache.tuscany.interfacedef.IncompatibleInterfaceContractException;
@@ -66,10 +66,11 @@
import org.apache.tuscany.spi.component.Service;
import org.apache.tuscany.spi.component.ServiceBinding;
import org.apache.tuscany.spi.component.TargetInvokerCreationException;
+import org.apache.tuscany.spi.component.WorkContext;
import org.apache.tuscany.spi.deployer.Deployer;
import org.apache.tuscany.spi.deployer.DeploymentContext;
import org.apache.tuscany.spi.resolver.ResolutionException;
-import org.apache.tuscany.spi.util.UriHelper;
+import org.apache.tuscany.spi.services.work.WorkScheduler;
import org.apache.tuscany.spi.wire.InvocationChain;
import org.apache.tuscany.spi.wire.Wire;
import org.apache.tuscany.spi.wire.WirePostProcessorRegistry;
@@ -86,11 +87,15 @@
private ScopeRegistry scopeRegistry;
private InterfaceContractMapper mapper = new DefaultInterfaceContractMapper();
private WirePostProcessorRegistry postProcessorRegistry;
+ private WorkScheduler workScheduler;
+ private WorkContext workContext;
- public DeployerImpl(XMLInputFactory xmlFactory, Builder builder, ComponentManager componentManager) {
+ public DeployerImpl(XMLInputFactory xmlFactory, Builder builder, ComponentManager componentManager, WorkScheduler workScheduler, WorkContext workContext) {
this.xmlFactory = xmlFactory;
this.builder = builder;
this.componentManager = componentManager;
+ this.workScheduler = workScheduler;
+ this.workContext = workContext;
}
public DeployerImpl() {
@@ -343,10 +348,10 @@
for (Operation operation : sourceContract.getInterface().getOperations()) {
Operation targetOperation = mapper.map(targetContract.getInterface(), operation);
InvocationChain chain = new InvocationChainImpl(operation, targetOperation);
- /*
- * if (operation.isNonBlocking()) { chain.addInterceptor(new
- * NonBlockingInterceptor(scheduler, workContext)); }
- */
+ /* lresende */
+ if (operation.isNonBlocking()) {
+ chain.addInterceptor(new NonBlockingInterceptor(workScheduler, workContext)); }
+ /* lresende */
chain.addInterceptor(new InvokerInterceptor());
wire.addInvocationChain(chain);
@@ -355,10 +360,10 @@
for (Operation operation : sourceContract.getCallbackInterface().getOperations()) {
Operation targetOperation = mapper.map(targetContract.getCallbackInterface(), operation);
InvocationChain chain = new InvocationChainImpl(operation, targetOperation);
- /*
- * if (operation.isNonBlocking()) { chain.addInterceptor(new
- * NonBlockingInterceptor(scheduler, workContext)); }
- */
+ /* lresende */
+ if (operation.isNonBlocking()) {
+ chain.addInterceptor(new NonBlockingInterceptor(workScheduler, workContext)); }
+ /* lresende */
chain.addInterceptor(new InvokerInterceptor());
wire.addCallbackInvocationChain(chain);
}
Modified: incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/runtime/AbstractRuntime.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/runtime/AbstractRuntime.java?view=diff&rev=528930&r1=528929&r2=528930
==============================================================================
--- incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/runtime/AbstractRuntime.java (original)
+++ incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/runtime/AbstractRuntime.java Sat Apr 14 22:53:09 2007
@@ -45,10 +45,12 @@
import org.apache.tuscany.core.bootstrap.Bootstrapper;
import org.apache.tuscany.core.bootstrap.DefaultBootstrapper;
import org.apache.tuscany.core.component.ComponentManagerImpl;
-import org.apache.tuscany.core.component.SimpleWorkContext;
+import org.apache.tuscany.core.component.WorkContextImpl;
import org.apache.tuscany.core.monitor.NullMonitorFactory;
import org.apache.tuscany.core.services.classloading.ClassLoaderRegistryImpl;
import org.apache.tuscany.core.util.IOHelper;
+import org.apache.tuscany.core.work.Jsr237WorkScheduler;
+import org.apache.tuscany.core.work.ThreadPoolWorkManager;
import org.apache.tuscany.host.MonitorFactory;
import org.apache.tuscany.host.RuntimeInfo;
import org.apache.tuscany.host.management.ManagementService;
@@ -70,8 +72,11 @@
import org.apache.tuscany.spi.deployer.Deployer;
import org.apache.tuscany.spi.services.classloading.ClassLoaderRegistry;
import org.apache.tuscany.spi.services.management.TuscanyManagementService;
+import org.apache.tuscany.spi.services.work.WorkScheduler;
import org.osoa.sca.ComponentContext;
+import commonj.work.WorkManager;
+
/**
* @version $Rev$ $Date$
*/
@@ -83,6 +88,8 @@
private static final URI SCOPE_REGISTRY_URI = TUSCANY_SYSTEM_ROOT.resolve("ScopeRegistry");
private static final URI WORK_CONTEXT_URI = TUSCANY_SYSTEM.resolve("WorkContext");
+
+ private static final URI WORK_SCHEDULER_URI = TUSCANY_SYSTEM.resolve("WorkScheduler");
private static final URI RUNTIME_INFO_URI = TUSCANY_SYSTEM_ROOT.resolve("RuntimeInfo");
@@ -211,7 +218,9 @@
extensionRegistry.addExtensionPoint(ContributionService.class, contributionService);
registerSystemComponent(TUSCANY_DEPLOYER, Deployer.class, deployer);
- registerSystemComponent(WORK_CONTEXT_URI, WorkContext.class, new SimpleWorkContext());
+ registerSystemComponent(WORK_CONTEXT_URI, WorkContext.class, new WorkContextImpl());
+ WorkManager workManager = new ThreadPoolWorkManager(10);
+ registerSystemComponent(WORK_SCHEDULER_URI, WorkScheduler.class, new Jsr237WorkScheduler(workManager)); //lresende
this.scopeRegistry = bootstrapper.getScopeRegistry();
Added: incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkEvent.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkEvent.java?view=auto&rev=528930
==============================================================================
--- incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkEvent.java (added)
+++ incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkEvent.java Sat Apr 14 22:53:09 2007
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.core.work;
+
+
+import commonj.work.WorkEvent;
+import commonj.work.WorkException;
+import commonj.work.WorkItem;
+
+/**
+ * Default immutable implementation of the <code>WorkEvent</code> class.
+ */
+class DefaultWorkEvent implements WorkEvent {
+
+ // Work item for this event
+ private WorkItem workItem;
+
+ // Exception if something has gone wrong
+ private WorkException exception;
+
+ /**
+ * Instantiates the event.
+ *
+ * @param workItem Work item for this event.
+ */
+ public DefaultWorkEvent(final DefaultWorkItem workItem) {
+ this.workItem = workItem;
+ this.exception = workItem.getException();
+ }
+
+ /**
+ * Returns the work type based on whether the work was accepted, started,
+ * rejected or completed.
+ *
+ * @return Work type.
+ */
+ public int getType() {
+ return workItem.getStatus();
+ }
+
+ /**
+ * Returns the work item associated with this work type.
+ *
+ * @return Work item.
+ */
+ public WorkItem getWorkItem() {
+ return workItem;
+ }
+
+ /**
+ * Returns the exception if the work completed with an exception.
+ *
+ * @return Work exception.
+ */
+ public WorkException getException() {
+ return exception;
+ }
+}
Propchange: incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkEvent.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkItem.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkItem.java?view=auto&rev=528930
==============================================================================
--- incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkItem.java (added)
+++ incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkItem.java Sat Apr 14 22:53:09 2007
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.core.work;
+
+import commonj.work.Work;
+import commonj.work.WorkException;
+import commonj.work.WorkItem;
+
+/**
+ * An identity based immutable implementation of the <code>WorkItem</code>
+ * interface.
+ *
+ */
+class DefaultWorkItem implements WorkItem {
+
+ // Id scoped for the VM
+ private String id;
+
+ // Status
+ private int status = -1;
+
+ // Result
+ private Work result;
+
+ // Original work
+ private Work originalWork;
+
+ // Exception
+ private WorkException exception;
+
+ /**
+ * Instantiates an id for this item.
+ *
+ * @param id of this work event.
+ */
+ protected DefaultWorkItem(final String id, final Work orginalWork) {
+ this.id = id;
+ this.originalWork = orginalWork;
+ }
+
+ /**
+ * Returns the id.
+ *
+ * @return Id of this item.
+ */
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * Returns the original work.
+ *
+ * @return Original work.
+ */
+ public Work getOriginalWork() {
+ return originalWork;
+ }
+
+ /**
+ * Returns the work result if the work completed.
+ *
+ * @return Work.
+ * @throws WorkException If the work completed with an exception.
+ */
+ public Work getResult() throws WorkException {
+ return result;
+ }
+
+ /**
+ * Sets the result.
+ *
+ * @param result Result.
+ */
+ protected void setResult(final Work result) {
+ this.result = result;
+ }
+
+ /**
+ * Returns the exception if work completed with an exception.
+ *
+ * @return Work exception.
+ */
+ protected WorkException getException() {
+ return exception;
+ }
+
+ /**
+ * Sets the exception.
+ *
+ * @param exception Exception.
+ */
+ protected void setException(final WorkException exception) {
+ this.exception = exception;
+ }
+
+ /**
+ * Returns the work type based on whether the work was accepted, started,
+ * rejected or completed.
+ *
+ * @return Work status.
+ */
+ public int getStatus() {
+ return status;
+ }
+
+ /**
+ * Sets the status.
+ *
+ * @param status Status.
+ */
+ protected void setStatus(final int status) {
+ this.status = status;
+ }
+
+ /**
+ * @see Object#hashCode()
+ */
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+ /**
+ * Indicates whether some other object is "equal to" this one.
+ *
+ * @param obj Object to be compared.
+ * @return true if this object is the same as the obj argument; false
+ * otherwise..
+ */
+ public boolean equals(final Object obj) {
+ return (obj != null) && (obj.getClass() == DefaultWorkItem.class) && ((DefaultWorkItem) obj).id.equals(id);
+ }
+
+ /**
+ * Compares this object with the specified object for order. Returns a
+ * negative integer, zero, or a positive integer as this object is less
+ * than, equal to, or greater than the specified object.
+ *
+ * @param o Object to be compared.
+ * @return A negative integer, zero, or a positive integer as this object
+ * is less than, equal to, or greater than the specified object.
+ * @throws ClassCastException needs better documentation.
+ */
+ public int compareTo(final Object o) {
+ if (o.getClass() != DefaultWorkItem.class) {
+ throw new ClassCastException(o.getClass().getName());
+ } else {
+ return ((DefaultWorkItem) o).getId().compareTo(getId());
+ }
+ }
+}
Propchange: incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/DefaultWorkItem.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237Work.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237Work.java?view=auto&rev=528930
==============================================================================
--- incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237Work.java (added)
+++ incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237Work.java Sat Apr 14 22:53:09 2007
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.core.work;
+
+/*
+ * JCA work wrapper.
+ */
+public class Jsr237Work<T extends Runnable> implements commonj.work.Work {
+
+ // Work that is being executed.
+ private T work;
+
+ /*
+ * Initializes the work instance.
+ */
+ public Jsr237Work(T work) {
+ this.work = work;
+ }
+
+ /*
+ * Returns the completed work.
+ */
+ public T getWork() {
+ return work;
+ }
+
+ /*
+ * Release the work.
+ */
+ public void release() {
+ }
+
+ /*
+ * Work attributes are not daemon.
+ */
+ public boolean isDaemon() {
+ return false;
+ }
+
+ /*
+ * Runs the work.
+ */
+ public void run() {
+ work.run();
+ }
+}
\ No newline at end of file
Propchange: incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237Work.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237WorkScheduler.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237WorkScheduler.java?view=auto&rev=528930
==============================================================================
--- incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237WorkScheduler.java (added)
+++ incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237WorkScheduler.java Sat Apr 14 22:53:09 2007
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.core.work;
+
+import org.apache.tuscany.spi.services.work.NotificationListener;
+import org.apache.tuscany.spi.services.work.WorkScheduler;
+import org.apache.tuscany.spi.services.work.WorkSchedulerException;
+
+import commonj.work.WorkEvent;
+import commonj.work.WorkException;
+import commonj.work.WorkListener;
+import commonj.work.WorkManager;
+import commonj.work.WorkRejectedException;
+
+/**
+ * A work scheduler implementation based on a JSR 237 work manager.
+ * <p/>
+ * <p/>
+ * This needs a JSR 237 work manager implementation available for scheduling work. Instances can be configured with a
+ * work manager implementation that is injected in. It is the responsibility of the runtime environment to make a work
+ * manager implementaion available. For example, if the managed environment supports work manager the runtime can use
+ * the appropriate lookup mechanism to inject the work manager implementation. </p>
+ */
+public class Jsr237WorkScheduler implements WorkScheduler {
+
+ /**
+ * Underlying JSR-237 work manager
+ */
+ private WorkManager jsr237WorkManager;
+
+ /**
+ * Initializes the JSR 237 work manager.
+ *
+ * @param jsr237WorkManager JSR 237 work manager.
+ */
+ public Jsr237WorkScheduler(WorkManager jsr237WorkManager) {
+ if (jsr237WorkManager == null) {
+ throw new IllegalArgumentException("Work manager cannot be null");
+ }
+ this.jsr237WorkManager = jsr237WorkManager;
+ }
+
+ /**
+ * Schedules a unit of work for future execution. The notification listener is used to register interest in
+ * callbacks regarding the status of the work.
+ *
+ * @param work The unit of work that needs to be asynchronously executed.
+ */
+ public <T extends Runnable> void scheduleWork(T work) {
+ scheduleWork(work, null);
+ }
+
+ /**
+ * Schedules a unit of work for future execution. The notification listener is used to register interest in
+ * callbacks regarding the status of the work.
+ *
+ * @param work The unit of work that needs to be asynchronously executed.
+ * @param listener Notification listener for callbacks.
+ */
+ public <T extends Runnable> void scheduleWork(T work, NotificationListener<T> listener) {
+
+ if (work == null) {
+ throw new IllegalArgumentException("Work cannot be null");
+ }
+
+ Jsr237Work<T> jsr237Work = new Jsr237Work<T>(work);
+ try {
+ if (listener == null) {
+ jsr237WorkManager.schedule(jsr237Work);
+ } else {
+ Jsr237WorkListener<T> jsr237WorkListener = new Jsr237WorkListener<T>(listener, work);
+ jsr237WorkManager.schedule(jsr237Work, jsr237WorkListener);
+ }
+ } catch (WorkRejectedException ex) {
+ if (listener != null) {
+ listener.workRejected(work);
+ } else {
+ throw new WorkSchedulerException(ex);
+ }
+ } catch (WorkException ex) {
+ throw new WorkSchedulerException(ex);
+ }
+
+ }
+
+ /*
+ * Worklistener for keeping track of work status callbacks.
+ *
+ */
+ private class Jsr237WorkListener<T extends Runnable> implements WorkListener {
+
+ // Notification listener
+ private NotificationListener<T> listener;
+
+ // Work
+ private T work;
+
+ /*
+ * Initializes the notification listener.
+ */
+ public Jsr237WorkListener(NotificationListener<T> listener, T work) {
+ this.listener = listener;
+ this.work = work;
+ }
+
+ /*
+ * Callback when the work is accepted.
+ */
+ public void workAccepted(WorkEvent workEvent) {
+ T work = getWork();
+ listener.workAccepted(work);
+ }
+
+ /*
+ * Callback when the work is rejected.
+ */
+ public void workRejected(WorkEvent workEvent) {
+ T work = getWork();
+ listener.workRejected(work);
+ }
+
+ /*
+ * Callback when the work is started.
+ */
+ public void workStarted(WorkEvent workEvent) {
+ T work = getWork();
+ listener.workStarted(work);
+ }
+
+ /*
+ * Callback when the work is completed.
+ */
+ public void workCompleted(WorkEvent workEvent) {
+ T work = getWork();
+ Exception exception = workEvent.getException();
+ if (exception != null) {
+ listener.workFailed(work, exception);
+ } else {
+ listener.workCompleted(work);
+ }
+ }
+
+ /*
+ * Gets the underlying work from the work event.
+ */
+ private T getWork() {
+ return work;
+ }
+
+ }
+}
Propchange: incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/Jsr237WorkScheduler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/ThreadPoolWorkManager.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/ThreadPoolWorkManager.java?view=auto&rev=528930
==============================================================================
--- incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/ThreadPoolWorkManager.java (added)
+++ incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/ThreadPoolWorkManager.java Sat Apr 14 22:53:09 2007
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.core.work;
+
+import java.rmi.server.UID;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.osoa.sca.annotations.Destroy;
+import org.osoa.sca.annotations.Property;
+
+import commonj.work.Work;
+import commonj.work.WorkEvent;
+import commonj.work.WorkException;
+import commonj.work.WorkItem;
+import commonj.work.WorkListener;
+import commonj.work.WorkManager;
+import commonj.work.WorkRejectedException;
+
+/**
+ * A thread-pool based implementation for the JSR-237 work manager.
+ * <p/>
+ * <p/>
+ * This implementation supports only local work.
+ * <p/>
+ * TODO Elaborate the implementation. </p>
+ */
+public class ThreadPoolWorkManager implements WorkManager {
+
+ // Map of work items currently handled by the work manager
+ private Map<DefaultWorkItem, WorkListener> workItems = new ConcurrentHashMap<DefaultWorkItem, WorkListener>();
+
+ // Thread-pool
+ private ExecutorService executor;
+
+ /**
+ * Initializes the thread-pool.
+ *
+ * @param threadPoolSize Thread-pool size.
+ */
+ public ThreadPoolWorkManager(@Property(name = "poolSize") int threadPoolSize) {
+ executor = Executors.newFixedThreadPool(threadPoolSize);
+ }
+
+ /**
+ * Schedules a unit of work asynchronously.
+ *
+ * @param work Work that needs to be scheduled.
+ * @return Work Work item representing the asynchronous work
+ */
+ public WorkItem schedule(Work work) throws WorkException {
+ return schedule(work, null);
+ }
+
+ /**
+ * Schedules a unit of work asynchronously.
+ *
+ * @param work Work that needs to be scheduled.
+ * @param workListener Work listener for callbacks.
+ * @return Work Work item representing the asynchronous work
+ */
+ public WorkItem schedule(Work work, WorkListener workListener) throws WorkRejectedException {
+
+ DefaultWorkItem workItem = new DefaultWorkItem(new UID().toString(), work);
+ if (workListener != null) {
+ workItems.put(workItem, workListener);
+ }
+ workAccepted(workItem, work);
+ if (scheduleWork(work, workItem)) {
+ return workItem;
+ } else {
+ workItem.setStatus(WorkEvent.WORK_REJECTED);
+ if (workListener != null) {
+ workListener.workRejected(new DefaultWorkEvent(workItem));
+ }
+ throw new WorkRejectedException("Unable to schedule work");
+ }
+ }
+
+ /**
+ * Wait for all the specified units of work to finish.
+ *
+ * @param works Units of the work that need to finish.
+ * @param timeout Timeout for waiting for the units of work to finish.
+ */
+ public boolean waitForAll(Collection works, long timeout) {
+ throw new UnsupportedOperationException("waitForAll not supported");
+ }
+
+ /**
+ * Wait for any of the specified units of work to finish.
+ *
+ * @param works Units of the work that need to finish.
+ * @param timeout Timeout for waiting for the units of work to finish.
+ */
+ public Collection waitForAny(Collection works, long timeout) {
+ throw new UnsupportedOperationException("waitForAny not supported");
+ }
+
+ /**
+ * Method provided for subclasses to indicate a work accptance.
+ *
+ * @param workItem Work item representing the work that was accepted.
+ * @param work Work that was accepted.
+ */
+ private void workAccepted(final DefaultWorkItem workItem, final Work work) {
+ WorkListener listener = workItems.get(workItem);
+ if (listener != null) {
+ workItem.setStatus(WorkEvent.WORK_ACCEPTED);
+ WorkEvent event = new DefaultWorkEvent(workItem);
+ listener.workAccepted(event);
+ }
+ }
+
+ /*
+ * Method to indicate a work start.
+ */
+ private void workStarted(final DefaultWorkItem workItem, final Work work) {
+ WorkListener listener = workItems.get(workItem);
+ if (listener != null) {
+ workItem.setStatus(WorkEvent.WORK_STARTED);
+ WorkEvent event = new DefaultWorkEvent(workItem);
+ listener.workStarted(event);
+ }
+ }
+
+ /*
+ * Method to indicate a work completion.
+ */
+ private void workCompleted(final DefaultWorkItem workItem, final Work work) {
+ workCompleted(workItem, work, null);
+ }
+
+ /*
+ * Method to indicate a work completion.
+ */
+ private void workCompleted(final DefaultWorkItem workItem, final Work work, final WorkException exception) {
+ WorkListener listener = workItems.get(workItem);
+ if (listener != null) {
+ workItem.setStatus(WorkEvent.WORK_COMPLETED);
+ workItem.setResult(work);
+ workItem.setException(exception);
+ WorkEvent event = new DefaultWorkEvent(workItem);
+ listener.workCompleted(event);
+ workItems.remove(workItem);
+ }
+ }
+
+ /*
+ * Schedules the work using the threadpool.
+ */
+ private boolean scheduleWork(final Work work, final DefaultWorkItem workItem) {
+ try {
+ executor.execute(new DecoratingWork(workItem, work));
+ return true;
+ } catch (RejectedExecutionException ex) {
+ return false;
+ }
+ }
+
+ /*
+ * Class that decorates the original worker so that it can get callbacks when work is done.
+ */
+ private final class DecoratingWork implements Runnable {
+
+ // Work item for this work.
+ private DefaultWorkItem workItem;
+
+ // The original work.
+ private Work decoratedWork;
+
+ /*
+ * Initializes the work item and underlying work.
+ */
+ private DecoratingWork(final DefaultWorkItem workItem, final Work decoratedWork) {
+ this.workItem = workItem;
+ this.decoratedWork = decoratedWork;
+ }
+
+ /*
+ * Overrides the run method.
+ */
+ public void run() {
+ workStarted(workItem, decoratedWork);
+ try {
+ decoratedWork.run();
+ workCompleted(workItem, decoratedWork);
+ } catch (Throwable th) {
+ workCompleted(workItem, decoratedWork, new WorkException(th.getMessage(), th));
+ }
+ }
+
+ }
+
+ @Destroy
+ public void destroy() {
+ executor.shutdown();
+ }
+
+}
Propchange: incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/core/work/ThreadPoolWorkManager.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: tuscany-commits-unsubscribe@ws.apache.org
For additional commands, e-mail: tuscany-commits-help@ws.apache.org