You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/06/16 22:14:41 UTC

svn commit: r1136645 - in /activemq/activemq-apollo/trunk/apollo-openwire/src: main/scala/org/apache/activemq/apollo/openwire/ test/scala/org/apache/activemq/apollo/openwire/

Author: tabish
Date: Thu Jun 16 20:14:40 2011
New Revision: 1136645

URL: http://svn.apache.org/viewvc?rev=1136645&view=rev
Log:
https://issues.apache.org/jira/browse/APLO-30

Ensure that ErrorResponse commands and ConnectionError commands are propagated back to the client.

Modified:
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1136645&r1=1136644&r2=1136645&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Thu Jun 16 20:14:40 2011
@@ -304,27 +304,41 @@ class OpenwireProtocolHandler extends Pr
   def fail[T](msg: String, actual:Command=null):T = {
     def respond(command:Command) = {
       if(command.isResponseRequired()) {
-        val e = new ProtocolException(msg);
+        val e = new ProtocolException(msg)
         e.fillInStackTrace
 
-        val rc = new ExceptionResponse();
-        rc.setCorrelationId(command.getCommandId());
+        val rc = new ExceptionResponse()
+        rc.setCorrelationId(command.getCommandId())
         rc.setException(e)
-        connection_session.offer(rc);
+        connection_session.offer(rc)
+      } else {
+        connection_error()
       }
     }
+    def connection_error() = {
+      val e = new ProtocolException(msg)
+      e.fillInStackTrace()
+
+      val err = new ConnectionError()
+      err.setException(e)
+
+      connection_session.offer(err)
+    }
     (current_command,actual) match {
        case (null, null)=>
+         connection_error()
        case (null, command:Command)=>
+         respond(command)
        case (command:Command, null)=>
+         connection_error()
        case (command:Command, command2:Command)=>
          respond(command)
     }
     throw new Break()
   }
 
-  def async_die(msg: String):Unit = try {
-    die(msg)
+  def async_die(msg: String, actual:Command=null):Unit = try {
+    die(msg, actual)
   } catch {
     case x:Break=>
   }
@@ -332,7 +346,7 @@ class OpenwireProtocolHandler extends Pr
   /**
    * A protocol error that cannot be recovered from. It results in the connections being terminated.
    */
-  def die[T](msg: String):T = {
+  def die[T](msg: String, actual:Command=null):T = {
     if (!dead) {
       dead = true
       debug("Shutting connection down due to: " + msg)
@@ -341,7 +355,7 @@ class OpenwireProtocolHandler extends Pr
       queue.after(die_delay, TimeUnit.MILLISECONDS) {
         connection.stop()
       }
-      fail(msg)
+      fail(msg, actual)
     }
     throw new Break()
   }
@@ -414,10 +428,10 @@ class OpenwireProtocolHandler extends Pr
         if( host.authenticator!=null &&  host.authorizer!=null ) {
           suspendRead("authenticating and authorizing connect")
           if( !host.authenticator.authenticate(security_context) ) {
-            async_die("Authentication failed.")
+            async_die("Authentication failed.", info)
             noop
           } else if( !host.authorizer.can_connect_to(security_context, host, connection.connector) ) {
-            async_die("Connect not authorized.")
+            async_die("Connect not authorized.", info)
             noop
           } else {
             resumeRead
@@ -550,7 +564,7 @@ class OpenwireProtocolHandler extends Pr
           val rc = host.router.connect(destiantion, route, security_context)
           rc match {
             case Some(failure) =>
-              async_fail(failure, msg)
+              async_die(failure, msg)
             case None =>
               if (!connection.stopped) {
                 resumeRead

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala?rev=1136645&r1=1136644&r2=1136645&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala Thu Jun 16 20:14:40 2011
@@ -18,7 +18,7 @@ package org.apache.activemq.apollo.openw
 
 import javax.jms.{TextMessage, Session}
 
-class ExclusiveConsumerTest extends OpenwireTestSupport {
+abstract class ExclusiveConsumerTest extends OpenwireTestSupport {
 
 }
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala?rev=1136645&r1=1136644&r2=1136645&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala Thu Jun 16 20:14:40 2011
@@ -55,7 +55,8 @@ class OpenwireTestSupport extends FunSui
     default_connection = null
   }
 
-  def create_connection_factory = new ActiveMQConnectionFactory("tcp://localhost:%d?wireFormat.maxInactivityDuration=1000000&wireFormat.maxInactivityDurationInitalDelay=1000000".format(port))
+  def connection_uri = "tcp://localhost:%d?wireFormat.maxInactivityDuration=1000000&wireFormat.maxInactivityDurationInitalDelay=1000000".format(port)
+  def create_connection_factory = new ActiveMQConnectionFactory(connection_uri)
   def create_connection: Connection = create_connection_factory.createConnection
   def queue(value:String) = new ActiveMQQueue(value);
   def topic(value:String) = new ActiveMQTopic(value);

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala?rev=1136645&r1=1136644&r2=1136645&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala Thu Jun 16 20:14:40 2011
@@ -17,9 +17,9 @@
 
 package org.apache.activemq.apollo.openwire
 
-import javax.jms.JMSException
+import javax.jms.{Session,JMSException,MessageProducer}
 
-class SecurityTest extends OpenwireTestSupport {
+abstract class SecurityTest extends OpenwireTestSupport {
 
   override val broker_config_uri: String = "xml:classpath:apollo-openwire-secure.xml"
 
@@ -70,4 +70,151 @@ class ConnectionFailsWhenCredentialsAreI
       connection.start()
     }
   }
-}
\ No newline at end of file
+}
+
+class ConnectionSucceedsWithValidCredentials extends SecurityTest {
+
+  override def connection_uri = super.connection_uri + "&jms.alwaysSyncSend=true"
+  
+  test("Connect with valid id password that can connect") {
+
+    val factory = create_connection_factory
+    val connection = factory.createConnection("can_only_connect", "can_only_connect")
+
+    try {
+      connection.start()
+    } catch {
+      case e => fail("Should not have thrown an exception")
+    }
+
+  }
+}
+
+class SendFailsWhenNotAuthorized extends SecurityTest {
+  test("Send not authorized") {
+    val factory = create_connection_factory
+    val connection = factory.createConnection("can_only_connect", "can_only_connect")
+
+    try {
+      connection.start()
+    } catch {
+      case e => fail("Should not have thrown an exception")
+    }
+
+    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val producer = session.createProducer(queue("secure"))
+
+    intercept[JMSException] {
+      producer.send(session.createTextMessage("Test Message"))
+    }
+  }
+}
+
+class SendFailsWhenNotAuthorizedToCreateQueues extends SecurityTest {
+
+  test("Send authorized but not create") {
+
+    val factory = create_connection_factory
+    val connection = factory.createConnection("can_send_queue", "can_send_queue")
+
+    try {
+      connection.start()
+    } catch {
+      case e => fail("Should not have thrown an exception")
+    }
+
+    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val producer = session.createProducer(queue("secure"))
+
+    intercept[JMSException] {
+      producer.send(session.createTextMessage("Test Message"))
+    }
+  }
+}
+
+class ConsumeFailsWhenNotAuthroizedToCreateQueue extends SecurityTest {
+
+  test("Consume authorized but not create") {
+
+    val factory = create_connection_factory
+    val connection = factory.createConnection("can_consume_queue", "can_consume_queue")
+
+    try {
+      connection.start()
+    } catch {
+      case e => fail("Should not have thrown an exception")
+    }
+
+    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    intercept[JMSException] {
+      val consumer = session.createConsumer(queue("secure"))
+      consumer.receive();
+    }
+  }
+}
+
+class SendSucceedsWhenCreateQueueAthorized extends SecurityTest {
+  test("Send and create authorized") {
+    val factory = create_connection_factory
+    val connection = factory.createConnection("can_send_create_queue", "can_send_create_queue")
+
+    try {
+      connection.start()
+    } catch {
+      case e => fail("Should not have thrown an exception")
+    }
+
+    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val producer = session.createProducer(queue("secure"))
+
+    try {
+      producer.send(session.createTextMessage("Test Message"))
+    } catch {
+      case e => fail("Should not have thrown an exception")
+    }
+  }
+
+  test("Can send and once created") {
+
+    val factory = create_connection_factory
+    val connection = factory.createConnection("can_send_queue", "can_send_queue")
+
+    try {
+      connection.start()
+    } catch {
+      case e => fail("Should not have thrown an exception")
+    }
+
+    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val producer = session.createProducer(queue("secure"))
+
+    try {
+      producer.send(session.createTextMessage("Test Message"))
+    } catch {
+      case e => fail("Should not have thrown an exception")
+    }
+  }
+}
+
+class SubscribeFailsForConnectionOnlyAuthorization extends SecurityTest {
+
+  test("Consume not authorized") {
+
+    val factory = create_connection_factory
+    val connection = factory.createConnection("can_only_connect", "can_only_connect")
+
+    try {
+      connection.start()
+    } catch {
+      case e => fail("Should not have thrown an exception")
+    }
+
+    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    intercept[JMSException] {
+      val consumer = session.createConsumer(queue("secure"))
+      consumer.receive();
+    }    
+  }
+}