You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/03/17 22:19:05 UTC

svn commit: r1667409 - in /qpid/trunk/qpid/java: broker-core/src/main/java/org/apache/qpid/server/store/ broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/ broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/j...

Author: rgodfrey
Date: Tue Mar 17 21:19:05 2015
New Revision: 1667409

URL: http://svn.apache.org/r1667409
Log:
QPID-6457 : [Java Broker] Make asynchronous commits occur on executor threads

Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
    qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java
    qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1667409&r1=1667408&r2=1667409&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Tue Mar 17 21:19:05 2015
@@ -35,6 +35,17 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
@@ -109,11 +120,16 @@ public abstract class AbstractJDBCMessag
             " WHERE format = ? and global_id = ? and branch_id = ?";
 
     protected final EventManager _eventManager = new EventManager();
+    private ConfiguredObject<?> _parent;
 
     protected abstract boolean isMessageStoreOpen();
 
     protected abstract void checkMessageStoreOpen();
+    private ScheduledThreadPoolExecutor _executor;
 
+    public AbstractJDBCMessageStore()
+    {
+    }
 
     protected void setMaximumMessageId()
     {
@@ -269,6 +285,34 @@ public abstract class AbstractJDBCMessag
         }
     }
 
+    protected void initMessageStore(final ConfiguredObject<?> parent)
+    {
+        _parent = parent;
+        _executor = new ScheduledThreadPoolExecutor(4, new ThreadFactory()
+        {
+            private final AtomicInteger _count = new AtomicInteger();
+            @Override
+            public Thread newThread(final Runnable r)
+            {
+                final Thread thread = Executors.defaultThreadFactory().newThread(r);
+                thread.setName(parent.getName() + "-store-"+_count.incrementAndGet());
+                return thread;
+            }
+        });
+        _executor.prestartAllCoreThreads();
+
+    }
+
+    @Override
+    public void closeMessageStore()
+    {
+        if(_executor != null)
+        {
+            _executor.shutdown();
+        }
+
+    }
+
     protected abstract Logger getLogger();
 
     protected abstract String getSqlBlobType();
@@ -835,10 +879,105 @@ public abstract class AbstractJDBCMessag
         }
     }
 
-    private FutureResult commitTranAsync(ConnectionWrapper connWrapper) throws StoreException
+    private FutureResult commitTranAsync(final ConnectionWrapper connWrapper) throws StoreException
     {
-        commitTran(connWrapper);
-        return FutureResult.IMMEDIATE_FUTURE;
+        final Future<?> result = _executor.submit(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                commitTran(connWrapper);
+            }
+        });
+        return new FutureResult()
+        {
+            @Override
+            public boolean isComplete()
+            {
+                boolean done = result.isDone();
+                try
+                {
+                    result.get();
+                }
+                catch (InterruptedException e)
+                {
+                    // this won't happen as we're actually already done;
+                }
+                catch (ExecutionException e)
+                {
+                    if(e.getCause() instanceof RuntimeException)
+                    {
+                        throw (RuntimeException)e.getCause();
+                    }
+                    else if(e.getCause() instanceof Error)
+                    {
+                        throw (Error)e.getCause();
+                    }
+                    else
+                    {
+                        throw new StoreException(e);
+                    }
+                }
+                return done;
+            }
+
+            @Override
+            public void waitForCompletion()
+            {
+                try
+                {
+                    result.get();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new StoreException(e);
+                }
+                catch (ExecutionException e)
+                {
+                    if(e.getCause() instanceof RuntimeException)
+                    {
+                        throw (RuntimeException)e.getCause();
+                    }
+                    else if(e.getCause() instanceof Error)
+                    {
+                        throw (Error)e.getCause();
+                    }
+                    else
+                    {
+                        throw new StoreException(e);
+                    }
+                }
+            }
+
+            @Override
+            public void waitForCompletion(final long timeout) throws TimeoutException
+            {
+
+                try
+                {
+                    result.get(timeout, TimeUnit.MILLISECONDS);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new StoreException(e);
+                }
+                catch (ExecutionException e)
+                {
+                    if(e.getCause() instanceof RuntimeException)
+                    {
+                        throw (RuntimeException)e.getCause();
+                    }
+                    else if(e.getCause() instanceof Error)
+                    {
+                        throw (Error)e.getCause();
+                    }
+                    else
+                    {
+                        throw new StoreException(e);
+                    }
+                }
+            }
+        };
     }
 
     private void abortTran(ConnectionWrapper connWrapper) throws StoreException

Modified: qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java?rev=1667409&r1=1667408&r2=1667409&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java Tue Mar 17 21:19:05 2015
@@ -50,6 +50,7 @@ public abstract class AbstractDerbyMessa
         if (_messageStoreOpen.compareAndSet(false, true))
         {
             _parent = parent;
+            initMessageStore(parent);
 
             DerbyUtils.loadDerbyDriver();
 
@@ -85,7 +86,14 @@ public abstract class AbstractDerbyMessa
     {
         if (_messageStoreOpen.compareAndSet(true,  false))
         {
-            doClose();
+            try
+            {
+                doClose();
+            }
+            finally
+            {
+                super.closeMessageStore();
+            }
         }
     }
 

Modified: qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java?rev=1667409&r1=1667408&r2=1667409&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java Tue Mar 17 21:19:05 2015
@@ -76,6 +76,7 @@ public abstract class GenericAbstractJDB
             finally
             {
                 doClose();
+                super.closeMessageStore();
             }
 
         }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org