You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/27 01:12:17 UTC

svn commit: r1390772 - in /activemq/activemq-apollo/trunk: apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/protocol/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apollo-openwire/src/test/scala/org/apache/activemq/a...

Author: chirino
Date: Wed Sep 26 23:12:17 2012
New Revision: 1390772

URL: http://svn.apache.org/viewvc?rev=1390772&view=rev
Log:
Implementing APLO-262 : Begin documenting OpenWire features

This is Christian Posta's patch with slight modifications.  Many thanks!

Added:
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/protocol/
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactoryTest.scala
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/openwire-features.md
Modified:
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireParallelTest.scala

Added: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactoryTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactoryTest.scala?rev=1390772&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactoryTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactoryTest.scala Wed Sep 26 23:12:17 2012
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.protocol
+
+import org.apache.activemq.apollo.util.FunSuiteSupport
+import org.scalatest.matchers.ShouldMatchers
+
+/**
+ * Verify the protocols in the apollo-broker module can be found.
+ *
+ * @author <a href="http://www.christianposta.com/blog">Christian Posta</a>
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class ProtocolFactoryTest extends FunSuiteSupport with ShouldMatchers{
+  test("ProtocolFactory tests"){
+    var proto = ProtocolFactory.get("udp")
+    proto should not be (None)
+    // Same lookup check for the "ssl" protocol name.
+    proto = ProtocolFactory.get("ssl")
+    proto should not be (None)
+    // Same lookup check for the "any" protocol name.
+    proto = ProtocolFactory.get("any")
+    proto should not be (None)
+
+    // The stomp and openwire protocols are deliberately not looked up here:
+    // their implementations are not on the classpath yet.
+  }
+}

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1390772&r1=1390771&r2=1390772&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed Sep 26 23:12:17 2012
@@ -156,7 +156,6 @@ class OpenwireProtocolHandler extends Pr
     preferred_wireformat_settings.setCacheEnabled(config.cache.getOrElse(DEFAULT_WIREFORMAT_SETTINGS.isCacheEnabled))
     preferred_wireformat_settings.setVersion(config.version.getOrElse(DEFAULT_WIREFORMAT_SETTINGS.getVersion))
     preferred_wireformat_settings.setStackTraceEnabled(config.stack_trace.getOrElse(DEFAULT_WIREFORMAT_SETTINGS.isStackTraceEnabled))
-    preferred_wireformat_settings.setCacheEnabled(config.cache.getOrElse(DEFAULT_WIREFORMAT_SETTINGS.isCacheEnabled))
     preferred_wireformat_settings.setTightEncodingEnabled(config.tight_encoding.getOrElse(DEFAULT_WIREFORMAT_SETTINGS.isTightEncodingEnabled))
     preferred_wireformat_settings.setMaxInactivityDuration(config.max_inactivity_duration.getOrElse(DEFAULT_WIREFORMAT_SETTINGS.getMaxInactivityDuration))
     preferred_wireformat_settings.setMaxInactivityDurationInitalDelay(config.max_inactivity_duration_initial_delay.getOrElse(DEFAULT_WIREFORMAT_SETTINGS.getMaxInactivityDurationInitalDelay))

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireParallelTest.scala?rev=1390772&r1=1390771&r2=1390772&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireParallelTest.scala Wed Sep 26 23:12:17 2012
@@ -96,6 +96,40 @@ class OpenwireParallelTest extends Openw
     get(-1)
   }
 
+  test("Wildcard subscription recursive"){
+    connect()
+
+    val common_prefix = next_id() + path_separator
+
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    // producer1 sends to the queue named "<prefix>A".
+    val producer1_dest = common_prefix + "A"
+    val producer1 = session.createProducer(queue(producer1_dest))
+    // producer2 sends to a queue one path section below it: "<prefix>A<separator>bar".
+    val producer2_dest = producer1_dest + path_separator + "bar"
+    val producer2 = session.createProducer(queue(producer2_dest))
+
+    // Sends the text message "message:<id>" through the given producer.
+    def put(producer: MessageProducer, id: Int) {
+      producer.send(session.createTextMessage("message:" + id))
+    }
+    // One consumer subscribes to "<prefix>A<separator>>", i.e. using the ">" wildcard.
+    val subscribe_dest = common_prefix + "A" + path_separator + ">"
+    val consumer = session.createConsumer(queue(subscribe_dest))
+    // Asserts that receive_text(consumer) yields "message:<id>".
+    def get(id: Int) {
+      receive_text(consumer) should equal("message:" + id)
+    }
+
+    // Send to producer1's queue, then to producer2's; each batch is expected on the one wildcard consumer.
+    List(1, 2, 3).foreach(put(producer1, _))
+    List(1, 2, 3).foreach(get _)
+
+    List(4, 5, 6).foreach(put(producer2, _))
+    List(4, 5, 6).foreach(get _)
+  }
+
   test("Wildcard subscription with multiple path sections") {
     connect()
 

Added: activemq/activemq-apollo/trunk/apollo-website/src/documentation/openwire-features.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/openwire-features.md?rev=1390772&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/openwire-features.md (added)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/openwire-features.md Wed Sep 26 23:12:17 2012
@@ -0,0 +1,284 @@
+## Using the OpenWire Protocol
+
+{:toc:2-5}
+
+### OpenWire
+
+Clients can connect to ${project_name} using the
+[OpenWire](http://activemq.apache.org/openwire.html) protocol. OpenWire is a binary,
+on-the-wire protocol used natively by [ActiveMQ](http://activemq.apache.org/). It was designed
+to be a fast, full-featured, and JMS-compliant protocol for message brokers. Currently there are
+native client libraries for Java, C, C#, and C++. Further OpenWire support can be built by
+implementing language-specific code generators; however, for most cross-language needs, the
+[STOMP](http://stomp.github.com) protocol is best.
+
+
+OpenWire was designed to be extensible yet backward compatible with older versions. When a client connects
+to the broker, the protocol version that's used is negotiated based on what each can support.
+
+### OpenWire Protocol Options
+
+You can use the `openwire` configuration element within the `connector` element
+in the `apollo.xml` configuration file to change the default settings used
+in the OpenWire protocol implementation.
+
+{pygmentize:: xml}
+<connector id="tcp" bind="tcp://0.0.0.0:61613">
+  <openwire attribute="value"/>
+</connector>
+{pygmentize}
+
+The `openwire` element supports the following configuration attributes:
+
+* `buffer_size` : How much each producer or subscription will buffer between
+   the client and the broker. Defaults to `640k`.
+* `stack_trace` : If there is an exception on the broker, it will be sent back to the client. Default is `true`
+* `cache` : Used to reduce marshalling efforts within the broker. Cache data structures such as openwire commands,
+ destination objects, subscription info, etc. Default is `true`
+* `cache_size` : Number of internal data structures to cache. Default is `1024`
+* `tight_encoding` : Optimize the encoding to be efficient over the wire at the expense of greater CPU usage to
+marshal/unmarshal. Default is `true`
+* `tcp_no_delay` : Decide whether to use [Nagle's Algorithm](http://en.wikipedia.org/wiki/Nagle's_algorithm) which improves TCP/IP efficiency for small packets.
+Set to true to disable this algorithm. Default is `false` (which means nodelay is off, and it uses Nagle's algorithm)
+* `max_inactivity_duration` : Max inactivity period, in milliseconds, at which point the socket would be considered
+dead. Used by the heartbeat functionality. If there is a period of inactivity greater than this period, the socket will
+be closed. Default is `30000`
+* `max_inactivity_duration_initial_delay` : Amount of time to delay between determining the socket should be closed
+and actually closing it. Default is `30000`
+* `max_frame_size` : Size in bytes of the largest frame that can be sent to the broker. Default is `100MB`
+
+An example of configuring the OpenWire protocol
+
+{pygmentize:: xml}
+<connector id="tcp" bind="tcp://0.0.0.0:61613">
+  <openwire tight_encoding="false" tcp_no_delay="true"/>
+</connector>
+{pygmentize}
+
+### Protocol Detection (different from OpenWire version detection)
+Apollo was designed to be inherently multi-protocol. Although STOMP was the first protocol to be implemented in Apollo, the core
+of the broker was not built around STOMP or any other specific protocol. Apollo, in fact by default, has the ability
+to detect the protocol being used on the wire without further configuration. This makes the configuration easier on
+the broker, and means you only need to open one connector that can handle multiple different types of wire protocols.
+If you would like to specify a certain connector for OpenWire and another connector for a different protocol, you can
+explicitly configure the connector to be an OpenWire connector:
+
+{pygmentize:: xml}
+<connector protocol="openwire" ... />
+{pygmentize}
+
+You can also support a limited subset of protocols:
+
+{pygmentize:: xml}
+<connector bind="...">
+    <detect protocols="openwire stomp" />
+</connector>
+{pygmentize}
+
+Or you can leave it open to any of the supported protocols (default), and the correct protocol will be used depending
+on what the client is using. You do this by not specifying any protocol settings.
+
+
+Note, this type of on-the-wire protocol detection is different from the OpenWire version detection briefly mentioned
+above. After the broker determines a client is using an OpenWire protocol, the version is negotiated separately from
+how the broker determines a protocol.
+
+### Client Libraries
+To connect to Apollo using the OpenWire protocol, we recommend you use the latest [ActiveMQ](http://activemq.apache.org/) 5.x client libraries.
+
+* [C](http://activemq.apache.org/c-integration.html)
+* [C++](http://activemq.apache.org/activemq-c-clients.html)
+* [C# and .NET](http://activemq.apache.org/nms/)
+
+To configure specific behaviors for your connection, see the [Connection reference](http://activemq.apache.org/connection-configuration-uri.html)
+for ActiveMQ 5.x
+
+### Broker features available using the OpenWire protocol
+
+#### Destination Types
+
+* Queues (for point-to-point messaging) - A JMS Queue implements load balancer semantics. A single message will be
+received by exactly one consumer. If there are no consumers available at the time the message is sent it will be kept
+until a consumer is available that can process the message. If a consumer receives a message and does not acknowledge
+it before closing then the message will be redelivered to another consumer. A queue can have many consumers with
+messages load balanced across the available consumers.
+
+* Topics (publish-subscribe) - In JMS a Topic implements publish and subscribe semantics. When you publish a message it
+goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only
+subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.
+
+* Durable Subscriptions (persistent publish-subscribe) - Durable subscriptions allow you to achieve semantics similar to
+a queue using topics. Specifically, this allows a subscription to subscribe and disconnect without worrying about losing messages.
+If the client disconnects, the messages that arrive at the topic while the subscription is inactive will be queued up
+for consumption when the subscription becomes reactivated.
+
+#### Wildcard Subscriptions
+Wildcards can be used in destination names when subscribing as a consumer. This allows you to subscribe
+to multiple destinations or a hierarchy of destinations.
+
+* `.` is used to separate names in a path
+* `*` is used to match any name in a path
+* `>` is used to recursively match path names
+
+Unlike some of the other protocols ${project_name} supports, for the OpenWire implementation, regex wildcards
+are not supported. Also note that, in contrast to other protocols, the OpenWire wildcard for recursive destinations
+is ">" and not "**".
+
+#### Composite Destinations
+You can send to multiple destinations with one single operation. When you create a destination to which your producer
+will be sending, you can specify multiple destinations with the "," (comma) destination separator. For example,
+if you want to send a single message to two queues:
+
+    Destination destination = session.createQueue("test-queue,test-queue-foo");
+    MessageProducer producer = session.createProducer(destination);
+    TextMessage message = session.createTextMessage("Message #" + i);
+    producer.send(message);
+
+Note both destinations named will be considered queues. However, you can also include a topic destination in your
+list. You'll want to use the `topic://` prefix if mixing destination types (or `queue://` for queues):
+
+    Destination destination = session.createQueue("test-queue,test-queue-foo,topic://test-topic-foo");
+
+
+Similarly you can consume from multiple destinations as well. When you set up your consumer's destination, just follow
+the same rules as above.
+
+
+#### Exclusive Consumer
+To do exclusive consumer on a queue, you will specify the settings on the queue itself:
+
+    "QUEUE.NAME?consumer.exclusive=true"
+
+The first consumer to subscribe to the queue will be the exclusive consumer. Any other consumers that
+subscribe to the queue will not receive messages as long as the exclusive consumer is alive and consuming. If the
+exclusive consumer goes away, the next in line to subscribe will be selected as the exclusive consumer. In general,
+the order that's calculated for who should be the next exclusive consumer is based on when they subscribe. The first
+to subscribe wins and the others fall in line based on when they subscribed.
+
+#### Temporary Destinations
+Temporary destinations are bound to the connection that created them; therefore, when the connection goes away, the
+temporary destination will also go away. Using temporary destinations is one way to implement a request-reply messaging pattern
+with ${project_name}. The steps for using temporary queues or topics for request-reply are as follows:
+
+Create a temporary destination
+
+    Destination replyDest = session.createTemporaryQueue();
+
+Create a consumer for that destination
+
+    MessageConsumer replyConsumer = session.createConsumer(replyDest);
+
+Create a message to send as a request and set the JMSReplyTo header to the temp destination
+
+    message.setJMSReplyTo(replyDest);
+
+Send the message. If the receiver of the message is aware that it's participating in a request-reply scenario, it
+should place the response into the destination specified in the JMSReplyTo header.
+
+
+#### Message Selectors
+You can use message selectors to create subscriptions to destinations that are filtered based on some headers or
+properties in the message. You define a selector as a String that is similar to the SQL92 syntax.
+
+For example, to define a consumer on a destination that is only interested in messages that have a property named "intended" and
+a value of "me", pass a selector as the second argument to the [session.createConsumer()](http://docs.oracle.com/javaee/6/api/javax/jms/Session.html) method:
+
+    session.createConsumer(destination, "intended = 'me'");
+
+Now messages produced with a property/value combination specified in the selector will be delivered to the consumer.
+
+Here's an example of producing the message:
+
+{pygmentize:: java}
+MessageProducer producer = session.createProducer(destination);
+
+for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
+    TextMessage message = session.createTextMessage("Message #" + i);
+    message.setStringProperty("intended", "me");
+    LOG.info("Sending message #" + i);
+    producer.send(message);
+    Thread.sleep(DELAY);
+}
+{pygmentize}
+
+#### Browsing Subscription
+With a [QueueBrowser](http://activemq.apache.org/maven/5.6.0/activemq-core/apidocs/org/apache/activemq/ActiveMQQueueBrowser.html), you can
+browse a queue's messages without actually consuming them. This can be useful for debugging, adding a user-interface layer,
+or audit or logging.
+
+To establish a browsing subscription to a queue, use the JMS API:
+
+    QueueBrowser browser = session.createBrowser((Queue) destination);
+
+Then you can enumerate the messages and examine them with the following idiom:
+
+    Enumeration enumeration = browser.getEnumeration();
+
+    while (enumeration.hasMoreElements()) {
+        TextMessage message = (TextMessage) enumeration.nextElement();
+        System.out.println("Browsing: " + message);
+    }
+
+
+When you browse a queue, only a snapshot of the queue will be available. If more messages are enqueued, the browsing
+session will not automatically see those.
+
+Note, you cannot establish browsing sessions to a durable topic with OpenWire/JMS.
+
+
+#### Transactions
+Transactions can be done on both the consumer and the producer for any destination. When you create a session,
+pass `true` to the first parameter:
+
+    Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+You can `commit` or `rollback` a transaction by calling `session.commit()` or `session.rollback()` respectively.
+On the broker side, each command that you take before calling `session.commit()` (like sending a message) gets batched
+up in a TransactionContext. When commit is made, all of the commands are executed and a Response is sent to the client
+(i.e., calling commit is a synchronous call. Before calling commit, all other commands are async).
+
+
+
+### OpenWire protocol details
+This section explains a little more about what's happening on the wire. The STOMP protocol, as it was designed,
+is easy to understand and monitor since it's a text-based protocol. OpenWire, however, is binary,
+and understanding the interactions that happen isn't as easy. Some clues might be helpful.
+
+All OpenWire commands are implemented as "command" objects following the Gang of Four [Command Pattern](http://en.wikipedia.org/wiki/Command_pattern).
+The structure of the objects is described [at the ActiveMQ website](http://activemq.apache.org/openwire-version-2-specification.html), but
+what about the interactions?
+
+
+Establishing a connection to the broker:
+A connection is established between the client and the broker with the client creating a new ActiveMQConnection
+(most likely using a connection factory of some sort). When a new "connection" is created, the underlying transport
+mechanisms send a WireFormatInfo command to the broker. This command describes what version and configurations of the OpenWire protocol
+the client wishes to use. For example, some of the configuration options are the ones listed above that can also be
+configured on the broker.
+
+When the TCP connection is handled on the broker side, it sends a WireFormatInfo to the client. The purpose of exchanging
+these WireFormatInfo commands is to be able to negotiate what settings to use, since the client and the server each have
+their own preferred settings. The lowest protocol version between the two is used. When the broker receives the client's
+WireFormatInfo command, it negotiates the differences on its side and then sends a BrokerInfo command. Conversely
+on the client, when it receives the broker's WireFormatInfo, it negotiates it and sends a ConnectionInfo command. When
+the broker receives a ConnectionInfo command, it will either ack it with a Response command, or use security settings established globally
+for the broker or for a given virtual host to determine whether connections are allowed. If a connection is not allowed
+to the broker or to the virtual host, the broker will kill the connection.
+
+### OpenWire features to be documented
+
+* Flow Control
+* Persistent Messaging
+* Message Expiration
+
+### OpenWire implementation features to come in the near future:
+
+* [Message Groups using JMSXGroupID](http://activemq.apache.org/message-groups.html)
+* [Subscription recovery/retroactive consumer](http://activemq.apache.org/retroactive-consumer.html)
+* [Network of brokers](http://activemq.apache.org/networks-of-brokers.html)
+* [Shared-state Master/Slave](http://activemq.apache.org/shared-file-system-master-slave.html)
+* XA transactions
+* [Exclusive Consumer with Priority](http://activemq.apache.org/exclusive-consumer.html)
+* [Startup Destinations](http://activemq.apache.org/configure-startup-destinations.html)
+* [Delete inactive dests](http://activemq.apache.org/delete-inactive-destinations.html)
+* [Virtual Dests](http://activemq.apache.org/virtual-destinations.html)
+* [JMX](http://activemq.apache.org/jmx.html)