You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/06/20 20:57:03 UTC
svn commit: r1352265 [2/2] - in /activemq/activemq-apollo/trunk: ./
apollo-amqp/ apollo-amqp/src/ apollo-amqp/src/main/
apollo-amqp/src/main/resources/ apollo-amqp/src/main/resources/META-INF/
apollo-amqp/src/main/resources/META-INF/services/ apollo-am...
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/log4j.properties?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/log4j.properties (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/log4j.properties Wed Jun 20 18:57:01 2012
@@ -0,0 +1,45 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# Setup the default logging levels
+#
+log4j.rootLogger=INFO, console, logfile
+#log4j.logger.org.apache.activemq.apollo=INFO
+
+#
+# Uncomment one of the following to enable debug logging
+#
+# log4j.logger.org.apache.activemq.apollo=DEBUG
+# log4j.logger.org.apache.activemq.apollo.broker=DEBUG
+# log4j.logger.org.apache.activemq.apollo.web=DEBUG
+# log4j.logger.org.apache.activemq.apollo.cli=DEBUG
+
+# Console Settings
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%-5p | %m%n
+log4j.appender.console.threshold=TRACE
+
+# File Settings
+log4j.appender.logfile=org.apache.log4j.RollingFileAppender
+log4j.appender.logfile.file=apollo.log
+log4j.appender.logfile.maxFileSize=5MB
+log4j.appender.logfile.maxBackupIndex=5
+log4j.appender.logfile.append=true
+log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
+log4j.appender.logfile.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/login.config
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/login.config?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/login.config (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/login.config Wed Jun 20 18:57:01 2012
@@ -0,0 +1,41 @@
+// ---------------------------------------------------------------------------
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ---------------------------------------------------------------------------
+AmqpSecurityTest {
+
+ org.apache.activemq.apollo.broker.security.SocketAddressLoginModule requisite;
+
+ org.apache.activemq.apollo.broker.security.FileUserLoginModule optional
+ file="users.properties";
+
+ //
+ // For testing purposes, we do a funny thing where we set the user
+ // file to also be used as the groups file. This only works for the
+ // test since user==password==group for our tests.
+ //
+ org.apache.activemq.apollo.broker.security.FileGroupLoginModule optional
+ file="users.properties";
+
+};
+
+AmqpSslSecurityTest {
+ org.apache.activemq.apollo.broker.security.CertificateLoginModule optional;
+
+ org.apache.activemq.apollo.broker.security.FileGroupLoginModule optional
+ match="javax.security.auth.x500.X500Principal"
+ file="users.properties";
+
+};
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/org/apache/activemq/apollo/amqp/dto/simple.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/org/apache/activemq/apollo/amqp/dto/simple.xml?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/org/apache/activemq/apollo/amqp/dto/simple.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/org/apache/activemq/apollo/amqp/dto/simple.xml Wed Jun 20 18:57:01 2012
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
+ <virtual_host id="vh-local">
+ </virtual_host>
+
+ <connector bind="tcp://0.0.0.0:61616" id="port-61616">
+ <amqp add_user_header="JMSXUserID">
+ <add_user_header>GroupId</add_user_header>
+ <add_user_header kind="UserPrincipal">UserId</add_user_header>
+ </amqp>
+ </connector>
+</broker>
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/users.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/users.properties?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/users.properties (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/users.properties Wed Jun 20 18:57:01 2012
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+connect_group=CN=ssl_user|can_only_connect|can_send_create_queue|can_send_queue|can_receive_queue|can_consume_queue|can_send_create_topic|can_send_topic|can_recieve_topic|can_consume_create_ds|can_consume_ds|can_send_create_consume_queue
+
+guest=guest
+can_not_connect=can_not_connect
+can_only_connect=can_only_connect
+connector_restricted=connector_restricted
+
+#
+# Users with specific roles related to queues
+#
+can_send_create_queue=can_send_create_queue
+can_send_queue=can_send_queue
+can_receive_queue=can_receive_queue
+can_consume_queue=can_consume_queue
+can_send_create_consume_queue=can_send_create_consume_queue
+
+#
+# Users with specific roles related to topics
+#
+can_send_create_topic=can_send_create_topic
+can_send_topic=can_send_topic
+can_recieve_topic=can_recieve_topic
+can_consume_create_ds=can_consume_create_ds
+can_consume_ds=can_consume_ds
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/AmqpTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/AmqpTest.scala?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/AmqpTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/AmqpTest.scala Wed Jun 20 18:57:01 2012
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp
+
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.BeforeAndAfterEach
+import org.apache.activemq.apollo.broker._
+import java.io.FileInputStream
+import org.apache.activemq.apollo.util.FileSupport._
+import org.fusesource.amqp.codec.AMQPProtocolCodec
+import com.swiftmq.amqp.v100.client.ExceptionListener
+import java.lang.Exception
+
+/**
+ * Shared base class for the AMQP tests in this module. The only live member
+ * overrides broker_config_uri to point at xml:classpath:apollo-amqp.xml;
+ * the remaining helpers in the body are commented out.
+ */
+class AmqpTestSupport extends BrokerFunSuiteSupport with ShouldMatchers with BeforeAndAfterEach {
+
+ // URI of the broker configuration used by suites extending this class.
+ override def broker_config_uri = "xml:classpath:apollo-amqp.xml"
+
+// The helpers below are disabled. They write text frames (CONNECT, SEND,
+// SUBSCRIBE, UNSUBSCRIBE, ACK/NACK) and look like they were carried over from
+// a STOMP test support class -- TODO confirm before porting them to AMQP.
+// NOTE(review), should they be re-enabled as-is: connect_request formats the
+// String `x` with "%f", and unsubscribe writes to `client` rather than to its
+// `c` parameter.
+//
+// var client = new AmqpClient
+// var clients = List[AmqpClient]()
+//
+// override protected def afterEach() = {
+// super.afterEach
+// clients.foreach(_.close)
+// clients = Nil
+// }
+//
+// def connect_request(version:String, c: AmqpClient, headers:String="", connector:String=null) = {
+// val p = connector_port(connector).getOrElse(port)
+// c.open("localhost", p)
+// version match {
+// case "1.0"=>
+// c.write(
+// "CONNECT\n" +
+// headers +
+// "\n")
+// case "1.1"=>
+// c.write(
+// "CONNECT\n" +
+// "accept-version:1.1\n" +
+// "host:localhost\n" +
+// headers +
+// "\n")
+// case x=> throw new RuntimeException("invalid version: %f".format(x))
+// }
+// clients ::= c
+// c.receive()
+// }
+//
+// def connect(version:String, c: AmqpClient = client, headers:String="", connector:String=null) = {
+// val frame = connect_request(version, c, headers, connector)
+// frame should startWith("CONNECTED\n")
+// frame should include regex("""session:.+?\n""")
+// frame should include("version:"+version+"\n")
+// c
+// }
+//
+// val receipt_counter = new AtomicLong()
+//
+// def sync_send(dest:String, body:Any, headers:String="", c:AmqpClient = client) = {
+// val rid = receipt_counter.incrementAndGet()
+// c.write(
+// "SEND\n" +
+// "destination:"+dest+"\n" +
+// "receipt:"+rid+"\n" +
+// headers+
+// "\n" +
+// body)
+// wait_for_receipt(""+rid, c)
+// }
+//
+// def async_send(dest:String, body:Any, headers:String="", c: AmqpClient = client) = {
+// c.write(
+// "SEND\n" +
+// "destination:"+dest+"\n" +
+// headers+
+// "\n" +
+// body)
+// }
+//
+// def subscribe(id:String, dest:String, mode:String="auto", persistent:Boolean=false, headers:String="", sync:Boolean=true, c: AmqpClient = client) = {
+// val rid = receipt_counter.incrementAndGet()
+// c.write(
+// "SUBSCRIBE\n" +
+// "destination:"+dest+"\n" +
+// "id:"+id+"\n" +
+// (if(persistent) "persistent:true\n" else "")+
+// "ack:"+mode+"\n"+
+// (if(sync) "receipt:"+rid+"\n" else "") +
+// headers+
+// "\n")
+// if(sync) {
+// wait_for_receipt(""+rid, c)
+// }
+// }
+//
+// def unsubscribe(id:String, headers:String="", c: AmqpClient=client) = {
+// val rid = receipt_counter.incrementAndGet()
+// client.write(
+// "UNSUBSCRIBE\n" +
+// "id:"+id+"\n" +
+// "receipt:"+rid+"\n" +
+// headers+
+// "\n")
+// wait_for_receipt(""+rid, c)
+// }
+//
+// def assert_received(body:Any, sub:String=null, c: AmqpClient=client):(Boolean)=>Unit = {
+// val frame = c.receive()
+// frame should startWith("MESSAGE\n")
+// if(sub!=null) {
+// frame should include ("subscription:"+sub+"\n")
+// }
+// body match {
+// case null =>
+// case body:scala.util.matching.Regex => frame should endWith regex(body)
+// case body => frame should endWith("\n\n"+body)
+// }
+// // return a func that can ack the message.
+// (ack:Boolean)=> {
+// val sub_regex = """(?s).*\nsubscription:([^\n]+)\n.*""".r
+// val msgid_regex = """(?s).*\nmessage-id:([^\n]+)\n.*""".r
+// val sub_regex(sub) = frame
+// val msgid_regex(msgid) = frame
+// c.write(
+// (if(ack) "ACK\n" else "NACK\n") +
+// "subscription:"+sub+"\n" +
+// "message-id:"+msgid+"\n" +
+// "\n")
+// }
+// }
+//
+// def wait_for_receipt(id:String, c: AmqpClient = client, discard_others:Boolean=false): Unit = {
+// if( !discard_others ) {
+// val frame = c.receive()
+// frame should startWith("RECEIPT\n")
+// frame should include("receipt-id:"+id+"\n")
+// } else {
+// while(true) {
+// val frame = c.receive()
+// if( frame.startsWith("RECEIPT\n") && frame.indexOf("receipt-id:"+id+"\n")>=0 ) {
+// return
+// }
+// }
+// }
+// }
+}
+
+import com.swiftmq.amqp.AMQPContext
+import com.swiftmq.amqp.v100.client.Connection
+import com.swiftmq.amqp.v100.client.QoS
+import com.swiftmq.amqp.v100.generated.messaging.message_format.AmqpValue
+import com.swiftmq.amqp.v100.types.AMQPString
+
+/**
+ * Command line utility that dumps the AMQP frames stored in one or more files.
+ */
+object PrintAMQPStream {
+
+  /**
+   * Reads the next frame from the codec, mapping an EOFException to null so
+   * that callers can treat end-of-stream as "no more frames".
+   */
+  private def read_frame(codec: AMQPProtocolCodec) = try {
+    codec.read()
+  } catch {
+    case e: java.io.EOFException => null
+  }
+
+  /**
+   * Prints every frame of each file named in args, one per line, prefixed
+   * with "@" and the offset recorded for the start of that frame.
+   *
+   * @param args paths of the files to decode
+   */
+  def main(args: Array[String]): Unit = {
+    for( arg <- args ) {
+      println("--------------------------------------------------------")
+      println(" File: "+arg)
+      println("--------------------------------------------------------")
+      using(new FileInputStream(arg)) { is =>
+        val codec = new AMQPProtocolCodec
+        codec.setReadableByteChannel(is.getChannel)
+        // codec.skipProtocolHeader()
+
+        var pos = 0L
+        // The first read is guarded like every later one, so an empty file
+        // ends the loop instead of aborting the whole run.
+        var frame = read_frame(codec)
+        while( frame != null ) {
+          println("@"+pos+" "+frame)
+          // Start of the next frame: presumably bytes read so far minus the
+          // bytes still waiting to be decoded.
+          pos = codec.getReadCounter - codec.getReadBytesPendingDecode
+          frame = read_frame(codec)
+        }
+      }
+    }
+  }
+}
+
+class AmqpTest extends AmqpTestSupport {
+
+  /**
+   * Sends nMsgs messages to /queue/testqueue with the SwiftMQ AMQP 1.0 client and
+   * then receives them back on the same connection, accepting every message
+   * that is not already settled.
+   *
+   * Errors are allowed to propagate so that a broken round trip fails the test
+   * instead of merely printing a stack trace.
+   */
+  test("broker") {
+
+    val queue = "/queue/testqueue"
+    val nMsgs = 1
+    val qos = QoS.AT_MOST_ONCE
+    val ctx = new AMQPContext(AMQPContext.CLIENT)
+
+    // `port` is not declared in this class; presumably inherited from the test support.
+    val connection = new Connection(ctx, "127.0.0.1", port, false)
+    connection.setContainerId("client")
+    connection.setIdleTimeout(-1)
+    connection.setMaxFrameSize(1024*4)
+    connection.setExceptionListener(new ExceptionListener(){
+      def onException(e: Exception) {
+        e.printStackTrace()
+      }
+    })
+    connection.connect()
+
+    try {
+      // Producer side: publish the test messages.
+      val data = "x" * 10 // 1024*20
+      val producerSession = connection.createSession(10, 10)
+      val producer = producerSession.createProducer(queue, qos)
+      for (i <- 0 until nMsgs) {
+        val msg = new com.swiftmq.amqp.v100.messaging.AMQPMessage
+        val label = "Message #" + (i + 1)
+        println("Sending " + label)
+        msg.setAmqpValue(new AmqpValue(new AMQPString(label+", data: "+data)))
+        producer.send(msg)
+      }
+      producer.close()
+      producerSession.close()
+
+      // Consumer side: receive messages non-transacted.
+      val consumerSession = connection.createSession(10, 10)
+      val consumer = consumerSession.createConsumer(queue, 100, qos, true, null)
+      for (i <- 0 until nMsgs) {
+        val msg = consumer.receive()
+        // Fail fast on a missing message; the previous null check was inverted
+        // and only dereferenced msg when it was null.
+        assert(msg != null, "expected message #" + (i + 1) + " but received null")
+        msg.getAmqpValue().getValue match {
+          case value: AMQPString =>
+            println("Received: " + value.getValue())
+          case other =>
+            fail("unexpected message body: " + other)
+        }
+        if (!msg.isSettled()) {
+          msg.accept()
+        }
+      }
+      consumer.close()
+      consumerSession.close()
+    } finally {
+      // Release the client connection even when an assertion above fails.
+      connection.close()
+    }
+
+  }
+}
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/dto/XmlCodecTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/dto/XmlCodecTest.java?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/dto/XmlCodecTest.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/dto/XmlCodecTest.java Wed Jun 20 18:57:01 2012
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.dto;
+
+import org.apache.activemq.apollo.dto.*;
+import org.junit.Test;
+
+import java.io.InputStream;
+import java.util.List;
+
+import static junit.framework.Assert.*;
+import static junit.framework.Assert.assertEquals;
+
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+
+public class XmlCodecTest {
+
+    /**
+     * Opens a classpath resource through this class; a relative path is
+     * resolved against this class's package.
+     *
+     * @param path resource name
+     * @return the resource stream, or null when the resource does not exist
+     */
+    private InputStream resource(String path) {
+        return getClass().getResourceAsStream(path);
+    }
+
+    /**
+     * Decodes simple.xml and verifies the connector's AMQP protocol settings:
+     * the add_user_header attribute and the two nested add_user_header elements.
+     */
+    @Test
+    public void unmarshalling() throws Exception {
+        InputStream in = resource("simple.xml");
+        // Fail with a clear message instead of handing a null stream to the codec.
+        assertNotNull("simple.xml not found on the test classpath", in);
+        BrokerDTO dto;
+        try {
+            dto = XmlCodec.decode(BrokerDTO.class, in);
+        } finally {
+            // The stream was previously never closed.
+            in.close();
+        }
+        assertNotNull(dto);
+
+        assertEquals(1, dto.connectors.size());
+        AcceptingConnectorDTO connector = (AcceptingConnectorDTO)dto.connectors.get(0);
+        assertEquals(1, connector.protocols.size());
+        ProtocolDTO protocol = connector.protocols.get(0);
+        assertTrue(protocol instanceof AmqpDTO);
+        AmqpDTO amqp = (AmqpDTO) protocol;
+        assertEquals("JMSXUserID", amqp.add_user_header);
+
+        List<AddUserHeaderDTO> add_user_headers = amqp.add_user_headers;
+        assertEquals(2, add_user_headers.size());
+        assertEquals("GroupId", add_user_headers.get(0).name);
+        assertEquals("UserId", add_user_headers.get(1).name);
+        assertEquals("UserPrincipal", add_user_headers.get(1).kind);
+    }
+
+}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1352265&r1=1352264&r2=1352265&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jun 20 18:57:01 2012
@@ -90,7 +90,7 @@ class VirtualHost(val broker: Broker, va
var config:VirtualHostDTO = _
val router:Router = new LocalRouter(this)
- var names:List[String] = Nil;
+ def names:List[String] = config.host_names.toList;
var store:Store = null
val queue_id_counter = new LongCounter()
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala?rev=1352265&r1=1352264&r2=1352265&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala Wed Jun 20 18:57:01 2012
@@ -183,7 +183,7 @@ class AnyProtocolHandler extends Protoco
connection.transport.suspendRead
protocol_handler.set_connection(connection);
- protocol_handler.on_transport_connected
+ connection.transport.getTransportListener.onTransportConnected()
}
override def on_transport_connected = {
Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1352265&r1=1352264&r2=1352265&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Wed Jun 20 18:57:01 2012
@@ -176,6 +176,7 @@
<module>apollo-network</module>
<module>apollo-openwire-generator</module>
<module>apollo-openwire</module>
+ <module>apollo-amqp</module>
<module>apollo-distro</module>
</modules>