You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by gt...@apache.org on 2012/07/14 08:44:42 UTC
svn commit: r1361475 - in /river/jtsk/skunk/surrogate:
src/org/apache/river/container/ src/org/apache/river/container/work/
test/org/apache/river/container/work/
Author: gtrasuk
Date: Sat Jul 14 06:44:41 2012
New Revision: 1361475
URL: http://svn.apache.org/viewvc?rev=1361475&view=rev
Log:
Working on thread management. Need to enhance the thread management so that we can verify that a service is shut down when commanded.
Added:
river/jtsk/skunk/surrogate/src/org/apache/river/container/work/ContextualWorkManager.java
river/jtsk/skunk/surrogate/src/org/apache/river/container/work/WorkingContext.java
river/jtsk/skunk/surrogate/test/org/apache/river/container/work/ContextualWorkManagerTest.java
river/jtsk/skunk/surrogate/test/org/apache/river/container/work/ThreadPoolExecutorTest.java
Modified:
river/jtsk/skunk/surrogate/src/org/apache/river/container/MessageNames.java
river/jtsk/skunk/surrogate/src/org/apache/river/container/Messages.properties
river/jtsk/skunk/surrogate/src/org/apache/river/container/Strings.java
river/jtsk/skunk/surrogate/src/org/apache/river/container/work/BasicWorkManager.java
Modified: river/jtsk/skunk/surrogate/src/org/apache/river/container/MessageNames.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/surrogate/src/org/apache/river/container/MessageNames.java?rev=1361475&r1=1361474&r2=1361475&view=diff
==============================================================================
--- river/jtsk/skunk/surrogate/src/org/apache/river/container/MessageNames.java (original)
+++ river/jtsk/skunk/surrogate/src/org/apache/river/container/MessageNames.java Sat Jul 14 06:44:41 2012
@@ -71,6 +71,7 @@ public class MessageNames {
CONFIG_FILE="configFile",
CONFIGURED_CLASSPATH = "configuredClasspath",
CONTEXT_ITEM = "contextItem",
+ CREATED_THREAD="createdThread",
DUPLICATE_CLASSPATH="duplicateClasspath",
EXCEPTION_THROWN="exceptionThrown",
FAILED_DEPLOY_SERVICE="failedDeployService",
Modified: river/jtsk/skunk/surrogate/src/org/apache/river/container/Messages.properties
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/surrogate/src/org/apache/river/container/Messages.properties?rev=1361475&r1=1361474&r2=1361475&view=diff
==============================================================================
--- river/jtsk/skunk/surrogate/src/org/apache/river/container/Messages.properties (original)
+++ river/jtsk/skunk/surrogate/src/org/apache/river/container/Messages.properties Sat Jul 14 06:44:41 2012
@@ -50,6 +50,7 @@ completedServiceDeployment=Completed dep
configFile=Configuration file is ''{0}''.
configuredClasspath=The configured classpath with id ''{0}'' is {1}.
contextItem=Context key {0} refers to ''{1}''.
+createdThread=Created thread named ''{0}'' in thread group ''{1}''.
duplicateClasspath=Duplicate class path entry for id ''{0}''.
exceptionThrown=Exception thrown:
failedDeployService=Deployment of service archive at ''{0}'' failed.
Modified: river/jtsk/skunk/surrogate/src/org/apache/river/container/Strings.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/surrogate/src/org/apache/river/container/Strings.java?rev=1361475&r1=1361474&r2=1361475&view=diff
==============================================================================
--- river/jtsk/skunk/surrogate/src/org/apache/river/container/Strings.java (original)
+++ river/jtsk/skunk/surrogate/src/org/apache/river/container/Strings.java Sat Jul 14 06:44:41 2012
@@ -72,6 +72,7 @@ public class Strings {
SYSTEM_CLASS_LOADER="systemClassLoader",
TYPE="type",
UNKNOWN="unknown",
+ UNNAMED="unnamed",
WHITESPACE_SEPARATORS=" \t\n\r",
WORK="work";
Modified: river/jtsk/skunk/surrogate/src/org/apache/river/container/work/BasicWorkManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/surrogate/src/org/apache/river/container/work/BasicWorkManager.java?rev=1361475&r1=1361474&r2=1361475&view=diff
==============================================================================
--- river/jtsk/skunk/surrogate/src/org/apache/river/container/work/BasicWorkManager.java (original)
+++ river/jtsk/skunk/surrogate/src/org/apache/river/container/work/BasicWorkManager.java Sat Jul 14 06:44:41 2012
@@ -20,10 +20,13 @@ package org.apache.river.container.work;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.river.container.Init;
import org.apache.river.container.MessageNames;
import org.apache.river.container.Shutdown;
+import org.apache.river.container.Strings;
/**
@@ -35,8 +38,34 @@ a ThreadPoolExecutor.
public class BasicWorkManager implements WorkManager {
private static final Logger log=Logger.getLogger(BasicWorkManager.class.getName(), MessageNames.BUNDLE_NAME);
- ExecutorService executor = Executors.newCachedThreadPool();
-
+ ExecutorService executor = null;
+ private MyThreadFactory threadFactory=null;
+
+ private String name=Strings.UNNAMED;
+
+ public BasicWorkManager() {}
+
+ public BasicWorkManager(String name) {
+ this.name=name;
+ threadFactory=new MyThreadFactory();
+ executor=Executors.newCachedThreadPool(threadFactory);
+ }
+
+ int activeCount=0;
+
+ synchronized void incActiveCount() {
+ activeCount++;
+ }
+
+ synchronized void decActiveCount() {
+ activeCount--;
+ }
+
+ synchronized int getActiveCount() {
+ //return activeCount;
+ return threadFactory.threadGroup.activeCount();
+ }
+
@Override
public void queueTask(TaskClass taskClass, ClassLoader contextClassLoader, Runnable task) {
ClassLoader classLoaderToUse =
@@ -60,9 +89,11 @@ public class BasicWorkManager implements
originalClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(contextClassLoader);
try {
+ incActiveCount();
task.run();
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader);
+ decActiveCount();
}
}
}
@@ -76,4 +107,21 @@ public class BasicWorkManager implements
public void shutdown() {
executor.shutdownNow();
}
+
+ private class MyThreadFactory implements ThreadFactory {
+
+ private ThreadGroup threadGroup=new ThreadGroup(name);
+ private int index=0;
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t=new Thread(threadGroup, r);
+ t.setName(name + Strings.DASH + index++);
+ log.log(Level.FINE,MessageNames.CREATED_THREAD,
+ new Object[] { t.getName(), t.getThreadGroup().getName() });
+ return t;
+ }
+
+ }
+
}
Added: river/jtsk/skunk/surrogate/src/org/apache/river/container/work/ContextualWorkManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/surrogate/src/org/apache/river/container/work/ContextualWorkManager.java?rev=1361475&view=auto
==============================================================================
--- river/jtsk/skunk/surrogate/src/org/apache/river/container/work/ContextualWorkManager.java (added)
+++ river/jtsk/skunk/surrogate/src/org/apache/river/container/work/ContextualWorkManager.java Sat Jul 14 06:44:41 2012
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.container.work;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.river.container.Strings;
+
+/**
+
+ @author trasukg
+ */
+public class ContextualWorkManager {
+
+ List<Context> contexts=new ArrayList<Context>();
+
+ WorkingContext createContext(String name) {
+ Context context=new Context(name);
+ contexts.add(context);
+ return context;
+ }
+
+ private class Context implements WorkingContext {
+ String name=Strings.UNNAMED;
+
+ public String getName() {
+ return name;
+ }
+
+ public Context(String name) {
+ this.name=name;
+ workManager=new BasicWorkManager(name);
+ }
+
+ BasicWorkManager workManager=null;
+
+ @Override
+ public WorkManager getWorkManager() {
+ return workManager;
+ }
+
+ @Override
+ public int getActiveThreadCount() {
+ return workManager.getActiveCount();
+ }
+
+ @Override
+ public void interrupt() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ }
+}
Added: river/jtsk/skunk/surrogate/src/org/apache/river/container/work/WorkingContext.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/surrogate/src/org/apache/river/container/work/WorkingContext.java?rev=1361475&view=auto
==============================================================================
--- river/jtsk/skunk/surrogate/src/org/apache/river/container/work/WorkingContext.java (added)
+++ river/jtsk/skunk/surrogate/src/org/apache/river/container/work/WorkingContext.java Sat Jul 14 06:44:41 2012
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.container.work;
+
+/**
+
+ @author trasukg
+ */
+public interface WorkingContext {
+ /**
+ Retrieve the instance of the WorkManager interface that goes with this
+ context.
+ @return The WorkManager instance.
+ */
+ WorkManager getWorkManager();
+
+ /**
+ Answer how many threads are currently active in this context.
+ @return number of active threads.
+ */
+ int getActiveThreadCount();
+
+ /**
+ Attempt to stop all threads in the context by interrupting them.
+ */
+ void interrupt();
+}
Added: river/jtsk/skunk/surrogate/test/org/apache/river/container/work/ContextualWorkManagerTest.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/surrogate/test/org/apache/river/container/work/ContextualWorkManagerTest.java?rev=1361475&view=auto
==============================================================================
--- river/jtsk/skunk/surrogate/test/org/apache/river/container/work/ContextualWorkManagerTest.java (added)
+++ river/jtsk/skunk/surrogate/test/org/apache/river/container/work/ContextualWorkManagerTest.java Sat Jul 14 06:44:41 2012
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.container.work;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.river.container.Strings;
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+/**
+
+ @author trasukg
+ */
+public class ContextualWorkManagerTest {
+
+ ContextualWorkManager UUT = new ContextualWorkManager();
+ WorkingContext context = UUT.createContext("Test-ctx");
+
+ @Test
+ public void testContextCreation() {
+ assertNotNull("context", context);
+ assertNotNull("context.workManager", context.getWorkManager());
+ }
+
+ @Test
+ public void testThreadCount() {
+ WorkerRunnable wt = new WorkerRunnable();
+ context.getWorkManager().queueTask(TaskClass.SYSTEM_TASK, null, wt);
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start < 2000 & context.getActiveThreadCount() < 1) {
+ Thread.yield();
+ }
+ assertEquals("thread count", 1, context.getActiveThreadCount());
+ wt.proceed = true;
+ }
+
+ @Test
+ public void testChildThreadGroup() throws Exception {
+ WorkerRunnable wt=new WorkerRunnable();
+ context.getWorkManager().queueTask(TaskClass.SYSTEM_TASK, null, wt);
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start < 2000 & context.getActiveThreadCount() < 1) {
+ Thread.yield();
+ }
+ Thread.sleep(1000); // Ugly wait for thread to start.
+ assertTrue("Thread group name '" + wt.getThreadGroupName() + "' doesn't start with ctx name",
+ wt.getThreadGroupName().startsWith("Test-ctx"));
+ }
+
+ /** Hold off on this -- not needed yet.
+
+ */
+ @Test
+ public void testThreadCountWithChildren() throws Exception {
+ WorkerRunnable wt = new WorkerRunnable(2);
+ context.getWorkManager().queueTask(TaskClass.SYSTEM_TASK, null, wt);
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start < 2000 & context.getActiveThreadCount() < 1) {
+ Thread.yield();
+ }
+ Thread.sleep(1000);
+ try {
+ assertEquals("thread count", 3, context.getActiveThreadCount());
+ } finally {
+ wt.proceed = true;
+ Thread.sleep(1000);
+ }
+ }
+
+ private class WorkerRunnable extends Thread {
+ String threadGroupName=Strings.UNKNOWN;
+ List<WorkerRunnable> children = new ArrayList<WorkerRunnable>();
+ String id = "--";
+ boolean proceed = false;
+
+ public WorkerRunnable() {
+ }
+
+ public String getThreadGroupName() {
+ return threadGroupName;
+ }
+
+ public WorkerRunnable(int nChildren) {
+ for (int x = 0; x < nChildren; x++) {
+ WorkerRunnable newWorker = new WorkerRunnable();
+ newWorker.id = "WorkerRunnable-" + x;
+ newWorker.start();
+ children.add(newWorker);
+ }
+ }
+
+ public void run() {
+ threadGroupName=Thread.currentThread().getThreadGroup().getName();
+
+ System.out.println("Worker " + id + " beginning.");
+ try {
+ while (!proceed) {
+ Thread.sleep(500);
+ }
+ } catch (InterruptedException ex) {
+ Logger.getLogger(ContextualWorkManagerTest.class.getName()).log(Level.SEVERE, null, ex);
+ } finally {
+ for (WorkerRunnable worker : children) {
+ worker.proceed = true;
+ }
+ System.out.println("Worker " + id + " ended.");
+
+ }
+ }
+ }
+}
Added: river/jtsk/skunk/surrogate/test/org/apache/river/container/work/ThreadPoolExecutorTest.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/surrogate/test/org/apache/river/container/work/ThreadPoolExecutorTest.java?rev=1361475&view=auto
==============================================================================
--- river/jtsk/skunk/surrogate/test/org/apache/river/container/work/ThreadPoolExecutorTest.java (added)
+++ river/jtsk/skunk/surrogate/test/org/apache/river/container/work/ThreadPoolExecutorTest.java Sat Jul 14 06:44:41 2012
@@ -0,0 +1,39 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.river.container.work;
+
+import org.junit.*;
+import static org.junit.Assert.*;
+
+/**
+
+ @author trasukg
+ */
+public class ThreadPoolExecutorTest {
+
+ public ThreadPoolExecutorTest() {
+ }
+
+ @BeforeClass
+ public static void setUpClass() throws Exception {
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws Exception {
+ }
+
+ @Before
+ public void setUp() {
+ }
+
+ @After
+ public void tearDown() {
+ }
+ // TODO add test methods here.
+ // The methods must be annotated with annotation @Test. For example:
+ //
+ // @Test
+ // public void hello() {}
+}