You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/04/03 18:46:02 UTC

svn commit: r1464099 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ apollo-broker/src/test/scala/ apollo-stomp/src/main/scal...

Author: chirino
Date: Wed Apr  3 16:46:02 2013
New Revision: 1464099

URL: http://svn.apache.org/r1464099
Log:
protect against APLO-315: Added a test case and protection against 'evil' clients that send many request /w receipt frames but that don't read their sockets for responses.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1464099&r1=1464098&r2=1464099&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Apr  3 16:46:02 2013
@@ -23,7 +23,7 @@ import protocol.{ProtocolHandler}
 import org.apache.activemq.apollo.filter.BooleanExpression
 import org.fusesource.hawtdispatch.transport._
 import org.apache.activemq.apollo.dto._
-import org.apache.activemq.apollo.util.{Dispatched, Log, BaseService}
+import org.apache.activemq.apollo.util.{DeferringDispatched, Dispatched, Log, BaseService}
 import scala.Some
 import java.security.cert.X509Certificate
 
@@ -34,7 +34,7 @@ object Connection extends Log
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-abstract class Connection() extends BaseService with Dispatched {
+abstract class Connection() extends BaseService with DeferringDispatched {
   import Connection._
 
   private var _dispatch_queue = createQueue()

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala?rev=1464099&r1=1464098&r2=1464099&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala Wed Apr  3 16:46:02 2013
@@ -87,6 +87,7 @@ trait ProtocolHandler {
   def session_id:Option[String]
 
   var connection:BrokerConnection = null;
+  def defer(func: =>Unit) = connection.defer(func)
 
   def set_connection(brokerConnection:BrokerConnection) = {
     this.connection = brokerConnection

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala?rev=1464099&r1=1464098&r2=1464099&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala Wed Apr  3 16:46:02 2013
@@ -31,6 +31,7 @@ import org.fusesource.hawtbuf.{ByteArray
 import com.fasterxml.jackson.annotation.JsonInclude
 import util.concurrent.CountDownLatch
 import java.util.concurrent.locks.{ReentrantReadWriteLock, Lock, ReadWriteLock}
+import java.util.concurrent.atomic.AtomicBoolean
 
 object BrokerTestSupport {
   import FutureResult._
@@ -169,6 +170,24 @@ class BrokerFunSuiteSupport extends FunS
     BrokerFactory.createBroker(broker_config_uri, props)
   }
 
+  abstract case class BlockingTask(done:AtomicBoolean = new AtomicBoolean(false), shutdown:CountDownLatch=new CountDownLatch(1)) extends Task {
+
+    def stop = {
+      done.set(true)
+      this
+    }
+
+    def await = shutdown.await()
+
+    Broker.BLOCKABLE_THREAD_POOL {
+      try {
+        this.run();
+      } finally {
+        shutdown.countDown();
+      }
+    }
+  }
+
   override def beforeAll() = {
     super.beforeAll()
     if( before_and_after_all_object==null ) {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1464099&r1=1464098&r2=1464099&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Apr  3 16:46:02 2013
@@ -547,7 +547,7 @@ class StompProtocolHandler extends Proto
       override def time_stamp = broker.now
     }
 
-    override def dispose() = dispatchQueue {
+    override def dispose() = defer {
       ack_handler.close
       credit_window_filter.disable
       sink_manager.close(consumer_sink, (frame)=>{
@@ -669,7 +669,8 @@ class StompProtocolHandler extends Proto
 
 //  var session_manager:SessionSinkMux[StompFrame] = null
   var sink_manager:SinkMux[StompFrame] = null
-  var connection_sink:Sink[StompFrame] = null
+  var connection_sink:OverflowSink[StompFrame] = null
+  var connection_sink_read_suspended = false
 
   var dead = false
   var closed = false
@@ -840,6 +841,13 @@ class StompProtocolHandler extends Proto
 
     sink_manager = new SinkMux[StompFrame](filtering_sink)
     connection_sink = new OverflowSink(sink_manager.open());
+    connection_sink.refiller =  ^ {
+      if( connection_sink_read_suspended ) {
+        connection_sink_read_suspended = false
+        println("connection_sink: resume_read")
+        resume_read()
+      }
+    }
     resume_read
   }
 
@@ -1096,7 +1104,7 @@ class StompProtocolHandler extends Proto
         case None=> broker.default_virtual_host
         case Some(host)=> broker.get_virtual_host(host)
       }
-      dispatchQueue {
+      defer {
         resume_read
         if(host==null) {
           async_die("Invalid virtual host: "+host_header.get)
@@ -1111,7 +1119,7 @@ class StompProtocolHandler extends Proto
           if( host.authenticator!=null &&  host.authorizer!=null ) {
             suspend_read("authenticating and authorizing connect")
             host.authenticator.authenticate(security_context) { auth_failure=>
-              dispatchQueue {
+              defer {
                 if( auth_failure!=null ) {
                   async_die("%s. Credentials=%s".format(auth_failure, security_context.credential_dump))
                 } else if( !host.authorizer.can(security_context, "connect", connection.connector) ) {
@@ -1227,7 +1235,7 @@ class StompProtocolHandler extends Proto
     }
   }
 
-  def producer_maintenance = dispatchQueue {
+  def producer_maintenance = defer {
     val now = Broker.now
     import collection.JavaConversions._
     val expired = ListBuffer[StompProducerRoute]()
@@ -1260,7 +1268,7 @@ class StompProtocolHandler extends Proto
         }
         host.dispatch_queue {
           val rc = host.router.connect(route.addresses, route, security_context)
-          dispatchQueue {
+          defer {
             rc match {
               case Some(failure) =>
                 async_die(failure)
@@ -1411,8 +1419,8 @@ class StompProtocolHandler extends Proto
       if( receipt!=null ) {
         val trimmed_receipt = receipt.deepCopy().ascii()
         delivery.ack = { (consumed, uow) =>
-          dispatchQueue <<| ^{
-            connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, trimmed_receipt))))
+          defer {
+            send_receipt(trimmed_receipt)
           }
         }
       }
@@ -1430,7 +1438,7 @@ class StompProtocolHandler extends Proto
     } else {
       // info("Dropping message.  No consumers interested in message.")
       if( receipt!=null ) {
-        connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
+        send_receipt(receipt)
       }
     }
     frame.release
@@ -1531,7 +1539,7 @@ class StompProtocolHandler extends Proto
 
     host.dispatch_queue {
       host.router.bind(addresses, consumer, security_context) { rc =>
-        dispatchQueue {
+        defer {
           rc match {
             case Some(reason)=>
               consumers -= id
@@ -1573,11 +1581,13 @@ class StompProtocolHandler extends Proto
             var addresses = Array[DestinationAddress](SubscriptionAddress(destination_parser.decode_path(decode_header(id)), null, Array[BindAddress]()))
             host.router.delete(addresses, security_context) match {
               case Some(error)=>
-                dispatchQueue {
+                defer {
                   async_die(error)
                 }
               case None =>
-                send_receipt(headers)
+                defer {
+                  send_receipt(headers)
+                }
             }
           }
         } else {
@@ -1590,7 +1600,9 @@ class StompProtocolHandler extends Proto
         host.dispatch_queue {
           host.router.unbind(consumer.addresses, consumer, persistent, security_context)
           consumer.release()
-          send_receipt(headers)
+          defer {
+            send_receipt(headers)
+          }
         }
     }
   }
@@ -1684,20 +1696,28 @@ class StompProtocolHandler extends Proto
     send_receipt(headers)
   }
 
-
-  def send_receipt(headers:HeaderMap) = {
+  def send_receipt(headers:HeaderMap):StompFrame = {
     get(headers, RECEIPT_REQUESTED) match {
       case Some(receipt)=>
-        val frame = StompFrame(RECEIPT, List((RECEIPT_ID, receipt)))
-        dispatchQueue <<| ^{
-          connection_sink.offer(frame)
-        }
-        frame
+        send_receipt(receipt)
       case None=>
         null
     }
   }
 
+  def send_receipt(receipt:AsciiBuffer):StompFrame = {
+    dispatchQueue.assertExecuting()
+    val frame = StompFrame(RECEIPT, List((RECEIPT_ID, receipt)))
+    connection_sink.offer(frame)
+    if( connection_sink.overflow.size() > 1000 && !connection_sink_read_suspended) {
+      connection_sink_read_suspended = true
+      println("connection_sink: suspend_read")
+      suspend_read("client to drain receipts")
+    }
+    frame
+  }
+
+
   class TransactionQueue {
     // TODO: eventually we want to back this /w a broker Queue which
     // can provides persistence and memory swapping.
@@ -1714,7 +1734,9 @@ class StompProtocolHandler extends Proto
 //        println("UOW starting: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
         uow.on_complete {
 //          println("UOW completed: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
-          on_complete
+          defer {
+            on_complete
+          }
         }
         queue.foreach{ _._1(uow) }
         uow.release

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1464099&r1=1464098&r2=1464099&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Wed Apr  3 16:46:02 2013
@@ -24,7 +24,8 @@ import java.net.{SocketTimeoutException,
 import org.apache.activemq.apollo.stomp.{Stomp, StompProtocolHandler}
 import org.fusesource.hawtdispatch._
 import collection.mutable
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger}
+import java.util.concurrent.{CountDownLatch, ConcurrentHashMap}
 
 /**
  * These tests can be run in parallel against a single Apollo broker.
@@ -1735,4 +1736,66 @@ class StompParallelTest extends StompTes
     }
   }
 
+  test("APLO-315: Evil producers that don't read thier sockets for receipts") {
+    val pending = new ConcurrentHashMap[String, java.lang.Long]()
+    val start = System.currentTimeMillis();
+    val producer_counter = new AtomicLong(0);
+
+    var producer_done = new AtomicBoolean(false)
+    var producer_shutdown = new CountDownLatch(15*2);
+
+    val client = connect("1.1", new StompClient());
+
+    // use one thread to send..
+    val producer = new BlockingTask() {
+      def run() {
+        var i = 0;
+        while(!done.get()) {
+          val receipt_id = "-"+i;
+          i += 1
+          pending.put(receipt_id, new java.lang.Long(System.nanoTime()));
+          async_send("/topic/APLO-315", "This is message "+i, headers="receipt:"+receipt_id+"\npersistent:true\n", c=client);
+          producer_counter.incrementAndGet();
+        }
+      }
+    }
+
+    // The producer should block since he's not draining his receipts..
+    within(10, SECONDS) {
+      val start = producer_counter.get()
+      Thread.sleep(1000)
+      producer_counter.get() should be (start)
+    }
+
+    producer.stop
+
+    // Start draining those receipts..
+    val drainer = new BlockingTask() {
+      def run() {
+        var i = 0;
+        while(!pending.isEmpty) {
+          if( done.get() ) {
+            return;
+          }
+          try {
+            var r = wait_for_receipt(c = client, timeout = 500);
+            if (r != null) {
+              val start = pending.remove(r);
+              if (start == null) {
+                fail("Got unexpected receipt: " + r)
+              }
+              val latency = System.nanoTime() - start.longValue();
+            }
+          } catch {
+            case e:SocketTimeoutException =>
+          }
+        }
+      }
+    }
+
+    producer.await
+    drainer.await
+  }
+
+
 }

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala?rev=1464099&r1=1464098&r2=1464099&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala Wed Apr  3 16:46:02 2013
@@ -40,7 +40,10 @@ trait DeferringDispatched extends Dispat
     })
   }
 
-  val dispatch_queue_task_source = createSource(new ListEventAggregator[Task](), dispatch_queue)
-  dispatch_queue_task_source.setEventHandler(^{ dispatch_queue_task_source.getData.foreach(_.run()) });
-  dispatch_queue_task_source.resume()
+  lazy val dispatch_queue_task_source = {
+    val x = createSource(new ListEventAggregator[Task](), dispatch_queue)
+    x.setEventHandler(^{ x.getData.foreach(_.run()) });
+    x.resume()
+    x
+  }
 }
\ No newline at end of file