You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/02/28 23:50:07 UTC
svn commit: r1075570 - in /activemq/activemq-apollo/trunk:
apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/
apollo-broker/src/test/scala/org/apache/activemq/apo...
Author: chirino
Date: Mon Feb 28 22:50:06 2011
New Revision: 1075570
URL: http://svn.apache.org/viewvc?rev=1075570&view=rev
Log:
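Pick up HawtDispatch API updates: DispatchQueue.dispatchAsync(...) is now execute(...) and dispatchAfter(...) is now executeAfter(...).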
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala
activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransport.java
Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1075570&r1=1075569&r2=1075570&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala Mon Feb 28 22:50:06 2011
@@ -246,7 +246,7 @@ class BDBStore extends DelayingStoreSupp
}
}
- dispatch_queue.dispatchAfter(1, TimeUnit.SECONDS, ^{ displayStats })
+ dispatch_queue.executeAfter(1, TimeUnit.SECONDS, ^{ displayStats })
}
def get_store_status(callback:(StoreStatusDTO)=>Unit) = dispatch_queue {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1075570&r1=1075569&r2=1075570&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala Mon Feb 28 22:50:06 2011
@@ -287,7 +287,7 @@ trait DelayingStoreSupport extends Store
val batch_id = uow.uow_id
if( uow.delayable ) {
- dispatch_queue.dispatchAfter(flush_delay, TimeUnit.MILLISECONDS, ^{flush(batch_id)})
+ dispatch_queue.executeAfter(flush_delay, TimeUnit.MILLISECONDS, ^{flush(batch_id)})
} else {
flush(batch_id)
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala?rev=1075570&r1=1075569&r2=1075570&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/RemoteConnection.scala Mon Feb 28 22:50:06 2011
@@ -71,7 +71,7 @@ abstract class RemoteConnection extends
on_failure(error)
if (callbackWhenConnected != null) {
warn("connect attempt failed. will retry connection..")
- dispatch_queue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^ {
+ dispatch_queue.executeAfter(50, TimeUnit.MILLISECONDS, ^ {
if (stopping.get()) {
callbackWhenConnected.run
} else {
Modified: activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala?rev=1075570&r1=1075569&r2=1075570&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala Mon Feb 28 22:50:06 2011
@@ -164,7 +164,7 @@ class CassandraStore extends DelayingSto
schedualDisplayStats
}
}
- dispatch_queue.dispatchAfter(5, TimeUnit.SECONDS, ^{ displayStats })
+ dispatch_queue.executeAfter(5, TimeUnit.SECONDS, ^{ displayStats })
}
/**
Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala?rev=1075570&r1=1075569&r2=1075570&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala Mon Feb 28 22:50:06 2011
@@ -128,7 +128,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
func
} else {
info("Database " + lockFileName + " is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000) + " seconds for the database to be unlocked.")
- dispatchQueue.dispatchAfter(DATABASE_LOCKED_WAIT_DELAY, TimeUnit.MILLISECONDS, ^ {
+ dispatchQueue.executeAfter(DATABASE_LOCKED_WAIT_DELAY, TimeUnit.MILLISECONDS, ^ {
hawtDBStore.executor_pool {
lock(func _)
}
Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala?rev=1075570&r1=1075569&r2=1075570&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala Mon Feb 28 22:50:06 2011
@@ -137,7 +137,7 @@ class HawtDBStore extends DelayingStoreS
}
}
}
- dispatch_queue.dispatchAfter(client.index_flush_interval, TimeUnit.MILLISECONDS, ^ {try_flush})
+ dispatch_queue.executeAfter(client.index_flush_interval, TimeUnit.MILLISECONDS, ^ {try_flush})
}
def scheduleCleanup(version:Int): Unit = {
@@ -149,7 +149,7 @@ class HawtDBStore extends DelayingStoreS
}
}
}
- dispatch_queue.dispatchAfter(client.cleanup_interval, TimeUnit.MILLISECONDS, ^ {try_cleanup})
+ dispatch_queue.executeAfter(client.cleanup_interval, TimeUnit.MILLISECONDS, ^ {try_cleanup})
}
protected def _stop(on_completed: Runnable) = {
@@ -264,7 +264,7 @@ class HawtDBStore extends DelayingStoreS
}
}
- dispatch_queue.dispatchAfter(1, TimeUnit.SECONDS, ^{ displayStats })
+ dispatch_queue.executeAfter(1, TimeUnit.SECONDS, ^{ displayStats })
}
def get_store_status(callback:(StoreStatusDTO)=>Unit) = dispatch_queue {
Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala?rev=1075570&r1=1075569&r2=1075570&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala Mon Feb 28 22:50:06 2011
@@ -239,7 +239,7 @@ class JDBM2Store extends DelayingStoreSu
}
val interval = config.compact_interval.getOrElse(60)
if( interval>=0 ) {
- dispatch_queue.dispatchAfter(interval, TimeUnit.SECONDS, ^{ the_meat })
+ dispatch_queue.executeAfter(interval, TimeUnit.SECONDS, ^{ the_meat })
}
}
@@ -264,7 +264,7 @@ class JDBM2Store extends DelayingStoreSu
}
}
- dispatch_queue.dispatchAfter(1, TimeUnit.SECONDS, ^{ displayStats })
+ dispatch_queue.executeAfter(1, TimeUnit.SECONDS, ^{ displayStats })
}
def get_store_status(callback:(StoreStatusDTO)=>Unit) = dispatch_queue {
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala?rev=1075570&r1=1075569&r2=1075570&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala Mon Feb 28 22:50:06 2011
@@ -81,7 +81,7 @@ class StompRemoteConsumer extends Remote
protected def messageReceived() {
if (thinkTime > 0) {
transport.suspendRead
- dispatch_queue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
+ dispatch_queue.executeAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
rate.increment();
if (!stopped) {
transport.resumeRead
@@ -135,7 +135,7 @@ class StompRemoteProducer extends Remote
// if we are not going to wait for an ack back from the server,
// then jut send the next one...
if (thinkTime > 0) {
- dispatch_queue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, task)
+ dispatch_queue.executeAfter(thinkTime, TimeUnit.MILLISECONDS, task)
} else {
dispatch_queue << task
}
@@ -184,7 +184,7 @@ trait Watchog extends RemoteConsumer {
def watchdog(lastMessageCount: Int): Unit = {
val seconds = 10
- dispatch_queue.dispatchAfter(seconds, TimeUnit.SECONDS, ^ {
+ dispatch_queue.executeAfter(seconds, TimeUnit.SECONDS, ^ {
if (messageCount == lastMessageCount) {
warn("Messages have stopped arriving after " + seconds + "s, stopping consumer")
stop
Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java?rev=1075570&r1=1075569&r2=1075570&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java Mon Feb 28 22:50:06 2011
@@ -383,7 +383,7 @@ public class TcpTransport extends JavaBa
readSource.resume();
} else if (socketState.is(CONNECTED.class) ) {
- dispatchQueue.dispatchAsync(new Runnable() {
+ dispatchQueue.execute(new Runnable() {
public void run() {
try {
trace("was connected.");
@@ -447,7 +447,7 @@ public class TcpTransport extends JavaBa
}
private void schedualRateAllowanceReset() {
- dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, new Runnable(){
+ dispatchQueue.executeAfter(1, TimeUnit.SECONDS, new Runnable(){
public void run() {
if( !socketState.is(CONNECTED.class) ) {
return;
Modified: activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransport.java?rev=1075570&r1=1075569&r2=1075570&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/pipe/PipeTransport.java Mon Feb 28 22:50:06 2011
@@ -69,7 +69,7 @@ public class PipeTransport implements Tr
if (dispatchQueue == null) {
throw new IllegalArgumentException("dispatchQueue is not set");
}
- server.dispatchQueue.dispatchAsync(new Runnable(){
+ server.dispatchQueue.execute(new Runnable(){
public void run() {
dispatchSource = Dispatch.createSource(EventAggregators.linkedList(), dispatchQueue);
dispatchSource.setEventHandler(new Runnable(){
@@ -86,7 +86,7 @@ public class PipeTransport implements Tr
}
// let the peer know that they have been processed.
- peer.dispatchQueue.dispatchAsync(new Runnable() {
+ peer.dispatchQueue.execute(new Runnable() {
public void run() {
outbound -= commands.size();
drainInbound();
@@ -111,7 +111,7 @@ public class PipeTransport implements Tr
}
private void fireConnected() {
- dispatchQueue.dispatchAsync(new Runnable() {
+ dispatchQueue.execute(new Runnable() {
public void run() {
connected = true;
dispatchSource.resume();