You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/06/15 23:26:06 UTC
svn commit: r1136199 - in
/activemq/activemq-apollo/trunk/apollo-openwire/src: main/resouces/
main/resources/org/ main/resources/org/apache/
main/resources/org/apache/activemq/
main/resources/org/apache/activemq/apollo/
main/resources/org/apache/active...
Author: tabish
Date: Wed Jun 15 21:26:06 2011
New Revision: 1136199
URL: http://svn.apache.org/viewvc?rev=1136199&view=rev
Log:
https://issues.apache.org/jira/browse/APLO-30
Add some initial support for config and connection authentication and authorization.
Added:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/org/
activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/org/apache/
activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/org/apache/activemq/
activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/org/apache/activemq/apollo/
activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/org/apache/activemq/apollo/openwire/
activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/org/apache/activemq/apollo/openwire/dto/
activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/org/apache/activemq/apollo/openwire/dto/jaxb.index
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/dto/OpenwireDTO.java
activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-bdb.xml
activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-custom-dest-delimiters.xml
activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-secure.xml
activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl-secure.xml
activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl.xml
activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo.ks (with props)
activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/client.ks (with props)
activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/login.config
activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/users.properties
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala
Removed:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/resouces/
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/org/apache/activemq/apollo/openwire/dto/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/org/apache/activemq/apollo/openwire/dto/jaxb.index?rev=1136199&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/org/apache/activemq/apollo/openwire/dto/jaxb.index (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/org/apache/activemq/apollo/openwire/dto/jaxb.index Wed Jun 15 21:26:06 2011
@@ -0,0 +1,18 @@
+# ------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ------------------------------------------------------------------------
+OpenwireDTO
+OpenwireConnectionStatusDTO
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1136199&r1=1136198&r2=1136199&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed Jun 15 21:26:06 2011
@@ -39,7 +39,7 @@ import security.SecurityContext
import tcp.TcpTransport
import codec.OpenWireFormat
import command._
-import org.apache.activemq.apollo.openwire.dto.OpenwireConnectionStatusDTO
+import org.apache.activemq.apollo.openwire.dto.{OpenwireConnectionStatusDTO,OpenwireDTO}
import org.apache.activemq.apollo.dto.{TopicDestinationDTO, DurableSubscriptionDestinationDTO, DestinationDTO}
object OpenwireProtocolHandler extends Log {
@@ -60,14 +60,9 @@ object OpenwireProtocolHandler extends L
preferred_wireformat_settings.setMaxFrameSize(OpenWireFormat.DEFAULT_MAX_FRAME_SIZE);
}
-
/**
- * <p>
- * </p>
*
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-
class OpenwireProtocolHandler extends ProtocolHandler {
var minimum_protocol_version = 1
@@ -105,22 +100,52 @@ class OpenwireProtocolHandler extends Pr
var passcode: Option[AsciiBuffer] = None
var dead = false
val security_context = new SecurityContext
+ var config:OpenwireDTO = _
var heart_beat_monitor: HeartBeatMonitor = new HeartBeatMonitor
var waiting_on: String = "client request"
var current_command: Object = _
+ var codec:OpenwireCodec = _
override def create_connection_status = {
var rc = new OpenwireConnectionStatusDTO
rc.protocol_version = ""+(if (wire_format == null) 0 else wire_format.getVersion)
rc.user = login.map(_.toString).getOrElse(null)
- // rc.subscription_count = consumers.size
+ rc.subscription_count = all_consumers.size
rc.waiting_on = waiting_on
rc
}
+ override def set_connection(connection: BrokerConnection) = {
+ super.set_connection(connection)
+ import collection.JavaConversions._
+
+ codec = connection.transport.getProtocolCodec.asInstanceOf[OpenwireCodec]
+ config = connection.connector.config.protocols.find( _.isInstanceOf[OpenwireDTO]).map(_.asInstanceOf[OpenwireDTO]).getOrElse(new OpenwireDTO)
+
+// protocol_filters = ProtocolFilter.create_filters(config.protocol_filters.toList, this)
+//
+ import OptionSupport._
+
+// config.max_data_length.foreach( codec.max_data_length = _ )
+// config.max_header_length.foreach( codec.max_header_length = _ )
+// config.max_headers.foreach( codec.max_headers = _ )
+
+ if( config.destination_separator!=null ||
+ config.path_separator!= null ||
+ config.any_child_wildcard != null ||
+ config.any_descendant_wildcard!= null ) {
+
+// destination_parser = new DestinationParser().copy(Stomp.destination_parser)
+// if( config.destination_separator!=null ) { destination_parser.destination_separator = config.destination_separator }
+// if( config.path_separator!=null ) { destination_parser.path_separator = config.path_separator }
+// if( config.any_child_wildcard!=null ) { destination_parser.any_child_wildcard = config.any_child_wildcard }
+// if( config.any_descendant_wildcard!=null ) { destination_parser.any_descendant_wildcard = config.any_descendant_wildcard }
+ }
+ }
+
def suspendRead(reason: String) = {
waiting_on = reason
connection.transport.suspendRead
@@ -139,7 +164,6 @@ class OpenwireProtocolHandler extends Pr
}
}
-
override def on_transport_failure(error: IOException) = {
if (!connection.stopped) {
error.printStackTrace
@@ -204,7 +228,6 @@ class OpenwireProtocolHandler extends Pr
}
}
-
override def on_transport_command(command: Object):Unit = {
if( dead ) {
// We stop processing client commands once we are dead
@@ -323,8 +346,6 @@ class OpenwireProtocolHandler extends Pr
throw new Break()
}
-
-
def on_wire_format_info(info: WireFormatInfo) = {
if (!info.isValid()) {
@@ -377,6 +398,232 @@ class OpenwireProtocolHandler extends Pr
connection_session.offer(brokerInfo);
}
+ ///////////////////////////////////////////////////////////////////
+ // Connection / Session / Consumer / Producer state tracking.
+ ///////////////////////////////////////////////////////////////////
+
+ def on_connection_info(info: ConnectionInfo) = {
+ val id = info.getConnectionId()
+ if (!all_connections.contains(id)) {
+ new ConnectionContext(info).attach
+
+ security_context.user = info.getUserName
+ security_context.password = info.getPassword
+
+ reset {
+ if( host.authenticator!=null && host.authorizer!=null ) {
+ suspendRead("authenticating and authorizing connect")
+ if( !host.authenticator.authenticate(security_context) ) {
+ async_die("Authentication failed.")
+ noop
+ } else if( !host.authorizer.can_connect_to(security_context, host, connection.connector) ) {
+ async_die("Connect not authorized.")
+ noop
+ } else {
+ resumeRead
+ ack(info);
+ noop
+ }
+ } else {
+ ack(info);
+ noop
+ }
+ }
+ } else {
+ ack(info);
+ }
+ }
+
+ def on_session_info(info: SessionInfo) = {
+ val id = info.getSessionId();
+ if (!all_sessions.contains(id)) {
+ val parent = all_connections.get(id.getParentId()).getOrElse(die("Cannot add a session to a connection that had not been registered."))
+ new SessionContext(parent, info).attach
+ }
+ ack(info);
+ }
+
+ def on_producer_info(info: ProducerInfo) = {
+ val id = info.getProducerId
+ if (!all_producers.contains(id)) {
+ val parent = all_sessions.get(id.getParentId()).getOrElse(die("Cannot add a producer to a session that had not been registered."))
+ new ProducerContext(parent, info).attach
+ }
+ ack(info);
+ }
+
+ def on_consumer_info(info: ConsumerInfo) = {
+ val id = info.getConsumerId
+ if (!all_consumers.contains(id)) {
+ val parent = all_sessions.get(id.getParentId()).getOrElse(die("Cannot add a consumer to a session that had not been registered."))
+ new ConsumerContext(parent, info).attach
+ } else {
+ ack(info);
+ }
+ }
+
+ def on_remove_info(info: RemoveInfo) = {
+ info.getObjectId match {
+ case id: ConnectionId => all_connections.get(id).foreach(_.dettach)
+ case id: SessionId => all_sessions.get(id).foreach(_.dettach)
+ case id: ProducerId => all_producers.get(id).foreach(_.dettach)
+ case id: ConsumerId => all_consumers.get(id).foreach(_.dettach )
+ // case id: DestinationInfo =>
+ case _ => die("Invalid object id.")
+ }
+ ack(info)
+ }
+
+ def on_transaction_info(info:TransactionInfo) = {
+ val parent = all_connections.get(info.getConnectionId()).getOrElse(die("Cannot add a session to a connection that had not been registered."))
+ val id = info.getTransactionId
+ info.getType match {
+ case TransactionInfo.BEGIN =>
+ get_or_create_tx_ctx(parent, id)
+ ack(info)
+
+ case TransactionInfo.COMMIT_ONE_PHASE =>
+ get_tx_ctx(id).commit {
+ ack(info)
+ }
+
+ case TransactionInfo.ROLLBACK =>
+ get_tx_ctx(id).rollback
+ ack(info)
+
+ case TransactionInfo.END =>
+ die("XA not yet supported")
+ case TransactionInfo.PREPARE =>
+ die("XA not yet supported")
+ case TransactionInfo.COMMIT_TWO_PHASE =>
+ die("XA not yet supported")
+ case TransactionInfo.RECOVER =>
+ die("XA not yet supported")
+ case TransactionInfo.FORGET =>
+ die("XA not yet supported")
+
+ case _ =>
+ fail("Transaction info type unknown: " + info.getType)
+
+ }
+ }
+
+ ///////////////////////////////////////////////////////////////////
+ // Core message processing
+ ///////////////////////////////////////////////////////////////////
+
+ def on_message(msg: ActiveMQMessage) = {
+ val producer = all_producers.get(msg.getProducerId).getOrElse(die("Producer associated with the message has not been registered."))
+
+ if (msg.getOriginalDestination() == null) {
+ msg.setOriginalDestination(msg.getDestination());
+ }
+
+ if( msg.getTransactionId==null ) {
+ perform_send(msg)
+ } else {
+ get_or_create_tx_ctx(producer.parent.parent, msg.getTransactionId) { (uow)=>
+ perform_send(msg, uow)
+ }
+ }
+ }
+
+ def perform_send(msg:ActiveMQMessage, uow:StoreUOW=null): Unit = {
+
+ val destiantion = msg.getDestination.toDestination
+ val key = destiantion.toList
+ producerRoutes.get(key) match {
+ case null =>
+ // create the producer route...
+
+ val route = new DeliveryProducerRoute(host.router) {
+ override def connection = Some(OpenwireProtocolHandler.this.connection)
+ override def dispatch_queue = queue
+ refiller = ^ {
+ resumeRead
+ }
+ }
+
+ // don't process frames until producer is connected...
+ connection.transport.suspendRead
+ reset {
+ val rc = host.router.connect(destiantion, route, security_context)
+ rc match {
+ case Some(failure) =>
+ async_fail(failure, msg)
+ case None =>
+ if (!connection.stopped) {
+ resumeRead
+ producerRoutes.put(key, route)
+ send_via_route(route, msg, uow)
+ }
+ }
+ }
+
+ case route =>
+ // we can re-use the existing producer route
+ send_via_route(route, msg, uow)
+
+ }
+ }
+
+ def send_via_route(route:DeliveryProducerRoute, message:ActiveMQMessage, uow:StoreUOW) = {
+ if( !route.targets.isEmpty ) {
+
+ // We may need to add some headers..
+ val delivery = new Delivery
+ delivery.message = new OpenwireMessage(message)
+ delivery.size = message.getSize
+ delivery.uow = uow
+
+ if( message.isResponseRequired ) {
+ delivery.ack = { (consumed, uow) =>
+ dispatchQueue <<| ^{
+ ack(message)
+ }
+ }
+ }
+
+ // routes can always accept at least 1 delivery...
+ assert( !route.full )
+ route.offer(delivery)
+ if( route.full ) {
+ // but once it gets full.. suspend, so that we get more messages
+ // until it's not full anymore.
+ suspendRead("blocked destination: "+route.overflowSessions.mkString(", "))
+ }
+
+ } else {
+ // info("Dropping message. No consumers interested in message.")
+ ack(message)
+ }
+ // message.release
+ }
+
+ def on_message_ack(info:MessageAck) = {
+ val consumer = all_consumers.get(info.getConsumerId).getOrElse(die("Cannot ack a message on a consumer that had not been registered."))
+ info.getTransactionId match {
+ case null =>
+ consumer.ack_handler(info)
+ case txid =>
+ get_or_create_tx_ctx(consumer.parent.parent, txid){ (uow)=>
+ consumer.ack_handler(info, uow)
+ }
+ }
+ ack(info)
+ }
+
+ // public Response processAddDestination(DestinationInfo info) throws Exception {
+ // ActiveMQDestination destination = info.getDestination();
+ // if (destination.isTemporary()) {
+ // // Keep track of it so that we can remove them this connection
+ // // shuts down.
+ // temporaryDestinations.add(destination);
+ // }
+ // host.createQueue(destination);
+ // return ack(info);
+ // }
+
val all_connections = new HashMap[ConnectionId, ConnectionContext]();
val all_sessions = new HashMap[SessionId, SessionContext]();
val all_producers = new HashMap[ProducerId, ProducerContext]();
@@ -419,8 +666,6 @@ class OpenwireProtocolHandler extends Pr
parent.sessions.remove(info.getSessionId)
all_sessions.remove(info.getSessionId)
}
-
-
}
def noop = shift { k: (Unit=>Unit) => k() }
@@ -600,7 +845,6 @@ class OpenwireProtocolHandler extends Pr
queue {
consumer_acks += (( id, callback ))
}
-
}
def apply(messageAck: MessageAck, uow:StoreUOW=null) = {
@@ -628,9 +872,7 @@ class OpenwireProtocolHandler extends Pr
}
}
}
-
}
-
}
class TransactionContext(val parent: ConnectionContext, val id: TransactionId) {
@@ -727,214 +969,4 @@ class OpenwireProtocolHandler extends Pr
}
}
- ///////////////////////////////////////////////////////////////////
- // Connection / Session / Consumer / Producer state tracking.
- ///////////////////////////////////////////////////////////////////
-
- def on_connection_info(info: ConnectionInfo) = {
- val id = info.getConnectionId()
- if (!all_connections.contains(id)) {
- new ConnectionContext(info).attach
- }
- ack(info);
- }
-
- def on_session_info(info: SessionInfo) = {
- val id = info.getSessionId();
- if (!all_sessions.contains(id)) {
- val parent = all_connections.get(id.getParentId()).getOrElse(die("Cannot add a session to a connection that had not been registered."))
- new SessionContext(parent, info).attach
- }
- ack(info);
- }
-
- def on_producer_info(info: ProducerInfo) = {
- val id = info.getProducerId
- if (!all_producers.contains(id)) {
- val parent = all_sessions.get(id.getParentId()).getOrElse(die("Cannot add a producer to a session that had not been registered."))
- new ProducerContext(parent, info).attach
- }
- ack(info);
- }
-
- def on_consumer_info(info: ConsumerInfo) = {
- val id = info.getConsumerId
- if (!all_consumers.contains(id)) {
- val parent = all_sessions.get(id.getParentId()).getOrElse(die("Cannot add a consumer to a session that had not been registered."))
- new ConsumerContext(parent, info).attach
- } else {
- ack(info);
- }
- }
-
- def on_remove_info(info: RemoveInfo) = {
- info.getObjectId match {
- case id: ConnectionId => all_connections.get(id).foreach(_.dettach)
- case id: SessionId => all_sessions.get(id).foreach(_.dettach)
- case id: ProducerId => all_producers.get(id).foreach(_.dettach)
- case id: ConsumerId => all_consumers.get(id).foreach(_.dettach )
- // case id: DestinationInfo =>
- case _ => die("Invalid object id.")
- }
- ack(info)
- }
-
- ///////////////////////////////////////////////////////////////////
- // Methods for transaction management
- ///////////////////////////////////////////////////////////////////
- def on_transaction_info(info:TransactionInfo) = {
- val parent = all_connections.get(info.getConnectionId()).getOrElse(die("Cannot add a session to a connection that had not been registered."))
- val id = info.getTransactionId
- info.getType match {
- case TransactionInfo.BEGIN =>
- get_or_create_tx_ctx(parent, id)
- ack(info)
-
- case TransactionInfo.COMMIT_ONE_PHASE =>
- get_tx_ctx(id).commit {
- ack(info)
- }
-
- case TransactionInfo.ROLLBACK =>
- get_tx_ctx(id).rollback
- ack(info)
-
- case TransactionInfo.END =>
- die("XA not yet supported")
- case TransactionInfo.PREPARE =>
- die("XA not yet supported")
- case TransactionInfo.COMMIT_TWO_PHASE =>
- die("XA not yet supported")
- case TransactionInfo.RECOVER =>
- die("XA not yet supported")
- case TransactionInfo.FORGET =>
- die("XA not yet supported")
-
- case _ =>
- fail("Transaction info type unknown: " + info.getType)
-
- }
- }
-
- ///////////////////////////////////////////////////////////////////
- // Core message processing
- ///////////////////////////////////////////////////////////////////
-
- def on_message(msg: ActiveMQMessage) = {
- val producer = all_producers.get(msg.getProducerId).getOrElse(die("Producer associated with the message has not been registered."))
-
- if (msg.getOriginalDestination() == null) {
- msg.setOriginalDestination(msg.getDestination());
- }
-
- if( msg.getTransactionId==null ) {
- perform_send(msg)
- } else {
- get_or_create_tx_ctx(producer.parent.parent, msg.getTransactionId) { (uow)=>
- perform_send(msg, uow)
- }
- }
- }
-
- def perform_send(msg:ActiveMQMessage, uow:StoreUOW=null): Unit = {
-
- val destiantion = msg.getDestination.toDestination
- val key = destiantion.toList
- producerRoutes.get(key) match {
- case null =>
- // create the producer route...
-
- val route = new DeliveryProducerRoute(host.router) {
- override def connection = Some(OpenwireProtocolHandler.this.connection)
- override def dispatch_queue = queue
- refiller = ^ {
- resumeRead
- }
- }
-
- // don't process frames until producer is connected...
- connection.transport.suspendRead
- reset {
- val rc = host.router.connect(destiantion, route, security_context)
- rc match {
- case Some(failure) =>
- async_fail(failure, msg)
- case None =>
- if (!connection.stopped) {
- resumeRead
- producerRoutes.put(key, route)
- send_via_route(route, msg, uow)
- }
- }
- }
-
- case route =>
- // we can re-use the existing producer route
- send_via_route(route, msg, uow)
-
- }
- }
-
- def send_via_route(route:DeliveryProducerRoute, message:ActiveMQMessage, uow:StoreUOW) = {
- if( !route.targets.isEmpty ) {
-
- // We may need to add some headers..
- val delivery = new Delivery
- delivery.message = new OpenwireMessage(message)
- delivery.size = message.getSize
- delivery.uow = uow
-
- if( message.isResponseRequired ) {
- delivery.ack = { (consumed, uow) =>
- dispatchQueue <<| ^{
- ack(message)
- }
- }
- }
-
- // routes can always accept at least 1 delivery...
- assert( !route.full )
- route.offer(delivery)
- if( route.full ) {
- // but once it gets full.. suspend, so that we get more messages
- // until it's not full anymore.
- suspendRead("blocked destination: "+route.overflowSessions.mkString(", "))
- }
-
- } else {
- // info("Dropping message. No consumers interested in message.")
- ack(message)
- }
- // message.release
- }
-
-
- def on_message_ack(info:MessageAck) = {
- val consumer = all_consumers.get(info.getConsumerId).getOrElse(die("Cannot ack a message on a consumer that had not been registered."))
- info.getTransactionId match {
- case null =>
- consumer.ack_handler(info)
- case txid =>
- get_or_create_tx_ctx(consumer.parent.parent, txid){ (uow)=>
- consumer.ack_handler(info, uow)
- }
- }
- ack(info)
- }
-
- // public Response processAddDestination(DestinationInfo info) throws Exception {
- // ActiveMQDestination destination = info.getDestination();
- // if (destination.isTemporary()) {
- // // Keep track of it so that we can remove them this connection
- // // shuts down.
- // temporaryDestinations.add(destination);
- // }
- // host.createQueue(destination);
- // return ack(info);
- // }
-
-
- //
-
-
}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/dto/OpenwireDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/dto/OpenwireDTO.java?rev=1136199&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/dto/OpenwireDTO.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/dto/OpenwireDTO.java Wed Jun 15 21:26:06 2011
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.dto;
+
+import org.apache.activemq.apollo.dto.ProtocolDTO;
+
+import javax.xml.bind.annotation.*;
+
+/**
+ * Allow you to customize the openwire protocol implementation.
+ */
+@XmlRootElement(name="stomp")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class OpenwireDTO extends ProtocolDTO {
+
+ @XmlAttribute(name="max_data_length")
+ public Integer max_data_length;
+
+ @XmlAttribute(name="destination_separator")
+ public String destination_separator;
+
+ @XmlAttribute(name="path_separator")
+ public String path_separator;
+
+ @XmlAttribute(name="any_child_wildcard")
+ public String any_child_wildcard;
+
+ @XmlAttribute(name="any_descendant_wildcard")
+ public String any_descendant_wildcard;
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof OpenwireDTO)) return false;
+
+ OpenwireDTO openwireDTO = (OpenwireDTO) o;
+
+ if (any_child_wildcard != null ? !any_child_wildcard.equals(openwireDTO.any_child_wildcard) : openwireDTO.any_child_wildcard != null)
+ return false;
+ if (any_descendant_wildcard != null ? !any_descendant_wildcard.equals(openwireDTO.any_descendant_wildcard) : openwireDTO.any_descendant_wildcard != null)
+ return false;
+ if (destination_separator != null ? !destination_separator.equals(openwireDTO.destination_separator) : openwireDTO.destination_separator != null)
+ return false;
+ if (max_data_length != null ? !max_data_length.equals(openwireDTO.max_data_length) : openwireDTO.max_data_length != null)
+ return false;
+ if (path_separator != null ? !path_separator.equals(openwireDTO.path_separator) : openwireDTO.path_separator != null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = (max_data_length != null ? max_data_length.hashCode() : 0);
+ result = 31 * result + (destination_separator != null ? destination_separator.hashCode() : 0);
+ result = 31 * result + (path_separator != null ? path_separator.hashCode() : 0);
+ result = 31 * result + (any_child_wildcard != null ? any_child_wildcard.hashCode() : 0);
+ result = 31 * result + (any_descendant_wildcard != null ? any_descendant_wildcard.hashCode() : 0);
+ return result;
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-bdb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-bdb.xml?rev=1136199&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-bdb.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-bdb.xml Wed Jun 15 21:26:06 2011
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
+ <notes>Has a BDB store enabled.</notes>
+
+ <virtual_host id="default" purge_on_startup="true">
+ <host_name>localhost</host_name>
+
+ <queue name="unified.**" unified="true"/>
+
+ <bdb_store directory="${basedir}/target/test-data"/>
+ </virtual_host>
+
+ <web_admin bind="http://127.0.0.1:0"/>
+ <connector id="tcp" protocol="openwire" bind="tcp://0.0.0.0:0"/>
+
+</broker>
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-custom-dest-delimiters.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-custom-dest-delimiters.xml?rev=1136199&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-custom-dest-delimiters.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-custom-dest-delimiters.xml Wed Jun 15 21:26:06 2011
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
+ <notes>This broker configuration is what the unit tests in this module load up.</notes>
+
+ <virtual_host id="default" purge_on_startup="true" auto_create_queues="true">
+ <host_name>localhost</host_name>
+
+ <queue name="unified.**" unified="true"/>
+
+ </virtual_host>
+
+ <connector id="tcp" protocol="openwire" bind="tcp://0.0.0.0:0">
+ <stomp path_separator="/"/>
+ </connector>
+
+</broker>
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-secure.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-secure.xml?rev=1136199&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-secure.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-secure.xml Wed Jun 15 21:26:06 2011
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
+
+ <authentication domain="OpenwireSecurityTest"/>
+
+ <virtual_host id="default" purge_on_startup="true">
+ <host_name>localhost</host_name>
+
+ <acl>
+ <connect allow="connect_group"/>
+ </acl>
+
+ <!-- queue security -->
+ <queue name="**" kind="ptp">
+ <acl>
+ <create allow="can_send_create_queue"/>
+ <send allow="can_send_create_queue"/>
+ <send allow="can_send_queue"/>
+ <receive allow="can_receive_queue"/>
+ <consume allow="can_consume_queue"/>
+ </acl>
+ </queue>
+
+ <!-- topic security -->
+ <destination name="**">
+ <acl>
+ <create allow="can_send_create_topic"/>
+ <send allow="can_send_create_topic"/>
+ <send allow="can_send_topic"/>
+ <receive allow="can_recieve_topic"/>
+ </acl>
+ </destination>
+
+ <!-- durable sub security -->
+ <queue name="**" kind="ds">
+ <acl>
+ <create allow="can_consume_create_ds"/>
+ <consume allow="can_consume_create_ds"/>
+ <consume allow="can_consume_ds"/>
+ </acl>
+ </queue>
+ </virtual_host>
+
+ <connector id="tcp" protocol="openwire" bind="tcp://0.0.0.0:0">
+ <stomp add_user_header="JMSXUserID"/>
+ </connector>
+
+</broker>
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl-secure.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl-secure.xml?rev=1136199&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl-secure.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl-secure.xml Wed Jun 15 21:26:06 2011
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
+
+
+ <authentication domain="OpenwireSslSecurityTest"/>
+
+ <virtual_host id="default" purge_on_startup="true">
+ <host_name>localhost</host_name>
+
+ <acl>
+ <connect allow="connect_group"/>
+ </acl>
+
+ </virtual_host>
+
+ <key_storage file="${basedir}/src/test/resources/apollo.ks" password="password" key_password="password"/>
+ <connector id="ssl" protocol="openwire" bind="ssl://0.0.0.0:0" />
+
+</broker>
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl.xml?rev=1136199&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl.xml Wed Jun 15 21:26:06 2011
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
+
+ <notes>The config for the ssl stomp tests.</notes>
+ <virtual_host id="default" purge_on_startup="true" auto_create_queues="true">
+ <host_name>localhost</host_name>
+ </virtual_host>
+
+ <key_storage file="${basedir}/src/test/resources/apollo.ks" password="password" key_password="password"/>
+ <connector id="ssl" protocol="openwire" bind="ssl://0.0.0.0:0" />
+
+</broker>
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo.ks
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo.ks?rev=1136199&view=auto
==============================================================================
Binary file - no diff available.
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo.ks
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/client.ks
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/client.ks?rev=1136199&view=auto
==============================================================================
Binary file - no diff available.
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/client.ks
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties?rev=1136199&r1=1136198&r2=1136199&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties Wed Jun 15 21:26:06 2011
@@ -19,22 +19,23 @@
# Setup the default logging levels
#
log4j.rootLogger=WARN, console, logfile
-log4j.logger.org.apache.activemq.apollo=INFO
+#log4j.logger.org.apache.activemq.apollo=INFO
#
# Uncomment one of the following to enable debug logging
#
-# log4j.logger.org.apache.activemq.apollo=DEBUG
-# log4j.logger.org.apache.activemq.apollo.broker=DEBUG
-# log4j.logger.org.apache.activemq.apollo.web=DEBUG
-# log4j.logger.org.apache.activemq.apollo.cli=DEBUG
-# log4j.logger.org.apache.activemq.apollo.broker.store.hawtdb=DEBUG
+log4j.logger.org.apache.activemq.apollo=TRACE
+log4j.logger.org.apache.activemq.apollo.openwire=TRACE
+log4j.logger.org.apache.activemq.apollo.broker=TRACE
+log4j.logger.org.apache.activemq.apollo.web=TRACE
+log4j.logger.org.apache.activemq.apollo.cli=TRACE
+log4j.logger.org.apache.activemq.apollo.broker.store.hawtdb=TRACE
# Console Settings
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%-5p | %m%n
-log4j.appender.console.threshold=INFO
+log4j.appender.console.threshold=TRACE
# File Settings
log4j.appender.logfile=org.apache.log4j.RollingFileAppender
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/login.config
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/login.config?rev=1136199&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/login.config (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/login.config Wed Jun 15 21:26:06 2011
@@ -0,0 +1,39 @@
+// ---------------------------------------------------------------------------
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ---------------------------------------------------------------------------
+OpenwireSecurityTest {
+
+ org.apache.activemq.apollo.broker.security.FileUserLoginModule optional
+ file="users.properties";
+
+ //
+ // For testing purposes, we do a funny thing where we set the user
+ // file to also be used as the groups file. This only works for the
+ // test since user==password==group for our tests.
+ //
+ org.apache.activemq.apollo.broker.security.FileGroupLoginModule optional
+ file="users.properties";
+
+};
+
+OpenwireSslSecurityTest {
+ org.apache.activemq.apollo.broker.security.CertificateLoginModule optional;
+
+ org.apache.activemq.apollo.broker.security.FileGroupLoginModule optional
+ match="javax.security.auth.x500.X500Principal"
+ file="users.properties";
+
+};
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/users.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/users.properties?rev=1136199&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/users.properties (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/users.properties Wed Jun 15 21:26:06 2011
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+connect_group=CN=ssl_user|can_only_connect|can_send_create_queue|can_send_queue|can_receive_queue|can_consume_queue|can_send_create_topic|can_send_topic|can_recieve_topic|can_consume_create_ds|can_consume_ds
+
+can_not_connect=can_not_connect
+can_only_connect=can_only_connect
+
+#
+# Users with specific roles related to queues
+#
+can_send_create_queue=can_send_create_queue
+can_send_queue=can_send_queue
+can_receive_queue=can_receive_queue
+can_consume_queue=can_consume_queue
+
+#
+# Users with specific roles related to topics
+#
+can_send_create_topic=can_send_create_topic
+can_send_topic=can_send_topic
+can_recieve_topic=can_recieve_topic
+can_consume_create_ds=can_consume_create_ds
+can_consume_ds=can_consume_ds
+
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala?rev=1136199&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala Wed Jun 15 21:26:06 2011
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire
+
+import javax.jms.JMSException
+
+class SecurityTest extends OpenwireTestSupport {
+
+ override val broker_config_uri: String = "xml:classpath:apollo-openwire-secure.xml"
+
+ override protected def beforeAll = {
+ try {
+ val login_file = new java.io.File(getClass.getClassLoader.getResource("login.config").getFile())
+ System.setProperty("java.security.auth.login.config", login_file.getCanonicalPath)
+ } catch {
+ case x:Throwable => x.printStackTrace
+ }
+ super.beforeAll
+ }
+}
+
+class ConnectionFailureWithValidCredentials extends SecurityTest {
+
+ test("Connect with valid id password but can't connect") {
+
+ val factory = create_connection_factory
+ val connection = factory.createConnection("can_not_connect", "can_not_connect")
+
+ intercept[JMSException] {
+ connection.start()
+ }
+ }
+}
+
+class CoonectionFailsWhenNoCredentialsGiven extends SecurityTest {
+
+ test("Connect with no id password") {
+
+ val factory = create_connection_factory
+ val connection = factory.createConnection()
+
+ intercept[JMSException] {
+ connection.start()
+ }
+ }
+}
+
+class ConnectionFailsWhenCredentialsAreInvlaid extends SecurityTest {
+
+ test("Connect with invalid id password") {
+ val factory = create_connection_factory
+ val connection = factory.createConnection("foo", "bar")
+
+ intercept[JMSException] {
+ connection.start()
+ }
+ }
+}
\ No newline at end of file