You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by gt...@apache.org on 2012/07/14 08:44:42 UTC

svn commit: r1361475 - in /river/jtsk/skunk/surrogate: src/org/apache/river/container/ src/org/apache/river/container/work/ test/org/apache/river/container/work/

Author: gtrasuk
Date: Sat Jul 14 06:44:41 2012
New Revision: 1361475

URL: http://svn.apache.org/viewvc?rev=1361475&view=rev
Log:
Working on thread management.  Need to enhance the thread management so that we can verify that a service is shut down when commanded.

Added:
    river/jtsk/skunk/surrogate/src/org/apache/river/container/work/ContextualWorkManager.java
    river/jtsk/skunk/surrogate/src/org/apache/river/container/work/WorkingContext.java
    river/jtsk/skunk/surrogate/test/org/apache/river/container/work/ContextualWorkManagerTest.java
    river/jtsk/skunk/surrogate/test/org/apache/river/container/work/ThreadPoolExecutorTest.java
Modified:
    river/jtsk/skunk/surrogate/src/org/apache/river/container/MessageNames.java
    river/jtsk/skunk/surrogate/src/org/apache/river/container/Messages.properties
    river/jtsk/skunk/surrogate/src/org/apache/river/container/Strings.java
    river/jtsk/skunk/surrogate/src/org/apache/river/container/work/BasicWorkManager.java

Modified: river/jtsk/skunk/surrogate/src/org/apache/river/container/MessageNames.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/surrogate/src/org/apache/river/container/MessageNames.java?rev=1361475&r1=1361474&r2=1361475&view=diff
==============================================================================
--- river/jtsk/skunk/surrogate/src/org/apache/river/container/MessageNames.java (original)
+++ river/jtsk/skunk/surrogate/src/org/apache/river/container/MessageNames.java Sat Jul 14 06:44:41 2012
@@ -71,6 +71,7 @@ public class MessageNames {
             CONFIG_FILE="configFile",
             CONFIGURED_CLASSPATH = "configuredClasspath",
             CONTEXT_ITEM = "contextItem",
+            CREATED_THREAD="createdThread",
             DUPLICATE_CLASSPATH="duplicateClasspath",
             EXCEPTION_THROWN="exceptionThrown",
             FAILED_DEPLOY_SERVICE="failedDeployService",

Modified: river/jtsk/skunk/surrogate/src/org/apache/river/container/Messages.properties
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/surrogate/src/org/apache/river/container/Messages.properties?rev=1361475&r1=1361474&r2=1361475&view=diff
==============================================================================
--- river/jtsk/skunk/surrogate/src/org/apache/river/container/Messages.properties (original)
+++ river/jtsk/skunk/surrogate/src/org/apache/river/container/Messages.properties Sat Jul 14 06:44:41 2012
@@ -50,6 +50,7 @@ completedServiceDeployment=Completed dep
 configFile=Configuration file is ''{0}''.
 configuredClasspath=The configured classpath with id ''{0}'' is {1}.
 contextItem=Context key {0} refers to ''{1}''.
+createdThread=Created thread named ''{0}'' in thread group ''{1}''.
 duplicateClasspath=Duplicate class path entry for id ''{0}''.
 exceptionThrown=Exception thrown:
 failedDeployService=Deployment of service archive at ''{0}'' failed.

Modified: river/jtsk/skunk/surrogate/src/org/apache/river/container/Strings.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/surrogate/src/org/apache/river/container/Strings.java?rev=1361475&r1=1361474&r2=1361475&view=diff
==============================================================================
--- river/jtsk/skunk/surrogate/src/org/apache/river/container/Strings.java (original)
+++ river/jtsk/skunk/surrogate/src/org/apache/river/container/Strings.java Sat Jul 14 06:44:41 2012
@@ -72,6 +72,7 @@ public class Strings {
             SYSTEM_CLASS_LOADER="systemClassLoader",
             TYPE="type",
             UNKNOWN="unknown",
+            UNNAMED="unnamed",
             WHITESPACE_SEPARATORS=" \t\n\r",
             WORK="work";
 

Modified: river/jtsk/skunk/surrogate/src/org/apache/river/container/work/BasicWorkManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/surrogate/src/org/apache/river/container/work/BasicWorkManager.java?rev=1361475&r1=1361474&r2=1361475&view=diff
==============================================================================
--- river/jtsk/skunk/surrogate/src/org/apache/river/container/work/BasicWorkManager.java (original)
+++ river/jtsk/skunk/surrogate/src/org/apache/river/container/work/BasicWorkManager.java Sat Jul 14 06:44:41 2012
@@ -20,10 +20,13 @@ package org.apache.river.container.work;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 import org.apache.river.container.Init;
 import org.apache.river.container.MessageNames;
 import org.apache.river.container.Shutdown;
+import org.apache.river.container.Strings;
 
 /**
 
@@ -35,8 +38,34 @@ a ThreadPoolExecutor.
 public class BasicWorkManager implements WorkManager {
     private static final Logger log=Logger.getLogger(BasicWorkManager.class.getName(), MessageNames.BUNDLE_NAME);
     
-    ExecutorService executor = Executors.newCachedThreadPool();
-
+    ExecutorService executor = null;
+    private MyThreadFactory threadFactory=null;
+    
+    private String name=Strings.UNNAMED;
+    
+    /** Creates a manager named Strings.UNNAMED; delegating keeps executor and threadFactory non-null. */
+    public BasicWorkManager() { this(Strings.UNNAMED); }
+    public BasicWorkManager(String name) { // name labels the thread group and the threads created in it
+        this.name=name; // assigned before the factory is built: its ThreadGroup is created from name
+        threadFactory=new MyThreadFactory();
+        executor=Executors.newCachedThreadPool(threadFactory);
+    }
+    
+    int activeCount=0; // number of tasks currently between incActiveCount() and decActiveCount()
+    /** Records that a queued task is about to run. */
+    synchronized void incActiveCount() {
+        activeCount++;
+    }
+    /** Records that a queued task has finished, whether normally or by exception. */
+    synchronized void decActiveCount() {
+        activeCount--;
+    }
+    /** Answers the thread group's active-thread estimate; the activeCount field is not consulted. */
+    synchronized int getActiveCount() {
+        // Counter-based alternative kept for reference: return activeCount;
+        return threadFactory.threadGroup.activeCount();
+    }
+    
     @Override
     public void queueTask(TaskClass taskClass, ClassLoader contextClassLoader, Runnable task) {
         ClassLoader classLoaderToUse = 
@@ -60,9 +89,11 @@ public class BasicWorkManager implements
             originalClassLoader = Thread.currentThread().getContextClassLoader();
             Thread.currentThread().setContextClassLoader(contextClassLoader);
             try {
+                incActiveCount();
                 task.run();
             } finally {
                 Thread.currentThread().setContextClassLoader(originalClassLoader);
+                decActiveCount();
             }
         }
     }
@@ -76,4 +107,21 @@ public class BasicWorkManager implements
     public void shutdown() {
         executor.shutdownNow();
     }
+    
+    /** Creates worker threads inside one ThreadGroup named after this manager, numbering them in order. */
+    private class MyThreadFactory implements ThreadFactory {
+        private final ThreadGroup threadGroup=new ThreadGroup(name);
+        private int index=0; // guarded by this factory's monitor
+        
+        /** Synchronized because the executor may ask for threads from several submitting threads at once. */
+        @Override
+        public synchronized Thread newThread(Runnable r) {
+            Thread t=new Thread(threadGroup, r);
+            t.setName(name + Strings.DASH + index++);
+            log.log(Level.FINE,MessageNames.CREATED_THREAD,
+                    new Object[] { t.getName(), t.getThreadGroup().getName() });
+            return t;
+        }
+    }
+    
 }

Added: river/jtsk/skunk/surrogate/src/org/apache/river/container/work/ContextualWorkManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/surrogate/src/org/apache/river/container/work/ContextualWorkManager.java?rev=1361475&view=auto
==============================================================================
--- river/jtsk/skunk/surrogate/src/org/apache/river/container/work/ContextualWorkManager.java (added)
+++ river/jtsk/skunk/surrogate/src/org/apache/river/container/work/ContextualWorkManager.java Sat Jul 14 06:44:41 2012
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.container.work;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.river.container.Strings;
+
+/**
+ Hands out named WorkingContext instances, each backed by its own BasicWorkManager.
+ @author trasukg
+ */
+public class ContextualWorkManager {
+
+    /** Every context created so far, in creation order. */
+    final List<Context> contexts=new ArrayList<Context>();
+    
+    /**
+    Create and remember a context whose work manager is named <code>name</code>.
+    @param name Name handed to the context and to its BasicWorkManager.
+    @return The newly created context.
+    */
+    WorkingContext createContext(String name) {
+        Context context=new Context(name);
+        contexts.add(context);
+        return context;
+    }
+    
+    /** Static: a context only ever uses its own work manager, never the enclosing instance. */
+    private static class Context implements WorkingContext {
+        private final String name;
+        private final BasicWorkManager workManager;
+
+        public Context(String name) {
+            this.name=name;
+            this.workManager=new BasicWorkManager(name);
+        }
+
+        public String getName() { return name; }
+
+        @Override
+        public WorkManager getWorkManager() { return workManager; }
+
+        @Override
+        public int getActiveThreadCount() { return workManager.getActiveCount(); }
+
+        /** Not implemented yet; always throws UnsupportedOperationException. */
+        @Override
+        public void interrupt() {
+            throw new UnsupportedOperationException("Not supported yet.");
+        }
+    }
+}

Added: river/jtsk/skunk/surrogate/src/org/apache/river/container/work/WorkingContext.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/surrogate/src/org/apache/river/container/work/WorkingContext.java?rev=1361475&view=auto
==============================================================================
--- river/jtsk/skunk/surrogate/src/org/apache/river/container/work/WorkingContext.java (added)
+++ river/jtsk/skunk/surrogate/src/org/apache/river/container/work/WorkingContext.java Sat Jul 14 06:44:41 2012
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.container.work;
+
+/**
+ Pairs a WorkManager with the means to count and interrupt the threads of one context.
+ @author trasukg
+ */
+public interface WorkingContext {
+    /**
+    Retrieve the instance of the WorkManager interface that goes with this
+    context.
+    @return The WorkManager instance that belongs to this context.
+    */
+    WorkManager getWorkManager();
+    
+    /**
+    Answer how many threads are currently active in this context.
+    @return Number of active threads; an implementation may report an estimate.
+    */
+    int getActiveThreadCount();
+    
+    /**
+    Attempt to stop all threads in the context by interrupting them.
+    */
+    void interrupt();
+}

Added: river/jtsk/skunk/surrogate/test/org/apache/river/container/work/ContextualWorkManagerTest.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/surrogate/test/org/apache/river/container/work/ContextualWorkManagerTest.java?rev=1361475&view=auto
==============================================================================
--- river/jtsk/skunk/surrogate/test/org/apache/river/container/work/ContextualWorkManagerTest.java (added)
+++ river/jtsk/skunk/surrogate/test/org/apache/river/container/work/ContextualWorkManagerTest.java Sat Jul 14 06:44:41 2012
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.container.work;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.river.container.Strings;
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+/**
+ Exercises ContextualWorkManager: context creation, thread counting and thread-group naming.
+ @author trasukg
+ */
+public class ContextualWorkManagerTest {
+    ContextualWorkManager UUT = new ContextualWorkManager();
+    WorkingContext context = UUT.createContext("Test-ctx");
+
+    @Test
+    public void testContextCreation() {
+        assertNotNull("context", context);
+        assertNotNull("context.workManager", context.getWorkManager());
+    }
+
+    /** Spin for at most two seconds until the context reports at least one active thread. */
+    private void awaitActiveThread() {
+        long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() - start < 2000 && context.getActiveThreadCount() < 1) {
+            Thread.yield();
+        }
+    }
+
+    @Test
+    public void testThreadCount() {
+        WorkerRunnable wt = new WorkerRunnable();
+        context.getWorkManager().queueTask(TaskClass.SYSTEM_TASK, null, wt);
+        awaitActiveThread();
+        try {
+            assertEquals("thread count", 1, context.getActiveThreadCount());
+        } finally {
+            wt.proceed = true; // release the worker even when the assertion fails
+        }
+    }
+
+    @Test
+    public void testChildThreadGroup() throws Exception {
+        WorkerRunnable wt=new WorkerRunnable();
+        context.getWorkManager().queueTask(TaskClass.SYSTEM_TASK, null, wt);
+        awaitActiveThread();
+        Thread.sleep(1000); // Ugly wait for thread to start.
+        try {
+            assertTrue("Thread group name '" + wt.getThreadGroupName() + "' doesn't start with ctx name",
+                    wt.getThreadGroupName().startsWith("Test-ctx"));
+        } finally {
+            wt.proceed = true; // without this the worker would loop in the pool thread forever
+        }
+    }
+
+    /** A task that spawns two child threads must show up as three active threads in the context. */
+    @Test
+    public void testThreadCountWithChildren() throws Exception {
+        WorkerRunnable wt = new WorkerRunnable(2);
+        context.getWorkManager().queueTask(TaskClass.SYSTEM_TASK, null, wt);
+        awaitActiveThread();
+        Thread.sleep(1000);
+        try {
+            assertEquals("thread count", 3, context.getActiveThreadCount());
+        } finally {
+            wt.proceed = true;
+            Thread.sleep(1000);
+        }
+    }
+
+    /** Blocks in run() until proceed is set; optionally spawns child workers from the running thread. */
+    private class WorkerRunnable extends Thread {
+        volatile String threadGroupName=Strings.UNKNOWN; // written by the worker, read by the test
+        List<WorkerRunnable> children = new ArrayList<WorkerRunnable>();
+        String id = "--";
+        volatile boolean proceed = false; // set from another thread to let run() finish
+        private final int nChildren;
+
+        public WorkerRunnable() { this(0); }
+
+        public String getThreadGroupName() { return threadGroupName; }
+
+        public WorkerRunnable(int nChildren) { this.nChildren = nChildren; }
+
+        public void run() {
+            threadGroupName=Thread.currentThread().getThreadGroup().getName();
+            for (int x = 0; x < nChildren; x++) { // built here: a Thread joins its constructing thread's group
+                WorkerRunnable newWorker = new WorkerRunnable();
+                newWorker.id = "WorkerRunnable-" + x;
+                newWorker.start();
+                children.add(newWorker);
+            }
+            System.out.println("Worker " + id + " beginning.");
+            try {
+                while (!proceed) {
+                    Thread.sleep(500);
+                }
+            } catch (InterruptedException ex) {
+                Thread.currentThread().interrupt(); // keep the interrupt visible to the owning pool
+                Logger.getLogger(ContextualWorkManagerTest.class.getName()).log(Level.SEVERE, null, ex);
+            } finally {
+                for (WorkerRunnable worker : children) {
+                    worker.proceed = true;
+                }
+                System.out.println("Worker " + id + " ended.");
+            }
+        }
+    }
+}

Added: river/jtsk/skunk/surrogate/test/org/apache/river/container/work/ThreadPoolExecutorTest.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/surrogate/test/org/apache/river/container/work/ThreadPoolExecutorTest.java?rev=1361475&view=auto
==============================================================================
--- river/jtsk/skunk/surrogate/test/org/apache/river/container/work/ThreadPoolExecutorTest.java (added)
+++ river/jtsk/skunk/surrogate/test/org/apache/river/container/work/ThreadPoolExecutorTest.java Sat Jul 14 06:44:41 2012
@@ -0,0 +1,39 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.river.container.work;
+
+import org.junit.*;
+import static org.junit.Assert.*;
+
+/**
+ Placeholder for ThreadPoolExecutor tests; it declares fixtures but no test methods yet.
+ @author trasukg
+ */
+@Ignore("No test methods yet; without this JUnit 4 fails the class with 'No runnable methods'")
+public class ThreadPoolExecutorTest {
+    public ThreadPoolExecutorTest() {
+    }
+
+    @BeforeClass
+    public static void setUpClass() throws Exception {
+    }
+
+    @AfterClass
+    public static void tearDownClass() throws Exception {
+    }
+    
+    @Before
+    public void setUp() {
+    }
+    
+    @After
+    public void tearDown() {
+    }
+    // TODO add test methods here.
+    // The methods must be annotated with annotation @Test. For example:
+    //
+    // @Test
+    // public void hello() {}
+}