You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/03/17 22:19:05 UTC
svn commit: r1667409 - in /qpid/trunk/qpid/java:
broker-core/src/main/java/org/apache/qpid/server/store/
broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/
broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/j...
Author: rgodfrey
Date: Tue Mar 17 21:19:05 2015
New Revision: 1667409
URL: http://svn.apache.org/r1667409
Log:
QPID-6457 : [Java Broker] Make asynchronous commits occur on executor threads
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java
qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1667409&r1=1667408&r2=1667409&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Tue Mar 17 21:19:05 2015
@@ -35,6 +35,17 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
@@ -109,11 +120,16 @@ public abstract class AbstractJDBCMessag
" WHERE format = ? and global_id = ? and branch_id = ?";
protected final EventManager _eventManager = new EventManager();
+ private ConfiguredObject<?> _parent;
protected abstract boolean isMessageStoreOpen();
protected abstract void checkMessageStoreOpen();
+ private ScheduledThreadPoolExecutor _executor;
+ public AbstractJDBCMessageStore()
+ {
+ }
protected void setMaximumMessageId()
{
@@ -269,6 +285,34 @@ public abstract class AbstractJDBCMessag
}
}
+ protected void initMessageStore(final ConfiguredObject<?> parent)
+ {
+ _parent = parent;
+ _executor = new ScheduledThreadPoolExecutor(4, new ThreadFactory()
+ {
+ private final AtomicInteger _count = new AtomicInteger();
+ @Override
+ public Thread newThread(final Runnable r)
+ {
+ final Thread thread = Executors.defaultThreadFactory().newThread(r);
+ thread.setName(parent.getName() + "-store-"+_count.incrementAndGet());
+ return thread;
+ }
+ });
+ _executor.prestartAllCoreThreads();
+
+ }
+
+ @Override
+ public void closeMessageStore()
+ {
+ if(_executor != null)
+ {
+ _executor.shutdown();
+ }
+
+ }
+
protected abstract Logger getLogger();
protected abstract String getSqlBlobType();
@@ -835,10 +879,105 @@ public abstract class AbstractJDBCMessag
}
}
- private FutureResult commitTranAsync(ConnectionWrapper connWrapper) throws StoreException
+ private FutureResult commitTranAsync(final ConnectionWrapper connWrapper) throws StoreException
{
- commitTran(connWrapper);
- return FutureResult.IMMEDIATE_FUTURE;
+ final Future<?> result = _executor.submit(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ commitTran(connWrapper);
+ }
+ });
+ return new FutureResult()
+ {
+ @Override
+ public boolean isComplete()
+ {
+ boolean done = result.isDone();
+ try
+ {
+ result.get();
+ }
+ catch (InterruptedException e)
+ {
+ // this won't happen as we're actually already done;
+ }
+ catch (ExecutionException e)
+ {
+ if(e.getCause() instanceof RuntimeException)
+ {
+ throw (RuntimeException)e.getCause();
+ }
+ else if(e.getCause() instanceof Error)
+ {
+ throw (Error)e.getCause();
+ }
+ else
+ {
+ throw new StoreException(e);
+ }
+ }
+ return done;
+ }
+
+ @Override
+ public void waitForCompletion()
+ {
+ try
+ {
+ result.get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new StoreException(e);
+ }
+ catch (ExecutionException e)
+ {
+ if(e.getCause() instanceof RuntimeException)
+ {
+ throw (RuntimeException)e.getCause();
+ }
+ else if(e.getCause() instanceof Error)
+ {
+ throw (Error)e.getCause();
+ }
+ else
+ {
+ throw new StoreException(e);
+ }
+ }
+ }
+
+ @Override
+ public void waitForCompletion(final long timeout) throws TimeoutException
+ {
+
+ try
+ {
+ result.get(timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ throw new StoreException(e);
+ }
+ catch (ExecutionException e)
+ {
+ if(e.getCause() instanceof RuntimeException)
+ {
+ throw (RuntimeException)e.getCause();
+ }
+ else if(e.getCause() instanceof Error)
+ {
+ throw (Error)e.getCause();
+ }
+ else
+ {
+ throw new StoreException(e);
+ }
+ }
+ }
+ };
}
private void abortTran(ConnectionWrapper connWrapper) throws StoreException
Modified: qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java?rev=1667409&r1=1667408&r2=1667409&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java Tue Mar 17 21:19:05 2015
@@ -50,6 +50,7 @@ public abstract class AbstractDerbyMessa
if (_messageStoreOpen.compareAndSet(false, true))
{
_parent = parent;
+ initMessageStore(parent);
DerbyUtils.loadDerbyDriver();
@@ -85,7 +86,14 @@ public abstract class AbstractDerbyMessa
{
if (_messageStoreOpen.compareAndSet(true, false))
{
- doClose();
+ try
+ {
+ doClose();
+ }
+ finally
+ {
+ super.closeMessageStore();
+ }
}
}
Modified: qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java?rev=1667409&r1=1667408&r2=1667409&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java Tue Mar 17 21:19:05 2015
@@ -76,6 +76,7 @@ public abstract class GenericAbstractJDB
finally
{
doClose();
+ super.closeMessageStore();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org