You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2009/01/13 19:19:17 UTC

svn commit: r734205 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ common/src/main/java/org/apache/qpid/thread/ common/src/main/java/org/apache/qpid/transport/network/io/ common/src/test/java/org/apache/qpid/thread/

Author: rajith
Date: Tue Jan 13 10:19:00 2009
New Revision: 734205

URL: http://svn.apache.org/viewvc?rev=734205&view=rev
Log:
This is related to QPID-1479.
This commit adds the core classes for the thread abstraction patch.

Added:
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/thread/
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java
Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=734205&r1=734204&r2=734205&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Jan 13 10:19:00 2009
@@ -67,16 +67,26 @@
 import org.apache.qpid.client.failover.FailoverNoopSupport;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
 import org.apache.qpid.client.failover.FailoverRetrySupport;
-import org.apache.qpid.client.message.*;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.CloseConsumerMessage;
+import org.apache.qpid.client.message.JMSBytesMessage;
+import org.apache.qpid.client.message.JMSMapMessage;
+import org.apache.qpid.client.message.JMSObjectMessage;
+import org.apache.qpid.client.message.JMSStreamMessage;
+import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.util.FlowControllingBlockingQueue;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.util.FlowControllingBlockingQueue;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.FieldTableFactory;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.jms.Session;
+import org.apache.qpid.thread.Threading;
 import org.apache.qpid.url.AMQBindingURL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -271,6 +281,8 @@
 
     /** Holds the dispatcher thread for this session. */
     protected Dispatcher _dispatcher;
+    
+    protected Thread _dispatcherThread;
 
     /** Holds the message factory factory for this session. */
     protected MessageFactoryRegistry _messageFactoryRegistry;
@@ -668,7 +680,7 @@
             if (_dispatcher != null)
             {
                 // Failover failed and ain't coming back. Knife the dispatcher.
-                _dispatcher.interrupt();
+                _dispatcherThread.interrupt();
             }
         }
 
@@ -1852,7 +1864,7 @@
     void startDispatcherIfNecessary()
     {
         //If we are the dispatcher then we don't need to check we are started
-        if (Thread.currentThread() == _dispatcher)
+        if (Thread.currentThread() == _dispatcherThread)
         {
             return;
         }
@@ -1883,9 +1895,23 @@
         if (_dispatcher == null)
         {
             _dispatcher = new Dispatcher();
-            _dispatcher.setDaemon(true);
+            try
+            {
+                _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher);       
+                
+            }
+            catch(Exception e)
+            {
+                throw new Error("Error creating Dispatcher thread",e);
+            }            
+            _dispatcherThread.setName("Dispatcher-Channel-" + _channelId);
+            _dispatcherThread.setDaemon(true);
             _dispatcher.setConnectionStopped(initiallyStopped);
-            _dispatcher.start();
+            _dispatcherThread.start();
+            if (_dispatcherLogger.isInfoEnabled())
+            {
+                _dispatcherLogger.info(_dispatcherThread.getName() + " created");
+            }
         }
         else
         {
@@ -2606,7 +2632,7 @@
     private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher");
 
     /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
-    class Dispatcher extends Thread
+    class Dispatcher implements Runnable
     {
 
         /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
@@ -2615,21 +2641,14 @@
         private final Object _lock = new Object();
         private String dispatcherID = "" + System.identityHashCode(this);
 
-
-
         public Dispatcher()
         {
-            super("Dispatcher-Channel-" + _channelId);
-            if (_dispatcherLogger.isInfoEnabled())
-            {
-                _dispatcherLogger.info(getName() + " created");
-            }
         }
 
         public void close()
         {
             _closed.set(true);
-            interrupt();
+            _dispatcherThread.interrupt();
 
             // fixme awaitTermination
 
@@ -2708,7 +2727,7 @@
         {
             if (_dispatcherLogger.isInfoEnabled())
             {
-                _dispatcherLogger.info(getName() + " started");
+                _dispatcherLogger.info(_dispatcherThread.getName() + " started");
             }
 
             UnprocessedMessage message;
@@ -2771,7 +2790,7 @@
 
             if (_dispatcherLogger.isInfoEnabled())
             {
-                _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId);
+                _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId);
             }
         }
 

Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java?rev=734205&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java Tue Jan 13 10:19:00 2009
@@ -0,0 +1,35 @@
+package org.apache.qpid.thread;
+
+/**
+ * {@link ThreadFactory} implementation that hands out plain
+ * {@link java.lang.Thread} instances wrapping the supplied task.
+ * Neither method starts the thread it returns.
+ */
+public class DefaultThreadFactory implements ThreadFactory
+{
+    /**
+     * Wraps the task in a new thread without touching its priority.
+     *
+     * @param r the task the thread will run
+     * @return a new thread that has not been started
+     */
+    public Thread createThread(Runnable r)
+    {
+        final Thread plainThread = new Thread(r);
+        return plainThread;
+    }
+
+    /**
+     * Wraps the task in a new thread and applies the given priority.
+     *
+     * @param r the task the thread will run
+     * @param priority value passed to {@link Thread#setPriority(int)}
+     * @return a new thread with the requested priority that has not been started
+     */
+    public Thread createThread(Runnable r, int priority)
+    {
+        final Thread prioritisedThread = new Thread(r);
+        prioritisedThread.setPriority(priority);
+        return prioritisedThread;
+    }
+}

Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java?rev=734205&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java Tue Jan 13 10:19:00 2009
@@ -0,0 +1,85 @@
+package org.apache.qpid.thread;
+
+import java.lang.reflect.Constructor;
+
+/**
+ * {@link ThreadFactory} that creates <code>javax.realtime.RealtimeThread</code>
+ * instances. Every <code>javax.realtime</code> type is looked up by name through
+ * reflection in the constructor, so this source has no compile-time reference
+ * to those types.
+ * <p>
+ * The priority used by {@link #createThread(Runnable)} is read from the
+ * <code>qpid.rt_thread_priority</code> system property and falls back to
+ * {@value #DEFAULT_RT_THREAD_PRIORITY}.
+ */
+public class RealtimeThreadFactory implements ThreadFactory
+{
+    /** Priority used when <code>qpid.rt_thread_priority</code> is not set. */
+    private static final int DEFAULT_RT_THREAD_PRIORITY = 20;
+
+    private final Constructor<? extends Thread> threadConstructor;
+    private final Constructor<?> priorityParameterConstructor;
+    private final int defaultRTThreadPriority;
+
+    /**
+     * Resolves the real-time thread and priority-parameter constructors.
+     *
+     * @throws Exception if a <code>javax.realtime</code> class or constructor
+     *                   cannot be found, or if <code>RealtimeThread</code> is
+     *                   not a subclass of {@link Thread}
+     */
+    public RealtimeThreadFactory() throws Exception
+    {
+        defaultRTThreadPriority =
+            Integer.getInteger("qpid.rt_thread_priority", DEFAULT_RT_THREAD_PRIORITY);
+
+        // asSubclass rejects a non-Thread class here, at construction time,
+        // instead of via a failed cast on the first createThread call.
+        Class<? extends Thread> threadClass =
+            Class.forName("javax.realtime.RealtimeThread").asSubclass(Thread.class);
+
+        // Parameter types of the RealtimeThread constructor being looked up.
+        Class<?>[] paramTypes = new Class<?>[]{
+            Class.forName("javax.realtime.SchedulingParameters"),
+            Class.forName("javax.realtime.ReleaseParameters"),
+            Class.forName("javax.realtime.MemoryParameters"),
+            Class.forName("javax.realtime.MemoryArea"),
+            Class.forName("javax.realtime.ProcessingGroupParameters"),
+            Runnable.class};
+
+        threadConstructor = threadClass.getConstructor(paramTypes);
+
+        priorityParameterConstructor =
+            Class.forName("javax.realtime.PriorityParameters").getConstructor(int.class);
+    }
+
+    /**
+     * Creates a real-time thread for the task using the default priority.
+     *
+     * @param r the task the thread will run
+     * @return the newly constructed thread
+     * @throws Exception if a reflective constructor call fails
+     */
+    public Thread createThread(Runnable r) throws Exception
+    {
+        return createThread(r, defaultRTThreadPriority);
+    }
+
+    /**
+     * Creates a real-time thread for the task, passing a
+     * <code>PriorityParameters</code> built from <code>priority</code> as its
+     * scheduling parameters.
+     *
+     * @param r the task the thread will run
+     * @param priority value handed to the <code>PriorityParameters</code> constructor
+     * @return the newly constructed thread
+     * @throws Exception if a reflective constructor call fails
+     */
+    public Thread createThread(Runnable r, int priority) throws Exception
+    {
+        Object priorityParams = priorityParameterConstructor.newInstance(priority);
+        // The nulls fill the release, memory, memory-area and
+        // processing-group parameter slots, in that order.
+        return threadConstructor.newInstance(priorityParams, null, null, null, null, r);
+    }
+}

Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java?rev=734205&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java Tue Jan 13 10:19:00 2009
@@ -0,0 +1,27 @@
+package org.apache.qpid.thread;
+
+/**
+ * Abstraction over thread creation. Implementations decide which concrete
+ * {@link Thread} type is instantiated for a given task.
+ */
+public interface ThreadFactory
+{
+    /**
+     * Creates a thread that will run the given task.
+     *
+     * @param r the task to run
+     * @return the created thread
+     * @throws Exception if the thread cannot be created
+     */
+    Thread createThread(Runnable r) throws Exception;
+
+    /**
+     * Creates a thread that will run the given task at the given priority.
+     *
+     * @param r the task to run
+     * @param priority the priority requested for the thread
+     * @return the created thread
+     * @throws Exception if the thread cannot be created
+     */
+    Thread createThread(Runnable r, int priority) throws Exception;
+}

Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java?rev=734205&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java Tue Jan 13 10:19:00 2009
@@ -0,0 +1,53 @@
+package org.apache.qpid.thread;
+
+/**
+ * Holder for the {@link ThreadFactory} shared by all callers of
+ * {@link #getThreadFactory()}.
+ * <p>
+ * The factory class name is read from the <code>qpid.thread_factory</code>
+ * system property when this class is initialised; if the property is not set,
+ * <code>org.apache.qpid.thread.DefaultThreadFactory</code> is used. The class
+ * is instantiated through its no-argument constructor.
+ */
+public final class Threading
+{
+    private static final ThreadFactory threadFactory = loadThreadFactory();
+
+    /** Static holder only; not meant to be instantiated. */
+    private Threading()
+    {
+    }
+
+    /**
+     * Loads and instantiates the configured factory class.
+     *
+     * @return the factory instance
+     * @throws Error if the class cannot be loaded, is not a
+     *               {@link ThreadFactory}, or cannot be instantiated
+     */
+    private static ThreadFactory loadThreadFactory()
+    {
+        String className = null;
+        try
+        {
+            className = System.getProperty("qpid.thread_factory",
+                                           "org.apache.qpid.thread.DefaultThreadFactory");
+            return Class.forName(className).asSubclass(ThreadFactory.class).newInstance();
+        }
+        catch(Exception e)
+        {
+            throw new Error("Error occurred while loading thread factory (class: "
+                            + className + ")", e);
+        }
+    }
+
+    /**
+     * Returns the factory created when this class was initialised.
+     *
+     * @return the shared factory; the same instance on every call
+     */
+    public static ThreadFactory getThreadFactory()
+    {
+        return threadFactory;
+    }
+}

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=734205&r1=734204&r2=734205&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Tue Jan 13 10:19:00 2009
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.transport.network.io;
 
+import org.apache.qpid.thread.Threading;
 import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.util.Logger;
@@ -36,7 +37,7 @@
  *
  */
 
-final class IoReceiver extends Thread
+final class IoReceiver implements Runnable
 {
 
     private static final Logger log = Logger.get(IoReceiver.class);
@@ -47,6 +48,7 @@
     private final Socket socket;
     private final long timeout;
     private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final Thread receiverThread;
     private final boolean shutdownBroken =
         ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*");
 
@@ -58,10 +60,18 @@
         this.bufferSize = bufferSize;
         this.socket = transport.getSocket();
         this.timeout = timeout;
-
-        setDaemon(true);
-        setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
-        start();
+        
+        try
+        {
+            receiverThread = Threading.getThreadFactory().createThread(this);                      
+        }
+        catch(Exception e)
+        {
+            throw new Error("Error creating IOReceiver thread",e);
+        }
+        receiverThread.setDaemon(true);
+        receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
+        receiverThread.start();
     }
 
     void close(boolean block)
@@ -78,10 +88,10 @@
                 {
                     socket.shutdownInput();
                 }
-                if (block && Thread.currentThread() != this)
+                if (block && Thread.currentThread() != receiverThread)
                 {
-                    join(timeout);
-                    if (isAlive())
+                    receiverThread.join(timeout);
+                    if (receiverThread.isAlive())
                     {
                         throw new TransportException("join timed out");
                     }
@@ -133,6 +143,9 @@
                   t.getMessage().equalsIgnoreCase("socket closed") &&
                   closed.get()))
             {
+                log.error(t, "===========================================================");
+                log.error(t, "Exception");
+                log.error(t, "===========================================================");
                 receiver.exception(t);
             }
         }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=734205&r1=734204&r2=734205&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Tue Jan 13 10:19:00 2009
@@ -24,6 +24,7 @@
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.qpid.thread.Threading;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.SenderException;
 import org.apache.qpid.transport.TransportException;
@@ -32,7 +33,7 @@
 import static org.apache.qpid.transport.util.Functions.*;
 
 
-public final class IoSender extends Thread implements Sender<ByteBuffer>
+public final class IoSender implements Runnable, Sender<ByteBuffer>
 {
 
     private static final Logger log = Logger.get(IoSender.class);
@@ -54,7 +55,8 @@
     private final Object notFull = new Object();
     private final Object notEmpty = new Object();
     private final AtomicBoolean closed = new AtomicBoolean(false);
-
+    private final Thread senderThread;
+    
     private volatile Throwable exception = null;
 
 
@@ -74,9 +76,18 @@
             throw new TransportException("Error getting output stream for socket", e);
         }
 
-        setDaemon(true);
-        setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
-        start();
+        try
+        {
+            senderThread = Threading.getThreadFactory().createThread(this);                      
+        }
+        catch(Exception e)
+        {
+            throw new Error("Error creating IOSender thread",e);
+        }
+        
+        senderThread.setDaemon(true);
+        senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
+        senderThread.start();
     }
 
     private static final int pof2(int n)
@@ -188,10 +199,10 @@
 
             try
             {
-                if (Thread.currentThread() != this)
+                if (Thread.currentThread() != senderThread)
                 {
-                    join(timeout);
-                    if (isAlive())
+                    senderThread.join(timeout);
+                    if (senderThread.isAlive())
                     {
                         throw new SenderException("join timed out");
                     }

Added: qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java?rev=734205&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java (added)
+++ qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/thread/ThreadFactoryTest.java Tue Jan 13 10:19:00 2009
@@ -0,0 +1,43 @@
+package org.apache.qpid.thread;
+
+import junit.framework.TestCase;
+
+/**
+ * Checks that {@link Threading} exposes the factory class named by the
+ * <code>qpid.thread_factory</code> system property and that the factory
+ * applies a requested priority.
+ */
+public class ThreadFactoryTest extends TestCase
+{
+    /**
+     * The factory returned by {@link Threading} must be an instance of the
+     * class named by <code>qpid.thread_factory</code>, or of the default
+     * class when the property is unset.
+     */
+    public void testThreadFactory() throws Exception
+    {
+        // An invalid class name makes Class.forName throw; the exception is
+        // left to propagate so the test report shows the actual cause.
+        Class<?> threadFactoryClass =
+            Class.forName(System.getProperty("qpid.thread_factory",
+                                             "org.apache.qpid.thread.DefaultThreadFactory"));
+
+        assertEquals(threadFactoryClass, Threading.getThreadFactory().getClass());
+    }
+
+    /** A thread created with an explicit priority must report that priority. */
+    public void testThreadCreate() throws Exception
+    {
+        Runnable noop = new Runnable()
+        {
+            public void run()
+            {
+            }
+        };
+
+        Thread t = Threading.getThreadFactory().createThread(noop, 5);
+
+        assertNotNull(t);
+        assertEquals(5, t.getPriority());
+    }
+}