You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/04/03 18:46:02 UTC
svn commit: r1464099 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/
apollo-broker/src/test/scala/ apollo-stomp/src/main/scal...
Author: chirino
Date: Wed Apr 3 16:46:02 2013
New Revision: 1464099
URL: http://svn.apache.org/r1464099
Log:
Protect against APLO-315: added a test case and protection against 'evil' clients that send many frames requesting receipts but that don't read their sockets for the responses.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1464099&r1=1464098&r2=1464099&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Apr 3 16:46:02 2013
@@ -23,7 +23,7 @@ import protocol.{ProtocolHandler}
import org.apache.activemq.apollo.filter.BooleanExpression
import org.fusesource.hawtdispatch.transport._
import org.apache.activemq.apollo.dto._
-import org.apache.activemq.apollo.util.{Dispatched, Log, BaseService}
+import org.apache.activemq.apollo.util.{DeferringDispatched, Dispatched, Log, BaseService}
import scala.Some
import java.security.cert.X509Certificate
@@ -34,7 +34,7 @@ object Connection extends Log
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-abstract class Connection() extends BaseService with Dispatched {
+abstract class Connection() extends BaseService with DeferringDispatched {
import Connection._
private var _dispatch_queue = createQueue()
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala?rev=1464099&r1=1464098&r2=1464099&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala Wed Apr 3 16:46:02 2013
@@ -87,6 +87,7 @@ trait ProtocolHandler {
def session_id:Option[String]
var connection:BrokerConnection = null;
+ def defer(func: =>Unit) = connection.defer(func)
def set_connection(brokerConnection:BrokerConnection) = {
this.connection = brokerConnection
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala?rev=1464099&r1=1464098&r2=1464099&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala Wed Apr 3 16:46:02 2013
@@ -31,6 +31,7 @@ import org.fusesource.hawtbuf.{ByteArray
import com.fasterxml.jackson.annotation.JsonInclude
import util.concurrent.CountDownLatch
import java.util.concurrent.locks.{ReentrantReadWriteLock, Lock, ReadWriteLock}
+import java.util.concurrent.atomic.AtomicBoolean
object BrokerTestSupport {
import FutureResult._
@@ -169,6 +170,24 @@ class BrokerFunSuiteSupport extends FunS
BrokerFactory.createBroker(broker_config_uri, props)
}
+ abstract case class BlockingTask(done:AtomicBoolean = new AtomicBoolean(false), shutdown:CountDownLatch=new CountDownLatch(1)) extends Task {
+
+ def stop = {
+ done.set(true)
+ this
+ }
+
+ def await = shutdown.await()
+
+ Broker.BLOCKABLE_THREAD_POOL {
+ try {
+ this.run();
+ } finally {
+ shutdown.countDown();
+ }
+ }
+ }
+
override def beforeAll() = {
super.beforeAll()
if( before_and_after_all_object==null ) {
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1464099&r1=1464098&r2=1464099&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Apr 3 16:46:02 2013
@@ -547,7 +547,7 @@ class StompProtocolHandler extends Proto
override def time_stamp = broker.now
}
- override def dispose() = dispatchQueue {
+ override def dispose() = defer {
ack_handler.close
credit_window_filter.disable
sink_manager.close(consumer_sink, (frame)=>{
@@ -669,7 +669,8 @@ class StompProtocolHandler extends Proto
// var session_manager:SessionSinkMux[StompFrame] = null
var sink_manager:SinkMux[StompFrame] = null
- var connection_sink:Sink[StompFrame] = null
+ var connection_sink:OverflowSink[StompFrame] = null
+ var connection_sink_read_suspended = false
var dead = false
var closed = false
@@ -840,6 +841,13 @@ class StompProtocolHandler extends Proto
sink_manager = new SinkMux[StompFrame](filtering_sink)
connection_sink = new OverflowSink(sink_manager.open());
+ connection_sink.refiller = ^ {
+ if( connection_sink_read_suspended ) {
+ connection_sink_read_suspended = false
+ println("connection_sink: resume_read")
+ resume_read()
+ }
+ }
resume_read
}
@@ -1096,7 +1104,7 @@ class StompProtocolHandler extends Proto
case None=> broker.default_virtual_host
case Some(host)=> broker.get_virtual_host(host)
}
- dispatchQueue {
+ defer {
resume_read
if(host==null) {
async_die("Invalid virtual host: "+host_header.get)
@@ -1111,7 +1119,7 @@ class StompProtocolHandler extends Proto
if( host.authenticator!=null && host.authorizer!=null ) {
suspend_read("authenticating and authorizing connect")
host.authenticator.authenticate(security_context) { auth_failure=>
- dispatchQueue {
+ defer {
if( auth_failure!=null ) {
async_die("%s. Credentials=%s".format(auth_failure, security_context.credential_dump))
} else if( !host.authorizer.can(security_context, "connect", connection.connector) ) {
@@ -1227,7 +1235,7 @@ class StompProtocolHandler extends Proto
}
}
- def producer_maintenance = dispatchQueue {
+ def producer_maintenance = defer {
val now = Broker.now
import collection.JavaConversions._
val expired = ListBuffer[StompProducerRoute]()
@@ -1260,7 +1268,7 @@ class StompProtocolHandler extends Proto
}
host.dispatch_queue {
val rc = host.router.connect(route.addresses, route, security_context)
- dispatchQueue {
+ defer {
rc match {
case Some(failure) =>
async_die(failure)
@@ -1411,8 +1419,8 @@ class StompProtocolHandler extends Proto
if( receipt!=null ) {
val trimmed_receipt = receipt.deepCopy().ascii()
delivery.ack = { (consumed, uow) =>
- dispatchQueue <<| ^{
- connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, trimmed_receipt))))
+ defer {
+ send_receipt(trimmed_receipt)
}
}
}
@@ -1430,7 +1438,7 @@ class StompProtocolHandler extends Proto
} else {
// info("Dropping message. No consumers interested in message.")
if( receipt!=null ) {
- connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
+ send_receipt(receipt)
}
}
frame.release
@@ -1531,7 +1539,7 @@ class StompProtocolHandler extends Proto
host.dispatch_queue {
host.router.bind(addresses, consumer, security_context) { rc =>
- dispatchQueue {
+ defer {
rc match {
case Some(reason)=>
consumers -= id
@@ -1573,11 +1581,13 @@ class StompProtocolHandler extends Proto
var addresses = Array[DestinationAddress](SubscriptionAddress(destination_parser.decode_path(decode_header(id)), null, Array[BindAddress]()))
host.router.delete(addresses, security_context) match {
case Some(error)=>
- dispatchQueue {
+ defer {
async_die(error)
}
case None =>
- send_receipt(headers)
+ defer {
+ send_receipt(headers)
+ }
}
}
} else {
@@ -1590,7 +1600,9 @@ class StompProtocolHandler extends Proto
host.dispatch_queue {
host.router.unbind(consumer.addresses, consumer, persistent, security_context)
consumer.release()
- send_receipt(headers)
+ defer {
+ send_receipt(headers)
+ }
}
}
}
@@ -1684,20 +1696,28 @@ class StompProtocolHandler extends Proto
send_receipt(headers)
}
-
- def send_receipt(headers:HeaderMap) = {
+ def send_receipt(headers:HeaderMap):StompFrame = {
get(headers, RECEIPT_REQUESTED) match {
case Some(receipt)=>
- val frame = StompFrame(RECEIPT, List((RECEIPT_ID, receipt)))
- dispatchQueue <<| ^{
- connection_sink.offer(frame)
- }
- frame
+ send_receipt(receipt)
case None=>
null
}
}
+ def send_receipt(receipt:AsciiBuffer):StompFrame = {
+ dispatchQueue.assertExecuting()
+ val frame = StompFrame(RECEIPT, List((RECEIPT_ID, receipt)))
+ connection_sink.offer(frame)
+ if( connection_sink.overflow.size() > 1000 && !connection_sink_read_suspended) {
+ connection_sink_read_suspended = true
+ println("connection_sink: suspend_read")
+ suspend_read("client to drain receipts")
+ }
+ frame
+ }
+
+
class TransactionQueue {
// TODO: eventually we want to back this /w a broker Queue which
// can provide persistence and memory swapping.
@@ -1714,7 +1734,9 @@ class StompProtocolHandler extends Proto
// println("UOW starting: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
uow.on_complete {
// println("UOW completed: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
- on_complete
+ defer {
+ on_complete
+ }
}
queue.foreach{ _._1(uow) }
uow.release
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1464099&r1=1464098&r2=1464099&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Wed Apr 3 16:46:02 2013
@@ -24,7 +24,8 @@ import java.net.{SocketTimeoutException,
import org.apache.activemq.apollo.stomp.{Stomp, StompProtocolHandler}
import org.fusesource.hawtdispatch._
import collection.mutable
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger}
+import java.util.concurrent.{CountDownLatch, ConcurrentHashMap}
/**
* These tests can be run in parallel against a single Apollo broker.
@@ -1735,4 +1736,66 @@ class StompParallelTest extends StompTes
}
}
+ test("APLO-315: Evil producers that don't read thier sockets for receipts") {
+ val pending = new ConcurrentHashMap[String, java.lang.Long]()
+ val start = System.currentTimeMillis();
+ val producer_counter = new AtomicLong(0);
+
+ var producer_done = new AtomicBoolean(false)
+ var producer_shutdown = new CountDownLatch(15*2);
+
+ val client = connect("1.1", new StompClient());
+
+ // use one thread to send..
+ val producer = new BlockingTask() {
+ def run() {
+ var i = 0;
+ while(!done.get()) {
+ val receipt_id = "-"+i;
+ i += 1
+ pending.put(receipt_id, new java.lang.Long(System.nanoTime()));
+ async_send("/topic/APLO-315", "This is message "+i, headers="receipt:"+receipt_id+"\npersistent:true\n", c=client);
+ producer_counter.incrementAndGet();
+ }
+ }
+ }
+
+ // The producer should block since he's not draining his receipts..
+ within(10, SECONDS) {
+ val start = producer_counter.get()
+ Thread.sleep(1000)
+ producer_counter.get() should be (start)
+ }
+
+ producer.stop
+
+ // Start draining those receipts..
+ val drainer = new BlockingTask() {
+ def run() {
+ var i = 0;
+ while(!pending.isEmpty) {
+ if( done.get() ) {
+ return;
+ }
+ try {
+ var r = wait_for_receipt(c = client, timeout = 500);
+ if (r != null) {
+ val start = pending.remove(r);
+ if (start == null) {
+ fail("Got unexpected receipt: " + r)
+ }
+ val latency = System.nanoTime() - start.longValue();
+ }
+ } catch {
+ case e:SocketTimeoutException =>
+ }
+ }
+ }
+ }
+
+ producer.await
+ drainer.await
+ }
+
+
}
Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala?rev=1464099&r1=1464098&r2=1464099&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala Wed Apr 3 16:46:02 2013
@@ -40,7 +40,10 @@ trait DeferringDispatched extends Dispat
})
}
- val dispatch_queue_task_source = createSource(new ListEventAggregator[Task](), dispatch_queue)
- dispatch_queue_task_source.setEventHandler(^{ dispatch_queue_task_source.getData.foreach(_.run()) });
- dispatch_queue_task_source.resume()
+ lazy val dispatch_queue_task_source = {
+ val x = createSource(new ListEventAggregator[Task](), dispatch_queue)
+ x.setEventHandler(^{ x.getData.foreach(_.run()) });
+ x.resume()
+ x
+ }
}
\ No newline at end of file