You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/06/16 22:14:41 UTC
svn commit: r1136645 - in
/activemq/activemq-apollo/trunk/apollo-openwire/src:
main/scala/org/apache/activemq/apollo/openwire/
test/scala/org/apache/activemq/apollo/openwire/
Author: tabish
Date: Thu Jun 16 20:14:40 2011
New Revision: 1136645
URL: http://svn.apache.org/viewvc?rev=1136645&view=rev
Log:
https://issues.apache.org/jira/browse/APLO-30
Ensure that ErrorResponse commands and ConnectionError commands are propogated back to the client.
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1136645&r1=1136644&r2=1136645&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Thu Jun 16 20:14:40 2011
@@ -304,27 +304,41 @@ class OpenwireProtocolHandler extends Pr
def fail[T](msg: String, actual:Command=null):T = {
def respond(command:Command) = {
if(command.isResponseRequired()) {
- val e = new ProtocolException(msg);
+ val e = new ProtocolException(msg)
e.fillInStackTrace
- val rc = new ExceptionResponse();
- rc.setCorrelationId(command.getCommandId());
+ val rc = new ExceptionResponse()
+ rc.setCorrelationId(command.getCommandId())
rc.setException(e)
- connection_session.offer(rc);
+ connection_session.offer(rc)
+ } else {
+ connection_error()
}
}
+ def connection_error() = {
+ val e = new ProtocolException(msg)
+ e.fillInStackTrace()
+
+ val err = new ConnectionError()
+ err.setException(e)
+
+ connection_session.offer(err)
+ }
(current_command,actual) match {
case (null, null)=>
+ connection_error()
case (null, command:Command)=>
+ respond(command)
case (command:Command, null)=>
+ connection_error()
case (command:Command, command2:Command)=>
respond(command)
}
throw new Break()
}
- def async_die(msg: String):Unit = try {
- die(msg)
+ def async_die(msg: String, actual:Command=null):Unit = try {
+ die(msg, actual)
} catch {
case x:Break=>
}
@@ -332,7 +346,7 @@ class OpenwireProtocolHandler extends Pr
/**
* A protocol error that cannot be recovered from. It results in the connections being terminated.
*/
- def die[T](msg: String):T = {
+ def die[T](msg: String, actual:Command=null):T = {
if (!dead) {
dead = true
debug("Shutting connection down due to: " + msg)
@@ -341,7 +355,7 @@ class OpenwireProtocolHandler extends Pr
queue.after(die_delay, TimeUnit.MILLISECONDS) {
connection.stop()
}
- fail(msg)
+ fail(msg, actual)
}
throw new Break()
}
@@ -414,10 +428,10 @@ class OpenwireProtocolHandler extends Pr
if( host.authenticator!=null && host.authorizer!=null ) {
suspendRead("authenticating and authorizing connect")
if( !host.authenticator.authenticate(security_context) ) {
- async_die("Authentication failed.")
+ async_die("Authentication failed.", info)
noop
} else if( !host.authorizer.can_connect_to(security_context, host, connection.connector) ) {
- async_die("Connect not authorized.")
+ async_die("Connect not authorized.", info)
noop
} else {
resumeRead
@@ -550,7 +564,7 @@ class OpenwireProtocolHandler extends Pr
val rc = host.router.connect(destiantion, route, security_context)
rc match {
case Some(failure) =>
- async_fail(failure, msg)
+ async_die(failure, msg)
case None =>
if (!connection.stopped) {
resumeRead
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala?rev=1136645&r1=1136644&r2=1136645&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala Thu Jun 16 20:14:40 2011
@@ -18,7 +18,7 @@ package org.apache.activemq.apollo.openw
import javax.jms.{TextMessage, Session}
-class ExclusiveConsumerTest extends OpenwireTestSupport {
+abstract class ExclusiveConsumerTest extends OpenwireTestSupport {
}
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala?rev=1136645&r1=1136644&r2=1136645&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala Thu Jun 16 20:14:40 2011
@@ -55,7 +55,8 @@ class OpenwireTestSupport extends FunSui
default_connection = null
}
- def create_connection_factory = new ActiveMQConnectionFactory("tcp://localhost:%d?wireFormat.maxInactivityDuration=1000000&wireFormat.maxInactivityDurationInitalDelay=1000000".format(port))
+ def connection_uri = "tcp://localhost:%d?wireFormat.maxInactivityDuration=1000000&wireFormat.maxInactivityDurationInitalDelay=1000000".format(port)
+ def create_connection_factory = new ActiveMQConnectionFactory(connection_uri)
def create_connection: Connection = create_connection_factory.createConnection
def queue(value:String) = new ActiveMQQueue(value);
def topic(value:String) = new ActiveMQTopic(value);
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala?rev=1136645&r1=1136644&r2=1136645&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala Thu Jun 16 20:14:40 2011
@@ -17,9 +17,9 @@
package org.apache.activemq.apollo.openwire
-import javax.jms.JMSException
+import javax.jms.{Session,JMSException,MessageProducer}
-class SecurityTest extends OpenwireTestSupport {
+abstract class SecurityTest extends OpenwireTestSupport {
override val broker_config_uri: String = "xml:classpath:apollo-openwire-secure.xml"
@@ -70,4 +70,151 @@ class ConnectionFailsWhenCredentialsAreI
connection.start()
}
}
-}
\ No newline at end of file
+}
+
+class ConnectionSucceedsWithValidCredentials extends SecurityTest {
+
+ override def connection_uri = super.connection_uri + "&jms.alwaysSyncSend=true"
+
+ test("Connect with valid id password that can connect") {
+
+ val factory = create_connection_factory
+ val connection = factory.createConnection("can_only_connect", "can_only_connect")
+
+ try {
+ connection.start()
+ } catch {
+ case e => fail("Should not have thrown an exception")
+ }
+
+ }
+}
+
+class SendFailsWhenNotAuthorized extends SecurityTest {
+ test("Send not authorized") {
+ val factory = create_connection_factory
+ val connection = factory.createConnection("can_only_connect", "can_only_connect")
+
+ try {
+ connection.start()
+ } catch {
+ case e => fail("Should not have thrown an exception")
+ }
+
+ val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val producer = session.createProducer(queue("secure"))
+
+ intercept[JMSException] {
+ producer.send(session.createTextMessage("Test Message"))
+ }
+ }
+}
+
+class SendFailsWhenNotAuthorizedToCreateQueues extends SecurityTest {
+
+ test("Send authorized but not create") {
+
+ val factory = create_connection_factory
+ val connection = factory.createConnection("can_send_queue", "can_send_queue")
+
+ try {
+ connection.start()
+ } catch {
+ case e => fail("Should not have thrown an exception")
+ }
+
+ val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val producer = session.createProducer(queue("secure"))
+
+ intercept[JMSException] {
+ producer.send(session.createTextMessage("Test Message"))
+ }
+ }
+}
+
+class ConsumeFailsWhenNotAuthroizedToCreateQueue extends SecurityTest {
+
+ test("Consume authorized but not create") {
+
+ val factory = create_connection_factory
+ val connection = factory.createConnection("can_consume_queue", "can_consume_queue")
+
+ try {
+ connection.start()
+ } catch {
+ case e => fail("Should not have thrown an exception")
+ }
+
+ val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ intercept[JMSException] {
+ val consumer = session.createConsumer(queue("secure"))
+ consumer.receive();
+ }
+ }
+}
+
+class SendSucceedsWhenCreateQueueAthorized extends SecurityTest {
+ test("Send and create authorized") {
+ val factory = create_connection_factory
+ val connection = factory.createConnection("can_send_create_queue", "can_send_create_queue")
+
+ try {
+ connection.start()
+ } catch {
+ case e => fail("Should not have thrown an exception")
+ }
+
+ val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val producer = session.createProducer(queue("secure"))
+
+ try {
+ producer.send(session.createTextMessage("Test Message"))
+ } catch {
+ case e => fail("Should not have thrown an exception")
+ }
+ }
+
+ test("Can send and once created") {
+
+ val factory = create_connection_factory
+ val connection = factory.createConnection("can_send_queue", "can_send_queue")
+
+ try {
+ connection.start()
+ } catch {
+ case e => fail("Should not have thrown an exception")
+ }
+
+ val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val producer = session.createProducer(queue("secure"))
+
+ try {
+ producer.send(session.createTextMessage("Test Message"))
+ } catch {
+ case e => fail("Should not have thrown an exception")
+ }
+ }
+}
+
+class SubscribeFailsForConnectionOnlyAuthorization extends SecurityTest {
+
+ test("Consume not authorized") {
+
+ val factory = create_connection_factory
+ val connection = factory.createConnection("can_only_connect", "can_only_connect")
+
+ try {
+ connection.start()
+ } catch {
+ case e => fail("Should not have thrown an exception")
+ }
+
+ val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ intercept[JMSException] {
+ val consumer = session.createConsumer(queue("secure"))
+ consumer.receive();
+ }
+ }
+}