You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by gt...@apache.org on 2014/02/18 04:06:52 UTC
[6/7] git commit: ContextualWorkManager has been changed over to
present a ScheduledExecutorService for applications to use,
rather than a WorkManager.
ContextualWorkManager has been changed over to present a ScheduledExecutorService for applications to use, rather than a WorkManager.
Starting up the Admin service now works. AdminService is not yet fully implemented.
Project: http://git-wip-us.apache.org/repos/asf/river-container/repo
Commit: http://git-wip-us.apache.org/repos/asf/river-container/commit/e6828db5
Tree: http://git-wip-us.apache.org/repos/asf/river-container/tree/e6828db5
Diff: http://git-wip-us.apache.org/repos/asf/river-container/diff/e6828db5
Branch: refs/heads/plugin-work
Commit: e6828db5b36e56d6eef5c40f1b9cd3e79f2eb93f
Parents: 317dac0
Author: Greg Trasuk <gt...@apache.org>
Authored: Mon Jan 27 01:50:17 2014 -0500
Committer: Greg Trasuk <gt...@apache.org>
Committed: Mon Jan 27 01:50:17 2014 -0500
----------------------------------------------------------------------
.../river/container/admin/impl/AdminImpl.java | 19 +-
.../org/apache/river/container/Bootstrap.java | 4 +
.../org/apache/river/container/Strings.java | 2 +
.../deployer/StarterServiceDeployer.java | 42 ++--
.../river/container/work/BasicExecutor.java | 242 +++++++++++++++++++
.../container/work/ContextualWorkManager.java | 25 +-
.../river/container/work/WorkingContext.java | 4 +-
.../src/site/markdown/WorkManager.md | 60 +++++
river-container-core/src/site/markdown/index.md | 5 +-
.../work/ContextualWorkManagerTest.java | 16 +-
10 files changed, 376 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/river-container/blob/e6828db5/admin-svc/admin-svc-impl/src/main/java/org/apache/river/container/admin/impl/AdminImpl.java
----------------------------------------------------------------------
diff --git a/admin-svc/admin-svc-impl/src/main/java/org/apache/river/container/admin/impl/AdminImpl.java b/admin-svc/admin-svc-impl/src/main/java/org/apache/river/container/admin/impl/AdminImpl.java
index ca0de09..b137e94 100644
--- a/admin-svc/admin-svc-impl/src/main/java/org/apache/river/container/admin/impl/AdminImpl.java
+++ b/admin-svc/admin-svc-impl/src/main/java/org/apache/river/container/admin/impl/AdminImpl.java
@@ -20,9 +20,8 @@ package org.apache.river.container.admin.impl;
import com.sun.jini.config.Config;
import com.sun.jini.start.LifeCycle;
import java.io.IOException;
-import java.net.SocketPermission;
import java.rmi.server.ExportException;
-import java.security.AccessController;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.jini.config.Configuration;
@@ -60,16 +59,13 @@ public class AdminImpl implements ServiceIDListener, AdminRemote {
JoinManager joinManager = null;
DiscoveryManagement discoveryManager = null;
Entry[] attributes = null;
-
+ ScheduledExecutorService executor=null;
+
public AdminImpl(String args[], final LifeCycle lc) throws ConfigurationException, ExportException, IOException {
config = ConfigurationProvider.getInstance(args);
// Get the exporter and create our proxy.
exporter = (Exporter) Config.getNonNullEntry(config, COMPONENT_ID, "exporter", Exporter.class);
- log.fine("\n");
- org.apache.river.container.Utils.logClassLoaderHierarchy(log, this.getClass());
- org.apache.river.container.Utils.logClassLoaderHierarchy(log, config.getClass());
- log.fine("\n");
Utils.logGrantsToClass(log, Level.FINE, this.getClass());
try {
myProxy = (Admin) exporter.export(this);
@@ -83,6 +79,15 @@ public class AdminImpl implements ServiceIDListener, AdminRemote {
// We don't have to do anything with it - just creating it starts the join process.
joinManager = new JoinManager(myProxy, attributes, this, discoveryManager, null, config);
log.info("Started the admin service");
+
+ /* For local clients, we don't want to be dependent on the Jini infrastructure being setup
+ correctly. For this reason, we stash a copy of the proxy's MarshalledObject in the local
+ file system.
+ */
+ synchronized(this) {
+ executor=(ScheduledExecutorService) Config.getNonNullEntry(config, COMPONENT_ID, "$executor", ScheduledExecutorService.class);
+ }
+
}
ServiceID sid = null;
http://git-wip-us.apache.org/repos/asf/river-container/blob/e6828db5/river-container-core/src/main/java/org/apache/river/container/Bootstrap.java
----------------------------------------------------------------------
diff --git a/river-container-core/src/main/java/org/apache/river/container/Bootstrap.java b/river-container-core/src/main/java/org/apache/river/container/Bootstrap.java
index 0943d4c..a7860a2 100644
--- a/river-container-core/src/main/java/org/apache/river/container/Bootstrap.java
+++ b/river-container-core/src/main/java/org/apache/river/container/Bootstrap.java
@@ -173,6 +173,10 @@ public class Bootstrap {
Method initCompleteMethod = context.getClass().getMethod(Strings.INIT_COMPLETE, new Class[0]);
Thread.currentThread().setContextClassLoader(containerClassLoader);
putByNameMethod.invoke(context, Strings.CLASS_LOADERS, (Object) classLoaders);
+
+ /* Store a link to the context in the context. */
+ putByNameMethod.invoke(context, Strings.CONTEXT, context);
+
/*
Process the core configuration
*/
http://git-wip-us.apache.org/repos/asf/river-container/blob/e6828db5/river-container-core/src/main/java/org/apache/river/container/Strings.java
----------------------------------------------------------------------
diff --git a/river-container-core/src/main/java/org/apache/river/container/Strings.java b/river-container-core/src/main/java/org/apache/river/container/Strings.java
index 4bbc2be..76510b3 100644
--- a/river-container-core/src/main/java/org/apache/river/container/Strings.java
+++ b/river-container-core/src/main/java/org/apache/river/container/Strings.java
@@ -32,6 +32,7 @@ public class Strings {
CORE_CONFIG_XML="core-config.xml",
CONTAINER_CLASS_LOADER="containerClassLoader",
CONTAINER_JMX_DOMAIN="org.apache.river.container",
+ CONTEXT="context",
CONTEXT_CLASS = "org.apache.river.container.Context",
DASH = "-",
DEFAULT = "default",
@@ -45,6 +46,7 @@ public class Strings {
DOT_PROPERTIES=".properties",
DOT_SSAR=".ssar",
EMPTY = "",
+ EXECUTOR_NAME="scheduledExecutorService",
GET_ADMIN="getAdmin",
FILE_UTILITY="fileUtility",
INIT_COMPLETE="initComplete",
http://git-wip-us.apache.org/repos/asf/river-container/blob/e6828db5/river-container-core/src/main/java/org/apache/river/container/deployer/StarterServiceDeployer.java
----------------------------------------------------------------------
diff --git a/river-container-core/src/main/java/org/apache/river/container/deployer/StarterServiceDeployer.java b/river-container-core/src/main/java/org/apache/river/container/deployer/StarterServiceDeployer.java
index 54578dc..500bf90 100644
--- a/river-container-core/src/main/java/org/apache/river/container/deployer/StarterServiceDeployer.java
+++ b/river-container-core/src/main/java/org/apache/river/container/deployer/StarterServiceDeployer.java
@@ -137,9 +137,9 @@ public class StarterServiceDeployer implements StarterServiceDeployerMXBean {
String parentLoaderName = configNode.search(
new Class[]{ASTconfig.class, ASTclassloader.class, ASTparent.class}).get(0).jjtGetChild(0).toString();
log.log(Level.FINE, MessageNames.SERVICE_PARENT_CLASSLOADER_IS, parentLoaderName);
- boolean isAppPriority=false;
- if (!configNode.search( new Class[]{ ASTconfig.class, ASTclassloader.class, ASTappPriority.class}).isEmpty()) {
- isAppPriority=true;
+ boolean isAppPriority = false;
+ if (!configNode.search(new Class[]{ASTconfig.class, ASTclassloader.class, ASTappPriority.class}).isEmpty()) {
+ isAppPriority = true;
}
ClassLoader parentLoader = (ClassLoader) context.get(parentLoaderName);
VirtualFileSystemClassLoader cl
@@ -248,7 +248,7 @@ public class StarterServiceDeployer implements StarterServiceDeployerMXBean {
} else {
throw new UnsupportedOperationException();
}
- env.getWorkingContext().getWorkManager().queueTask(env.getClassLoader(), task);
+ env.getWorkingContext().getScheduledExecutorService().submit(task);
}
public Properties readStartProperties(FileObject serviceRoot) throws FileSystemException, LocalizedRuntimeException, IOException {
@@ -267,26 +267,29 @@ public class StarterServiceDeployer implements StarterServiceDeployerMXBean {
return startProps;
}
- public void setupLiaisonConfiguration(FileObject serviceArchive, FileObject serviceRoot, VirtualFileSystemClassLoader cl) throws ConfigurationException {
+ public void setupLiaisonConfiguration(ApplicationEnvironment env) throws ConfigurationException {
/*
Setup the liaison configuration.
*/
ClassLoader originalContextCl = Thread.currentThread().getContextClassLoader();
try {
- Thread.currentThread().setContextClassLoader(cl);
+ Thread.currentThread().setContextClassLoader(env.getClassLoader());
File workingDir = null;
- if (serviceArchive != null) {
- workingDir = new File(serviceArchive.getURL().toURI());
+ if (env.getServiceArchive() != null) {
+ /* TODO: Is this right? Shouldn't the working directory be created
+ by the file manager under the 'work' dir?
+ */
+ workingDir = new File(env.getServiceArchive().getURL().toURI());
} else {
- workingDir = new File(serviceRoot.getURL().toURI());
+ workingDir = new File(env.getServiceArchive().getURL().toURI());
}
- grantPermissions(cl,
+ grantPermissions(env.getClassLoader(),
new Permission[]{new FilePermission(workingDir.getAbsolutePath(), Strings.READ)});
Utils.logClassLoaderHierarchy(log, Level.FINE, this.getClass());
String configName = VirtualFileSystemConfiguration.class.getName();
- invokeStatic(cl, configName,
+ invokeStatic(env.getClassLoader(), configName,
Strings.SET_WORKING_DIRECTORY,
workingDir);
/*
@@ -299,7 +302,7 @@ public class StarterServiceDeployer implements StarterServiceDeployerMXBean {
String contextVarName = cfgEntryNode.jjtGetChild(1).toString();
Object contextValue = context.get(contextVarName);
if (contextValue != null) {
- invokeStatic(cl, configName,
+ invokeStatic(env.getClassLoader(), configName,
Strings.PUT_SPECIAL_ENTRY,
new Class[]{String.class, Object.class},
Strings.DOLLAR + varName, contextValue);
@@ -308,6 +311,14 @@ public class StarterServiceDeployer implements StarterServiceDeployerMXBean {
new Object[]{getConfig(), varName, contextVarName});
}
}
+ /* Install the Executor. */
+ invokeStatic(env.getClassLoader(), configName,
+ Strings.PUT_SPECIAL_ENTRY,
+ new Class[]{String.class, Object.class
+ },
+ Strings.DOLLAR + Strings.EXECUTOR_NAME, env.getWorkingContext().getScheduledExecutorService()
+ );
+
} catch (Exception ex) {
log.log(Level.WARNING, MessageNames.EXCEPTION_THROWN, Utils.stackTrace(ex));
throw new ConfigurationException(ex,
@@ -385,12 +396,13 @@ public class StarterServiceDeployer implements StarterServiceDeployerMXBean {
*/
Permission[] perms = createPermissionsInClassloader(cl);
grantPermissions(cl, perms);
- setupLiaisonConfiguration(env.getServiceArchive(), env.getServiceRoot(), cl);
-
+
/*
* Create a working context (work manager).
*/
- env.setWorkingContext(contextualWorkManager.createContext(env.getServiceName()));
+ env.setWorkingContext(contextualWorkManager.createContext(env.getServiceName(), env.getClassLoader()));
+
+ setupLiaisonConfiguration(env);
}
void launchService(ApplicationEnvironment env, String[] serviceArgs) throws FileSystemException, IOException, ClassNotFoundException {
http://git-wip-us.apache.org/repos/asf/river-container/blob/e6828db5/river-container-core/src/main/java/org/apache/river/container/work/BasicExecutor.java
----------------------------------------------------------------------
diff --git a/river-container-core/src/main/java/org/apache/river/container/work/BasicExecutor.java b/river-container-core/src/main/java/org/apache/river/container/work/BasicExecutor.java
new file mode 100644
index 0000000..c3a996e
--- /dev/null
+++ b/river-container-core/src/main/java/org/apache/river/container/work/BasicExecutor.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+
+ */
+package org.apache.river.container.work;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.river.container.Init;
+import org.apache.river.container.MessageNames;
+import org.apache.river.container.Shutdown;
+import org.apache.river.container.Strings;
+
+/**
+ *
+ * A basic implementation of ScheduledExecutorService that runs immediate work on a
+ * cached thread pool and delayed or periodic work on a single-threaded scheduler.
+ *
+ * @author trasukg
+ */
+public class BasicExecutor implements ScheduledExecutorService {
+
+ private static final Logger log = Logger.getLogger(BasicExecutor.class.getName(), MessageNames.BUNDLE_NAME);
+ ExecutorService executor = null;
+ ScheduledExecutorService scheduledExecutor=null;
+ private MyThreadFactory threadFactory = null;
+ private String name = Strings.UNNAMED;
+ private ClassLoader contextLoader;
+
+ public BasicExecutor(ClassLoader contextLoader) {
+ this(contextLoader, Strings.UNNAMED);
+ }
+
+ public BasicExecutor(ClassLoader contextLoader, String name) {
+ this.contextLoader=contextLoader;
+ this.name = name;
+ threadFactory = new MyThreadFactory();
+ executor = Executors.newCachedThreadPool(threadFactory);
+ scheduledExecutor=
+ Executors.newSingleThreadScheduledExecutor(threadFactory);
+ }
+
+ synchronized int getActiveCount() {
+ return threadFactory.threadGroup.activeCount();
+ }
+
+ @Override
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+ return scheduledExecutor.schedule(new TaskRunnable(command, classLoaderToUse()), delay, unit);
+ }
+
+ private class TaskRunnable implements Runnable {
+
+ Runnable task = null;
+ ClassLoader contextClassLoader = null;
+ ClassLoader originalClassLoader = null;
+
+ TaskRunnable(Runnable task, ClassLoader contextClassLoader) {
+ this.task = task;
+ this.contextClassLoader = contextClassLoader;
+ }
+
+ @Override
+ public void run() {
+ originalClassLoader = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ try {
+ task.run();
+ } finally {
+ Thread.currentThread().setContextClassLoader(originalClassLoader);
+ }
+ }
+ }
+
+ private class TaskCallable<T> implements Callable<T> {
+
+ Callable<T> task = null;
+ ClassLoader contextClassLoader = null;
+ ClassLoader originalClassLoader = null;
+
+ TaskCallable(Callable<T> task, ClassLoader contextClassLoader) {
+ this.task = task;
+ this.contextClassLoader = contextClassLoader;
+ }
+
+ @Override
+ public T call() throws Exception {
+ originalClassLoader = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ try {
+ return task.call();
+ } finally {
+ Thread.currentThread().setContextClassLoader(originalClassLoader);
+ }
+ }
+ }
+
+ @Init
+ public void init() {
+ log.info(MessageNames.BASIC_WORK_MANAGER_INITIALIZED);
+ }
+
+ @Shutdown
+ public void shutdown() {
+ executor.shutdownNow();
+ scheduledExecutor.shutdownNow();
+ }
+
+ private class MyThreadFactory implements ThreadFactory {
+
+ private ThreadGroup threadGroup = new ThreadGroup(name);
+ private int index = 0;
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(threadGroup, r);
+ t.setName(name + Strings.DASH + index++);
+ log.log(Level.FINE, MessageNames.CREATED_THREAD,
+ new Object[]{t.getName(), t.getThreadGroup().getName()});
+ return t;
+ }
+ }
+
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+ return scheduledExecutor.schedule(new TaskCallable(callable, classLoaderToUse()), delay, unit);
+ }
+
+ private ClassLoader classLoaderToUse() {
+ ClassLoader classLoaderToUse =
+ contextLoader != null ? contextLoader : Thread.currentThread().getContextClassLoader();
+ return classLoaderToUse;
+ }
+
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return scheduledExecutor.scheduleAtFixedRate(new TaskRunnable(command, classLoaderToUse()), initialDelay, period, unit);
+ }
+
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return scheduledExecutor.scheduleWithFixedDelay(new TaskRunnable(command, classLoaderToUse()), initialDelay, delay, unit);
+ }
+
+ public List<Runnable> shutdownNow() {
+ List<Runnable> neverCommenced=new ArrayList<Runnable>();
+ neverCommenced.addAll(scheduledExecutor.shutdownNow());
+ neverCommenced.addAll(executor.shutdownNow());
+ return neverCommenced;
+ }
+
+ public boolean isShutdown() {
+ return scheduledExecutor.isShutdown() & executor.isShutdown();
+ }
+
+ public boolean isTerminated() {
+ return scheduledExecutor.isTerminated() & executor.isTerminated() & getActiveCount()==0;
+ }
+
+ /**
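+ * Await termination of both underlying executors, one after the other. Each wait
+ * uses the full timeout, so the total wait is not exact but is bounded at 2*timeout.
+ * @param timeout the maximum time to wait for each underlying executor
+ * @param unit the time unit of the timeout argument
+ * @return true only if both underlying executors terminated within their waits
+ * @throws InterruptedException if interrupted while waiting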
+ */
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return executor.awaitTermination(timeout, unit) & scheduledExecutor.awaitTermination(timeout, unit);
+ }
+
+ public <T> Future<T> submit(Callable<T> task) {
+ return executor.submit(new TaskCallable(task, classLoaderToUse()));
+ }
+
+ public <T> Future<T> submit(Runnable task, T result) {
+ return executor.submit(new TaskRunnable(task, classLoaderToUse()), result);
+ }
+
+ public Future<?> submit(Runnable task) {
+ return executor.submit(new TaskRunnable(task, classLoaderToUse()));
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+ List<Callable<T>> wrappedTasks = constructListOfWrappedTasks(tasks);
+ return executor.invokeAll(wrappedTasks);
+ }
+
+ private <T> List<Callable<T>> constructListOfWrappedTasks(Collection<? extends Callable<T>> tasks) {
+ /* Construct a list of wrapped tasks. */
+ List<Callable<T>> wrappedTasks=new ArrayList<Callable<T>>(tasks.size());
+ for (Callable<T> task: tasks) {
+ wrappedTasks.add(new TaskCallable(task, classLoaderToUse()));
+ }
+ return wrappedTasks;
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ List<Callable<T>> wrappedTasks = constructListOfWrappedTasks(tasks);
+ return executor.invokeAll(wrappedTasks, timeout, unit);
+ }
+
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+ List<Callable<T>> wrappedTasks = constructListOfWrappedTasks(tasks);
+ return executor.invokeAny(wrappedTasks);
+ }
+
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ List<Callable<T>> wrappedTasks = constructListOfWrappedTasks(tasks);
+ return executor.invokeAny(wrappedTasks, timeout, unit);
+ }
+
+ public void execute(Runnable command) {
+ executor.execute(new TaskRunnable(command, classLoaderToUse()));
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/river-container/blob/e6828db5/river-container-core/src/main/java/org/apache/river/container/work/ContextualWorkManager.java
----------------------------------------------------------------------
diff --git a/river-container-core/src/main/java/org/apache/river/container/work/ContextualWorkManager.java b/river-container-core/src/main/java/org/apache/river/container/work/ContextualWorkManager.java
index 8b6e742..e23307b 100644
--- a/river-container-core/src/main/java/org/apache/river/container/work/ContextualWorkManager.java
+++ b/river-container-core/src/main/java/org/apache/river/container/work/ContextualWorkManager.java
@@ -19,6 +19,7 @@ package org.apache.river.container.work;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.river.container.Strings;
/**
@@ -29,44 +30,46 @@ public class ContextualWorkManager {
List<Context> contexts=new ArrayList<Context>();
- public WorkingContext createContext(String name) {
- Context context=new Context(name);
+ public WorkingContext createContext(String name, ClassLoader contextLoader) {
+ Context context=new Context(name, contextLoader);
contexts.add(context);
return context;
}
private class Context implements WorkingContext {
String name=Strings.UNNAMED;
-
+ ClassLoader contextLoader;
+
public String getName() {
return name;
}
- public Context(String name) {
+ public Context(String name, ClassLoader contextLoader) {
this.name=name;
- workManager=new BasicWorkManager(name);
+ this.contextLoader=contextLoader;
+ executor=new BasicExecutor(contextLoader, name);
}
- BasicWorkManager workManager=null;
+ BasicExecutor executor=null;
@Override
- public WorkManager getWorkManager() {
- return workManager;
+ public ScheduledExecutorService getScheduledExecutorService() {
+ return executor;
}
@Override
public int getActiveThreadCount() {
- return workManager.getActiveCount();
+ return executor.getActiveCount();
}
@Override
public void shutdown() {
- workManager.shutdown();
+ executor.shutdownNow();
}
@Override
public void interrupt() {
- workManager.interrupt();
+ executor.shutdownNow();
}
}
http://git-wip-us.apache.org/repos/asf/river-container/blob/e6828db5/river-container-core/src/main/java/org/apache/river/container/work/WorkingContext.java
----------------------------------------------------------------------
diff --git a/river-container-core/src/main/java/org/apache/river/container/work/WorkingContext.java b/river-container-core/src/main/java/org/apache/river/container/work/WorkingContext.java
index 56b2a0a..1f8c6e4 100644
--- a/river-container-core/src/main/java/org/apache/river/container/work/WorkingContext.java
+++ b/river-container-core/src/main/java/org/apache/river/container/work/WorkingContext.java
@@ -17,6 +17,8 @@
*/
package org.apache.river.container.work;
+import java.util.concurrent.ScheduledExecutorService;
+
/**
@author trasukg
@@ -27,7 +29,7 @@ public interface WorkingContext {
context.
@return The WorkManager instance.
*/
- WorkManager getWorkManager();
+ ScheduledExecutorService getScheduledExecutorService();
/**
Answer how many threads are currently active in this context.
http://git-wip-us.apache.org/repos/asf/river-container/blob/e6828db5/river-container-core/src/site/markdown/WorkManager.md
----------------------------------------------------------------------
diff --git a/river-container-core/src/site/markdown/WorkManager.md b/river-container-core/src/site/markdown/WorkManager.md
new file mode 100644
index 0000000..5ea2bf5
--- /dev/null
+++ b/river-container-core/src/site/markdown/WorkManager.md
@@ -0,0 +1,60 @@
+Work Manager and Work Management in the Container
+=================================================
+
+- In general, containers should be able to control the thread usage and scheduling of
+work inside the container. Otherwise it's possible for an application to hijack
+execution or prevent a different application, or possibly the container itself,
+from executing properly.
+- Consequently, applications should be discouraged or disallowed from
+creating their own threads. In turn, that restriction means that the container must
+offer some way to schedule work that should happen on another thread, possibly at
+some time in the future, or even repeatedly.
+
+How it is on 20140125
+---------------------
+
+- The container includes an attempt at this, embodied in `org.apache.river.container.work`
+ - There is an interface 'WorkManager' that contains a `queueTask(...)` method that
+ drops the task into a task queue. The `queueTask` method also allows the user to
+ specify a classloader for the task to run in. The thread pool is expected to set
+ this classloader as the context classloader before executing the task.
+ - There is a 'BasicWorkManager' implementation that uses a ThreadPoolExecutor to
+ implement 'WorkManager'
+ - There is a 'ContextualWorkManager' implementation that allows jobs to be grouped
+ together and cancelled en-masse, for instance when an application needs to be
+ shut down.
+
+Problems
+--------
+
+- We need to provide a way for well-written applications to schedule multi-threaded
+work. We probably shouldn't introduce container-specific API, especially since there
+is a perfectly good API for work management in `java.util.concurrent`.
+- As written now, the API would allow a single shared thread pool, but that isn't
+implemented yet; each working context currently has its own thread pool.
+- There is no facility for an application to have any internal prioritization.
+
+Design Goals
+------------
+
+- Provide an API to applications that allows them to fire off background work,
+scheduled executions, and repetitive tasks.
+ - Essentially, one or more ScheduledExecutorService objects should be provided
+ for the application.
+- Ideally, there should be one thread pool that is managed by the container
+- The executor objects provided to the applications should be isolated from each
+other, cancellable en-masse (for application shutdown) and should preserve the
+context classloader.
+- The number of threads in the thread pool should be configurable.
+- Ideally, the thread pool policy should be configurable (i.e. fixed threads,
+max threads, min threads, etc).
+- Current users of WorkManager interface should be migrated to the new API.
+- Number of threads in use, etc should be visible through a management interface.
+- Applications should be able to provide prioritization on the tasks
+ - Perhaps by also implementing Comparable in the task that implements Runnable.
+ - Runnables that come "first" are run first.
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/river-container/blob/e6828db5/river-container-core/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/river-container-core/src/site/markdown/index.md b/river-container-core/src/site/markdown/index.md
index ed271e1..d86c88a 100644
--- a/river-container-core/src/site/markdown/index.md
+++ b/river-container-core/src/site/markdown/index.md
@@ -58,6 +58,9 @@ classes.
- [State Machine](StateMachine.html) Information on the annotation-based state
machine implementation used in various places in the container.
- [Surrogate Deployment](SurrogateDeployment.html) Deployer used to host
-Jini Surrogates.
+Jini Surrogates.
+- [Work Manager Considerations](WorkManager.html) Design thoughts on work/thread
+management in the
+container.
- [To Do](Todo.html) To-do list for the development.
http://git-wip-us.apache.org/repos/asf/river-container/blob/e6828db5/river-container-core/src/test/java/org/apache/river/container/work/ContextualWorkManagerTest.java
----------------------------------------------------------------------
diff --git a/river-container-core/src/test/java/org/apache/river/container/work/ContextualWorkManagerTest.java b/river-container-core/src/test/java/org/apache/river/container/work/ContextualWorkManagerTest.java
index 3c751d8..5bdab85 100644
--- a/river-container-core/src/test/java/org/apache/river/container/work/ContextualWorkManagerTest.java
+++ b/river-container-core/src/test/java/org/apache/river/container/work/ContextualWorkManagerTest.java
@@ -32,30 +32,30 @@ import org.junit.Test;
public class ContextualWorkManagerTest {
ContextualWorkManager UUT = new ContextualWorkManager();
- WorkingContext context = UUT.createContext("Test-ctx");
+ WorkingContext context = UUT.createContext("Test-ctx", Thread.currentThread().getContextClassLoader());
@Test
public void testContextCreation() {
assertNotNull("context", context);
- assertNotNull("context.workManager", context.getWorkManager());
+ assertNotNull("context.scheduledExecutorService", context.getScheduledExecutorService());
}
@Test
- public void testThreadCount() {
+ public void testRunAndExit() {
WorkerRunnable wt = new WorkerRunnable();
- context.getWorkManager().queueTask(null, wt);
+ context.getScheduledExecutorService().submit(wt);
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < 2000 & context.getActiveThreadCount() < 1) {
Thread.yield();
}
assertEquals("thread count", 1, context.getActiveThreadCount());
- wt.proceed = true;
+
}
@Test
public void testChildThreadGroup() throws Exception {
WorkerRunnable wt = new WorkerRunnable();
- context.getWorkManager().queueTask(null, wt);
+ context.getScheduledExecutorService().submit(wt);
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < 2000 & context.getActiveThreadCount() < 1) {
Thread.yield();
@@ -71,7 +71,7 @@ public class ContextualWorkManagerTest {
@Test
public void testThreadCountWithChildren() throws Exception {
WorkerRunnable wt = new WorkerRunnable(2);
- context.getWorkManager().queueTask(null, wt);
+ context.getScheduledExecutorService().submit(wt);
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < 2000 & context.getActiveThreadCount() < 1) {
Thread.yield();
@@ -91,7 +91,7 @@ public class ContextualWorkManagerTest {
String threadGroupName = Strings.UNKNOWN;
List<WorkerRunnable> children = new ArrayList<WorkerRunnable>();
String id = "--";
- boolean proceed = false;
+ volatile boolean proceed = false;
int nChildren = 0;
public WorkerRunnable() {