You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/03/27 21:20:32 UTC

svn commit: r1461839 [2/2] - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/...

Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java?rev=1461839&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java Wed Mar 27 20:20:31 2013
@@ -0,0 +1,1084 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.mqtt;
+
+import org.apache.activemq.apollo.broker.*;
+import org.apache.activemq.apollo.broker.protocol.RawMessage;
+import org.apache.activemq.apollo.broker.protocol.RawMessageCodec$;
+import org.apache.activemq.apollo.broker.security.SecurityContext;
+import org.apache.activemq.apollo.broker.store.StoreUOW;
+import org.apache.activemq.apollo.filter.FilterException;
+import org.apache.activemq.apollo.mqtt.MqttSessionManager.HostState;
+import org.apache.activemq.apollo.mqtt.MqttSessionManager.SessionState;
+import org.apache.activemq.apollo.util.LRUCache;
+import org.apache.activemq.apollo.util.LongCounter;
+import org.apache.activemq.apollo.util.path.Path$;
+import org.apache.activemq.apollo.util.path.PathMap;
+import org.apache.activemq.apollo.util.path.PathParser;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.hawtdispatch.*;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.fusesource.mqtt.codec.*;
+import scala.Option;
+import scala.Tuple2;
+import scala.runtime.BoxedUnit;
+
+import java.net.ProtocolException;
+import java.util.*;
+
+import static org.fusesource.hawtdispatch.Dispatch.NOOP;
+import static org.fusesource.hawtdispatch.Dispatch.createQueue;
+
+/**
+ * An MqttSession can be switch from one connection/protocol handler to another,
+ * but it will only be associated with one at a time. An MqttSession tracks
+ * the state of the communication with a client.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class MqttSession {
+
+    public static final ScalaSupport.Logger log = new ScalaSupport.Logger(MqttProtocolHandler$.MODULE$);
+
+    public static <T> T received(T value) {
+        log.trace("received: %s", value);
+        return value;
+    }
+
+
+    public final HostState host_state;
+    public final UTF8Buffer client_id;
+    public final SessionState session_state;
+    public final DispatchQueue queue;
+
+    public MqttSession(HostState host_state, UTF8Buffer client_id, SessionState session_state) {
+        this.host_state = host_state;
+        this.client_id = client_id;
+        this.queue = createQueue("mqtt: " + client_id);
+        this.session_state = session_state;
+    }
+
+    public boolean manager_disconnected = false;
+    public MqttProtocolHandler handler;
+    public SecurityContext security_context;
+    public boolean clean_session = false;
+    public CONNECT connect_message;
+    public DestinationParser destination_parser = MqttProtocol.destination_parser;
+    boolean publish_body = false;
+
+    public VirtualHost host() {
+        return host_state.host();
+    }
+
+    public void connect(final MqttProtocolHandler next) {
+        queue.execute(new Task() {
+            public void run() {
+                if (manager_disconnected) {
+                    // we are not the assignment anymore.. go to the session manager
+                    // again to setup a new session.
+                    MqttSessionManager.attach(host(), client_id, next);
+                } else {
+
+                    // so that we don't switch again until this current switch completes
+                    queue.suspend();
+                    if (handler != null) {
+                        detach();
+                        handler = null;
+                    }
+                    queue.execute(new Task() {
+                        public void run() {
+                            handler = next;
+                            attach();
+                        }
+                    });
+
+                    // switch the connection to the session queue..
+                    next.connection()._set_dispatch_queue(queue, new Task() {
+                        public void run() {
+                            queue.resume();
+                        }
+                    });
+                }
+
+            }
+        });
+    }
+
+    public void disconnect(final MqttProtocolHandler prev) {
+        queue.execute(new Task() {
+            @Override
+            public void run() {
+                if (handler == prev) {
+                    MqttSessionManager.remove(host_state, client_id);
+                    manager_disconnected = true;
+                    detach();
+                    handler = null;
+                }
+            }
+        });
+    }
+
+    /////////////////////////////////////////////////////////////////////
+    //
+    // Bits that deal with connections attaching/detaching from the session
+    //
+    /////////////////////////////////////////////////////////////////////
+    public void attach() {
+        queue.assertExecuting();
+        final MqttProtocolHandler h = handler;
+        clean_session = h.connect_message().cleanSession();
+        security_context = h.security_context();
+        h.command_handler_$eq(ScalaSupport.toScala(new UnitFn1<Object>() {
+            @Override
+            public void call(Object v1) {
+                on_transport_command(v1);
+            }
+        }));
+
+        destination_parser = h.destination_parser();
+        mqtt_consumer().consumer_sink.downstream_$eq(ScalaSupport.some(h.sink_manager().open()));
+
+        final Task ack_connect = new Task() {
+            @Override
+            public void run() {
+                queue.assertExecuting();
+                connect_message = h.connect_message();
+                CONNACK connack = new CONNACK();
+                connack.code(CONNACK.Code.CONNECTION_ACCEPTED);
+                send(connack);
+            }
+        };
+
+        if (!clean_session) {
+            // Setup the previous subscriptions..
+            session_state.strategy().create(host().store(), client_id);
+            if (!session_state.subscriptions().isEmpty()) {
+                h._suspend_read("subscribing");
+                ArrayList<Topic> topics = ScalaSupport.map(session_state.subscriptions().values(), new Fn1<Tuple2<Topic, BindAddress>, Topic>() {
+                    @Override
+                    public Topic apply(Tuple2<Topic, BindAddress> v1) {
+                        return v1._1();
+                    }
+                });
+                subscribe(topics, new Task() {
+                    @Override
+                    public void run() {
+                        h.resume_read();
+                        h.queue().execute(ack_connect);
+                    }
+                });
+            } else {
+                ack_connect.run();
+            }
+        } else {
+            // do we need to clear the received ids?
+            // durable_session_state.received_message_ids.clear()
+            session_state.subscriptions().clear();
+            if (session_state.durable_sub() != null) {
+                final DestinationAddress[] addresses = new DestinationAddress[]{session_state.durable_sub()};
+                session_state.durable_sub_$eq(null);
+                host().dispatch_queue().execute(new Task() {
+                    @Override
+                    public void run() {
+                        host().router().delete(addresses, security_context);
+                    }
+                });
+            }
+
+            session_state.strategy().destroy(new Task() {
+                @Override
+                public void run() {
+                    ack_connect.run();
+                }
+            });
+        }
+
+    }
+
+    public void detach() {
+        queue.assertExecuting();
+        if (!producerRoutes.isEmpty()) {
+            final ArrayList<MqttProducerRoute> routes = new ArrayList<MqttProducerRoute>(producerRoutes.values());
+            host().dispatch_queue().execute(new Task() {
+                @Override
+                public void run() {
+                    for (MqttProducerRoute route : routes) {
+                        host().router().disconnect(new ConnectAddress[]{route.address}, route);
+                    }
+                }
+            });
+            producerRoutes.clear();
+        }
+
+        if (clean_session) {
+            if (!mqtt_consumer().addresses.isEmpty()) {
+                final BindAddress[] addresses = mqtt_consumer().addresses.keySet().toArray(new BindAddress[mqtt_consumer().addresses.size()]);
+                host().dispatch_queue().execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        host().router().unbind(addresses, mqtt_consumer(), false, security_context);
+                    }
+                });
+                mqtt_consumer().addresses.clear();
+            }
+            session_state.subscriptions().clear();
+        } else {
+            if (session_state.durable_sub() != null) {
+                final BindAddress[] addresses = new BindAddress[]{session_state.durable_sub()};
+                host().dispatch_queue().execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        host().router().unbind(addresses, mqtt_consumer(), false, security_context);
+                    }
+                });
+                mqtt_consumer().addresses.clear();
+                session_state.durable_sub_$eq(null);
+            }
+        }
+
+        for (Request request : in_flight_publishes.values()) {
+            if (request.ack() != null) {
+                request.ack().apply(
+                        request.delivered() ? Delivered$.MODULE$ : Undelivered$.MODULE$
+                );
+            }
+        }
+        in_flight_publishes.clear();
+
+        handler.sink_manager().close(mqtt_consumer().consumer_sink.downstream().get(), ScalaSupport.<Request>noopFn1());
+        mqtt_consumer().consumer_sink.downstream_$eq(ScalaSupport.<Sink<Request>>none());
+        handler.on_transport_disconnected();
+    }
+
+    public SimpleAddress decode_destination(UTF8Buffer value) {
+
+        SimpleAddress rc = destination_parser.decode_single_destination(value.toString(), ScalaSupport.toScala(new Fn1<String, SimpleAddress>() {
+            public SimpleAddress apply(String name) {
+                return new SimpleAddress("topic", destination_parser.decode_path(name));
+            }
+        }));
+        if (rc == null && handler != null) {
+            handler.die("Invalid mqtt destination name: " + value);
+        }
+        return rc;
+    }
+
+    /////////////////////////////////////////////////////////////////////
+    //
+    // Bits that deal with assigning message ids to QoS > 0 requests
+    // and tracking those requests so that they can get replayed on a
+    // reconnect.
+    //
+    /////////////////////////////////////////////////////////////////////
+
+    final HashMap<Short, Request> in_flight_publishes = new HashMap<Short, Request>();
+
+    public void send(MessageSupport.Message message) {
+        queue.assertExecuting();
+        handler.connection_sink().offer(new Request((short) 0, message, null));
+    }
+
+    public void publish_completed(short id) {
+        queue.assertExecuting();
+        Request request = in_flight_publishes.remove(id);
+        if (request != null) {
+            if (request.ack() != null) {
+                request.ack().apply(Consumed$.MODULE$);
+            }
+        } else {
+            // It's possible that on a reconnect, we get an ACK
+            // in for message that was not dispatched yet. store
+            // a place holder so we ack it upon the dispatch
+            // attempt.
+            in_flight_publishes.put(id, new Request(id, null, null));
+        }
+    }
+
+    /////////////////////////////////////////////////////////////////////
+    //
+    // Bits that deal with processing new messages from the client.
+    //
+    /////////////////////////////////////////////////////////////////////
+    public void on_transport_command(Object o) {
+        try {
+            if (o.getClass() == MQTTFrame.class) {
+                MQTTFrame command = (MQTTFrame) o;
+                switch (command.messageType()) {
+                    case PUBLISH.TYPE: {
+                        on_mqtt_publish(received(new PUBLISH().decode(command)));
+                        break;
+                    }
+                    // This follows a Publish with QoS EXACTLY_ONCE
+                    case PUBREL.TYPE: {
+                        final PUBREL ack = received(new PUBREL().decode(command));
+                        // TODO: perhaps persist the processed list.. otherwise
+                        // we can't filter out dups after a broker restart.
+                        session_state.received_message_ids().remove(ack.messageId());
+                        session_state.strategy().update(new Task() {
+                            @Override
+                            public void run() {
+                                send(new PUBCOMP().messageId(ack.messageId()));
+                            }
+                        });
+                        break;
+                    }
+                    case SUBSCRIBE.TYPE: {
+                        on_mqtt_subscribe(received(new SUBSCRIBE().decode(command)));
+                        break;
+                    }
+                    case UNSUBSCRIBE.TYPE: {
+                        on_mqtt_unsubscribe(received(new UNSUBSCRIBE().decode(command)));
+                        break;
+                    }
+                    // AT_LEAST_ONCE ack flow for a client subscription
+                    case PUBACK.TYPE: {
+                        PUBACK ack = received(new PUBACK().decode(command));
+                        publish_completed(ack.messageId());
+                        break;
+                    }
+                    // EXACTLY_ONCE ack flow for a client subscription
+                    case PUBREC.TYPE: {
+                        PUBREC ack = received(new PUBREC().decode(command));
+                        send(new PUBREL().messageId(ack.messageId()));
+                        break;
+                    }
+                    case PUBCOMP.TYPE: {
+                        PUBCOMP ack = received(new PUBCOMP().decode(command));
+                        publish_completed(ack.messageId());
+                        break;
+                    }
+                    case PINGREQ.TYPE: {
+                        received(new PINGREQ().decode(command));
+                        send(new PINGRESP());
+                        break;
+                    }
+                    case DISCONNECT.TYPE: {
+                        received(new DISCONNECT());
+                        MqttSessionManager.disconnect(host_state, client_id, handler);
+                        break;
+                    }
+                    default: {
+                        handler.die("Invalid MQTT message type: " + command.messageType());
+                        break;
+                    }
+                }
+            } else if ("failure".equals(o)) {
+                // Publish the client's will
+                publish_will(new Task() {
+                    @Override
+                    public void run() {
+                        // then disconnect him.
+                        MqttSessionManager.disconnect(host_state, client_id, handler);
+
+                    }
+                });
+            } else {
+                handler.die("Internal Server Error: unexpected mqtt command: " + o.getClass());
+            }
+        } catch (ProtocolException e) {
+            handler.die("Internal Server Error: " + e);
+        }
+    }
+
+    /////////////////////////////////////////////////////////////////////
+    //
+    // Bits that deal with processing PUBLISH messages
+    //
+    /////////////////////////////////////////////////////////////////////
+    LRUCache<UTF8Buffer, MqttProducerRoute> producerRoutes = new LRUCache<UTF8Buffer, MqttProducerRoute>(10) {
+        @Override
+        protected void onCacheEviction(final Map.Entry<UTF8Buffer, MqttProducerRoute> eldest) {
+            host().dispatch_queue().execute(new Task() {
+                @Override
+                public void run() {
+                    ConnectAddress[] array = new ConnectAddress[]{eldest.getValue().address};
+                    host().router().disconnect(array, eldest.getValue());
+                }
+            });
+        }
+    };
+
+    class MqttProducerRoute extends DeliveryProducerRoute {
+
+        public final SimpleAddress address;
+        public final MqttProtocolHandler handler;
+        boolean suspended = false;
+
+        public MqttProducerRoute(SimpleAddress address, MqttProtocolHandler h) {
+            super(host().router());
+            this.address = address;
+            this.handler = h;
+            this.refiller_$eq(new Task() {
+                @Override
+                public void run() {
+                    if (suspended) {
+                        suspended = false;
+                        handler.resume_read();
+                    }
+                }
+            });
+        }
+
+        @Override
+        public int send_buffer_size() {
+            return handler.codec().getReadBufferSize();
+        }
+
+        @Override
+        public Option<BrokerConnection> connection() {
+            return ScalaSupport.some(handler.connection());
+        }
+
+        @Override
+        public DispatchQueue dispatch_queue() {
+            return queue;
+        }
+    }
+
+    public void on_mqtt_publish(final PUBLISH publish) {
+
+        if ((publish.qos() == QoS.EXACTLY_ONCE) && session_state.received_message_ids().contains(publish.messageId())) {
+            PUBREC response = new PUBREC();
+            response.messageId(publish.messageId());
+            send(response);
+            return;
+        }
+
+        handler.messages_received().incrementAndGet();
+
+        queue.assertExecuting();
+        MqttProducerRoute route = producerRoutes.get(publish.topicName());
+        if (route == null) {
+            // create the producer route...
+
+            final SimpleAddress destination = decode_destination(publish.topicName());
+            final MqttProducerRoute froute = route = new MqttProducerRoute(destination, handler);
+
+            // don't process commands until producer is connected...
+            route.handler._suspend_read("route publish lookup");
+            host().dispatch_queue().execute(new Task() {
+
+                @Override
+                public void run() {
+                    host().router().connect(new ConnectAddress[]{destination}, froute, security_context);
+                    queue.execute(new Runnable() {
+                        @Override
+                        public void run() {
+                            // We don't care if we are not allowed to send..
+                            if (!froute.handler.connection().stopped()) {
+                                froute.handler.resume_read();
+                                producerRoutes.put(publish.topicName(), froute);
+                                send_via_route(froute, publish);
+                            }
+                        }
+                    });
+                }
+            });
+        } else {
+            // we can re-use the existing producer route
+            send_via_route(route, publish);
+        }
+    }
+
+    class AtLeastOnceProducerAck extends UnitFn2<DeliveryResult, StoreUOW> {
+        public final PUBLISH publish;
+
+        AtLeastOnceProducerAck(PUBLISH publish) {
+            this.publish = publish;
+        }
+
+        public void call(final DeliveryResult r, final StoreUOW uow) {
+            queue.execute(new Task() {
+                @Override
+                public void run() {
+                    PUBACK response = new PUBACK();
+                    response.messageId(publish.messageId());
+                    send(response);
+                }
+            });
+        }
+    }
+
+    class ExactlyOnceProducerAck extends AtLeastOnceProducerAck {
+
+        ExactlyOnceProducerAck(PUBLISH publish) {
+            super(publish);
+        }
+
+        public void call(final DeliveryResult r, final StoreUOW uow) {
+            queue.execute(new Task() {
+                @Override
+                public void run() {
+                    // TODO: perhaps persist the processed list..
+                    session_state.received_message_ids().add(publish.messageId());
+                    session_state.strategy().update(new Task() {
+                        @Override
+                        public void run() {
+                            PUBREC response = new PUBREC();
+                            response.messageId(publish.messageId());
+                            send(response);
+                        }
+                    });
+                }
+            });
+        }
+    }
+
+    public void send_via_route(MqttProducerRoute route, PUBLISH publish) {
+        queue.assertExecuting();
+
+        AtLeastOnceProducerAck ack = null;
+        if (publish.qos() == QoS.AT_LEAST_ONCE) {
+            ack = new AtLeastOnceProducerAck(publish);
+        } else if (publish.qos() == QoS.EXACTLY_ONCE) {
+            ack = new ExactlyOnceProducerAck(publish);
+        }
+
+        if (!route.targets().isEmpty()) {
+            Delivery delivery = new Delivery();
+            delivery.message_$eq(new RawMessage(publish.payload()));
+            delivery.persistent_$eq(publish.qos().ordinal() > 0);
+            delivery.size_$eq(publish.payload().length);
+            delivery.ack_$eq(ScalaSupport.toScala(ack));
+            if (publish.retain()) {
+                if (delivery.size() == 0) {
+                    delivery.retain_$eq(RetainRemove$.MODULE$);
+                } else {
+                    delivery.retain_$eq(RetainSet$.MODULE$);
+                }
+            }
+
+            // routes can always accept at least 1 delivery...
+            assert !route.full();
+            route.offer(delivery);
+            if (route.full()) {
+                // but once it gets full.. suspend to flow control the producer.
+                route.suspended = true;
+                handler._suspend_read("blocked sending to: " + route.overflowSessions().mkString(", "));
+            }
+
+        } else {
+            if (ack != null) {
+                ack.apply(null, null);
+            }
+        }
+    }
+
+    public void publish_will(final Task complete_close) {
+        if (connect_message != null) {
+            if (connect_message.willTopic() == null) {
+                complete_close.run();
+            } else {
+
+                final SimpleAddress destination = decode_destination(connect_message.willTopic());
+                final DeliveryProducerRoute prodcuer = new DeliveryProducerRoute(host().router()) {
+                    {
+                        refiller_$eq(NOOP);
+                    }
+
+                    @Override
+                    public int send_buffer_size() {
+                        return 1024 * 64;
+                    }
+
+                    @Override
+                    public Option<BrokerConnection> connection() {
+                        return handler != null ? ScalaSupport.some(handler.connection()) : ScalaSupport.<BrokerConnection>none();
+                    }
+
+                    @Override
+                    public DispatchQueue dispatch_queue() {
+                        return queue;
+                    }
+                };
+
+                host().dispatch_queue().execute(new Task() {
+                    @Override
+                    public void run() {
+                        host().router().connect(new ConnectAddress[]{destination}, prodcuer, security_context);
+                        queue.execute(new Task() {
+                            @Override
+                            public void run() {
+                                if (prodcuer.targets().isEmpty()) {
+                                    complete_close.run();
+                                } else {
+                                    Delivery delivery = new Delivery();
+                                    delivery.message_$eq(new RawMessage(connect_message.willMessage()));
+                                    delivery.size_$eq(connect_message.willMessage().length);
+                                    delivery.persistent_$eq(connect_message.willQos().ordinal() > 0);
+                                    if (connect_message.willRetain()) {
+                                        if (delivery.size() == 0) {
+                                            delivery.retain_$eq(RetainRemove$.MODULE$);
+                                        } else {
+                                            delivery.retain_$eq(RetainSet$.MODULE$);
+                                        }
+                                    }
+                                    delivery.ack_$eq(ScalaSupport.toScala(new UnitFn2<DeliveryResult, StoreUOW>() {
+                                        @Override
+                                        public void call(DeliveryResult x, StoreUOW y) {
+                                            host().dispatch_queue().execute(new Task() {
+                                                @Override
+                                                public void run() {
+                                                    host().router().disconnect(new ConnectAddress[]{destination}, prodcuer);
+                                                }
+                                            });
+                                            complete_close.run();
+                                        }
+                                    }));
+                                    handler.messages_received().incrementAndGet();
+                                    prodcuer.offer(delivery);
+                                }
+                            }
+                        });
+                    }
+                });
+            }
+        }
+    }
+    /////////////////////////////////////////////////////////////////////
+    //
+    // Bits that deal with subscriptions
+    //
+    /////////////////////////////////////////////////////////////////////
+
+    public void on_mqtt_subscribe(final SUBSCRIBE sub) {
+        subscribe(Arrays.asList(sub.topics()), new Task() {
+            @Override
+            public void run() {
+                queue.execute(new Task() {
+                    @Override
+                    public void run() {
+                        session_state.strategy().update(new Task() {
+                            @Override
+                            public void run() {
+                                SUBACK suback = new SUBACK();
+                                suback.messageId(sub.messageId());
+
+                                byte[] granted = new byte[sub.topics().length];
+                                int i = 0;
+                                for (Topic topic : sub.topics()) {
+                                    granted[i] = (byte) topic.qos().ordinal();
+                                    i++;
+                                }
+
+                                suback.grantedQos(granted);
+                                send(suback);
+                            }
+                        });
+                    }
+                });
+            }
+        });
+    }
+
+    public void subscribe(Collection<Topic> topics, final Task on_subscribed) {
+        final ArrayList<BindAddress> addresses = ScalaSupport.map(topics, new Fn1<Topic, BindAddress>() {
+            @Override
+            public BindAddress apply(Topic topic) {
+                BindAddress address = decode_destination(topic.name());
+                session_state.subscriptions().put(topic.name(), new Tuple2<Topic, BindAddress>(topic, address));
+                mqtt_consumer().addresses.put(address, topic.qos());
+                if (PathParser.containsWildCards(address.path())) {
+                    mqtt_consumer().wildcards.put(address.path(), topic.qos());
+                }
+                return address;
+            }
+        });
+
+
+        handler.subscription_count_$eq(mqtt_consumer().addresses.size());
+
+        if (!clean_session) {
+            Set<BindAddress> bindAddressSet = mqtt_consumer().addresses.keySet();
+            SubscriptionAddress durable_sub = new SubscriptionAddress(Path$.MODULE$.create(client_id.toString()), null, bindAddressSet.toArray(new BindAddress[bindAddressSet.size()]));
+            session_state.durable_sub_$eq(durable_sub);
+            addresses.clear();
+            addresses.add(durable_sub);
+        }
+
+        host().dispatch_queue().execute(new Task() {
+            @Override
+            public void run() {
+                for (BindAddress address : addresses) {
+                    host().router().bind(new BindAddress[]{address}, mqtt_consumer(), security_context, ScalaSupport.<Option<String>>noopFn1());
+                }
+                on_subscribed.run();
+            }
+        });
+    }
+
+    public void on_mqtt_unsubscribe(final UNSUBSCRIBE unsubscribe) {
+
+        ArrayList<BindAddress> addressesList = ScalaSupport.flatMap(Arrays.asList(unsubscribe.topics()), new Fn1<UTF8Buffer, Option<BindAddress>>() {
+            @Override
+            public Option<BindAddress> apply(UTF8Buffer topicName) {
+                Tuple2<Topic, BindAddress> removed = session_state.subscriptions().remove(topicName);
+                if (removed != null) {
+                    Topic topic = removed._1();
+                    BindAddress address = removed._2();
+                    mqtt_consumer().addresses.remove(address);
+                    if (PathParser.containsWildCards(address.path())) {
+                        mqtt_consumer().wildcards.remove(address.path(), topic.qos());
+                    }
+                    return ScalaSupport.some(address);
+                } else {
+                    return ScalaSupport.none();
+                }
+
+            }
+        });
+        final BindAddress[] addresses = addressesList.toArray(new BindAddress[addressesList.size()]);
+
+        handler.subscription_count_$eq(mqtt_consumer().addresses.size());
+
+        if (!clean_session) {
+            Set<BindAddress> bindAddressSet = mqtt_consumer().addresses.keySet();
+            session_state.durable_sub_$eq(new SubscriptionAddress(Path$.MODULE$.create(client_id.toString()), null, bindAddressSet.toArray(new BindAddress[bindAddressSet.size()])));
+        }
+
+        host().dispatch_queue().execute(new Task() {
+            @Override
+            public void run() {
+                if (clean_session) {
+                    host().router().unbind(addresses, mqtt_consumer(), false, security_context);
+                } else {
+                    if (mqtt_consumer().addresses.isEmpty()) {
+                        host().router().unbind(new BindAddress[]{session_state.durable_sub()}, mqtt_consumer(), true, security_context);
+                        session_state.durable_sub_$eq(null);
+                    } else {
+                        host().router().bind(new BindAddress[]{session_state.durable_sub()}, mqtt_consumer(), security_context, ScalaSupport.<Option<String>>noopFn1());
+                    }
+                }
+                queue.execute(new Task() {
+                    @Override
+                    public void run() {
+                        session_state.strategy().update(new Task() {
+                            @Override
+                            public void run() {
+                                UNSUBACK ack = new UNSUBACK();
+                                ack.messageId(unsubscribe.messageId());
+                                send(ack);
+                            }
+                        });
+                    }
+                });
+            }
+        });
+
+    }
+
+
+    MqttConsumer _mqtt_consumer;
+
+    MqttConsumer mqtt_consumer() {
+        if (_mqtt_consumer == null) {
+            _mqtt_consumer = new MqttConsumer();
+        }
+        return _mqtt_consumer;
+    }
+
+    class IntPair {
+        int _1;
+        int _2;
+
+        IntPair(int int1, int int2) {
+            this._1 = int1;
+            this._2 = int2;
+        }
+    }
+
+    class MqttConsumer extends AbstractRetainedDeliveryConsumer {
+
+        @Override
+        public String toString() {
+            return "mqtt client:" + client_id + " remote address: " + security_context.remote_address();
+        }
+
+        public DispatchQueue dispatch_queue() {
+            return queue;
+        }
+
+        public HashMap<BindAddress, QoS> addresses = new HashMap<BindAddress, QoS>();
+        public PathMap wildcards = new PathMap<QoS>();
+
+        CustomDispatchSource<IntPair, IntPair> credit_window_source = Dispatch.createSource(new EventAggregator<IntPair, IntPair>() {
+            public IntPair mergeEvent(IntPair previous, IntPair event) {
+                if (previous == null) {
+                    return event;
+                } else {
+                    return new IntPair(previous._1 + event._1, previous._2 + event._2);
+                }
+            }
+
+            public IntPair mergeEvents(IntPair previous, IntPair events) {
+                return mergeEvent(previous, events);
+            }
+
+        }, queue);
+
+        {
+            credit_window_source.setEventHandler(new Task() {
+                @Override
+                public void run() {
+                    IntPair data = credit_window_source.getData();
+                    credit_window_filter.credit(data._1, data._2);
+                }
+            });
+            credit_window_source.resume();
+        }
+
+        //
+        MutableSink<Request> consumer_sink = new MutableSink<Request>();
+
+        {
+            consumer_sink.downstream_$eq(ScalaSupport.<Sink<Request>>none());
+        }
+
+        public LongCounter next_seq_id = new LongCounter(0);
+
+        public long get_next_seq_id() {
+            return next_seq_id.getAndIncrement();
+        }
+
+        short to_message_id(long value) {
+            return (short)
+                    (0x8000 | // MQTT message ids cannot be zero, so we always set the highest bit.
+                            (value & 0x7FFF)); // the lower 15 bits come for the original seq id.
+        }
+
+        CreditWindowFilter<Tuple2<Session<Delivery>, Delivery>> credit_window_filter = new CreditWindowFilter<Tuple2<Session<Delivery>, Delivery>>(consumer_sink.flatMap(ScalaSupport.toScala(new Fn1<Tuple2<Session<Delivery>, Delivery>, Option<Request>>() {
+            public Option<Request> apply(Tuple2<Session<Delivery>, Delivery> event) {
+                queue.assertExecuting();
+                Session<Delivery> session = event._1();
+                final Delivery delivery = event._2();
+
+                session_manager.delivered(session, delivery.size());
+
+                // Look up which QoS we need to send this message with..
+                SimpleAddress topic = delivery.sender().head().simple();
+
+                QoS qos = addresses.get(topic);
+                if (qos == null) {
+                    qos = ScalaSupport.<QoS>head(wildcards.get(topic.path()));
+                }
+
+                if (qos == null) {
+                    acked(delivery, Consumed$.MODULE$);
+                    return ScalaSupport.none();
+                } else {
+                    PUBLISH publish = new PUBLISH();
+                    publish.topicName(new UTF8Buffer(destination_parser.encode_destination(delivery.sender().head())));
+                    if (delivery.redeliveries() > 0) {
+                        publish.dup(true);
+                    }
+
+                    if (delivery.message().codec() == RawMessageCodec$.MODULE$) {
+                        publish.payload(((RawMessage) delivery.message()).payload());
+                    } else {
+                        if (publish_body) {
+                            try {
+                                publish.payload(delivery.message().getBodyAs(Buffer.class));
+                            } catch (FilterException e) {
+                                log.error(e, "Internal Server Error: Could not covert message body to a Buffer");
+                            }
+                        } else {
+                            publish.payload(delivery.message().encoded());
+                        }
+                    }
+
+                    handler.messages_sent().incrementAndGet();
+
+                    UnitFn1<DeliveryResult> ack = new UnitFn1<DeliveryResult>() {
+                        @Override
+                        public void call(DeliveryResult result) {
+                            acked(delivery, result);
+                        }
+                    };
+
+                    if (delivery.ack() != null && (qos != QoS.AT_MOST_ONCE)) {
+                        publish.qos(qos);
+                        short id = to_message_id(clean_session ?
+                                get_next_seq_id() : // generate our own seq id.
+                                delivery.seq() // use the durable sub's seq id..
+                        );
+
+                        publish.messageId(id);
+                        Request request = new Request(id, publish, ack);
+                        Request prev = in_flight_publishes.put(id, request);
+                        if (prev != null) {
+
+                            // A reconnecting client could have acked before
+                            // we get dispatched by the durable sub.
+                            if (prev.message() == null) {
+                                in_flight_publishes.remove(id);
+                                acked(delivery, Consumed$.MODULE$);
+                            } else {
+                                // Looks we sent out a msg with that id.  This could only
+                                // happen once we send out 0x7FFF message and the first
+                                // one has not been acked.
+                                handler.async_die("Client not acking regularly.", null);
+                            }
+                        }
+                        return ScalaSupport.some(request);
+
+                    } else {
+                        // This callback gets executed once the message
+                        // sent to the transport.
+                        publish.qos(QoS.AT_MOST_ONCE);
+                        return ScalaSupport.some(new Request((short) 0, publish, ack));
+                    }
+
+                }
+            }
+        })), SessionDeliverySizer$.MODULE$);
+
+
+        public void acked(Delivery delivery, DeliveryResult result) {
+            queue.assertExecuting();
+            credit_window_source.merge(new IntPair(delivery.size(), 1));
+            if (delivery.ack() != null) {
+                delivery.ack().apply(result, null);
+            }
+        }
+
+        {
+            credit_window_filter.credit(handler.codec().getWriteBufferSize() * 2, 1);
+        }
+
+        SessionSinkMux<Delivery> session_manager = new SessionSinkMux<Delivery>(credit_window_filter, queue, Delivery$.MODULE$, Integer.MAX_VALUE / 2, receive_buffer_size()) {
+            @Override
+            public long time_stamp() {
+                return host().broker().now();
+            }
+        };
+
+        private void super_dispose() {
+            super.dispose();
+        }
+
+        @Override
+        protected void dispose() {
+            queue.execute(new Task() {
+                @Override
+                public void run() {
+                    super_dispose();
+                }
+            });
+        }
+
+        @Override
+        public Option<BrokerConnection> connection() {
+            return handler != null ? ScalaSupport.some(handler.connection()) : ScalaSupport.<BrokerConnection>none();
+        }
+
+        @Override
+        public int receive_buffer_size() {
+            return 1024 * 64;
+        }
+
+        @Override
+        public boolean is_persistent() {
+            return false;
+        }
+
+        @Override
+        public boolean matches(Delivery message) {
+            return true;
+        }
+
+        //
+        // Each destination we subscribe to will establish a session with us.
+        //
+        class MqttConsumerSession extends AbstractSessionSinkFilter<Delivery> implements DeliverySession {
+
+            final DeliveryProducer producer;
+            final SessionSink<Delivery> downstream;
+
+            MqttConsumerSession(DeliveryProducer producer) {
+                producer.dispatch_queue().assertExecuting();
+                this.producer = producer;
+                downstream = session_manager.open(producer.dispatch_queue());
+                retain();
+            }
+
+            @Override
+            public SessionSink<Delivery> downstream_session_sink() {
+                return downstream;
+            }
+
+            @Override
+            public DeliveryProducer producer() {
+                return producer;
+            }
+
+            @Override
+            public String toString() {
+                if (handler == null) {
+                    return "unconnected";
+                } else {
+                    return "connection to " + handler.connection().transport().getRemoteAddress();
+                }
+            }
+
+            public MqttConsumer consumer() {
+                return mqtt_consumer();
+            }
+
+            public boolean closed = false;
+
+            public void close() {
+                producer.dispatch_queue().assertExecuting();
+                if (!closed) {
+                    closed = true;
+                    dispose();
+                }
+            }
+
+            public void dispose() {
+                session_manager.close(downstream(), ScalaSupport.toScala(new UnitFn1<Delivery>() {
+                    @Override
+                    public void call(Delivery delivery) {
+                        // We have been closed so we have to nak any deliveries.
+                        if (delivery.ack() != null) {
+                            delivery.ack().apply(Undelivered$.MODULE$, delivery.uow());
+                        }
+                    }
+                }));
+                release();
+            }
+
+            @Override
+            public boolean offer(Delivery delivery) {
+                if (full()) {
+                    return false;
+                } else {
+                    delivery.message().retain();
+                    boolean rc = downstream().offer(delivery);
+                    assert rc : "offer should be accepted since it was not full";
+                    return true;
+                }
+            }
+        }
+
+        public MqttConsumerSession connect(DeliveryProducer p) {
+            return new MqttConsumerSession(p);
+        }
+    }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java?rev=1461839&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java Wed Mar 27 20:20:31 2013
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.mqtt;
+
+import org.apache.activemq.apollo.util.Log;
+import scala.Function1;
+import scala.Function2;
+import scala.Option;
+import scala.collection.immutable.List;
+import scala.runtime.BoxedUnit;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ScalaSupport {
+    private static ScalaSupportHelper$ helper = ScalaSupportHelper$.MODULE$;
+
+    static final Function1<Object,BoxedUnit> NOOP_FN1 = helper.toScala(new UnitFn1<Object>() {
+        @Override
+        public void call(Object v1) {
+        }
+    });
+
+    public static <T1> Function1<T1, BoxedUnit> noopFn1() {
+        return (Function1<T1, BoxedUnit>) NOOP_FN1;
+    }
+
+
+    public static <T1, R> Function1<T1, R> toScala(Fn1<T1, R> func) {
+        if( func == null ) {
+            return null;
+        }
+        return helper.toScala(func);
+    }
+
+    public static <T1, T2, R> Function2<T1, T2, R> toScala(Fn2<T1, T2, R> func) {
+        if( func == null ) {
+            return null;
+        }
+        return helper.toScala(func);
+    }
+
+    public static <T> Option<T> none() {
+        return helper.none();
+    }
+
+    public static <T> Option<T> some(T t) {
+        return helper.some(t);
+    }
+
+    public static <T> T head(Iterable<T> i) {
+        Iterator<T> iterator = i.iterator();
+        if( iterator.hasNext() ) {
+            return iterator.next();
+        } else {
+            return null;
+        }
+    }
+
+    public static <T,R> ArrayList<R> map(Collection<T> values, Fn1<T, R> func) {
+        ArrayList rc = new ArrayList(values.size());
+        for( T t: values) {
+            rc.add(func.apply(t));
+        }
+        return rc;
+    }
+
+    public static <T,R> ArrayList<R> flatMap(Collection<T> values, Fn1<T, Option<R>> func) {
+        ArrayList rc = new ArrayList(values.size());
+        for( T t: values) {
+            Option<R> opt = func.apply(t);
+            if( opt.isDefined() ) {
+                rc.add(opt.get());
+            }
+        }
+        return rc;
+    }
+
+    public static List<String> toList(String ... s) {
+        return helper.toList(s);
+    }
+
+
+    public static class Logger {
+        final Log log;
+
+        public Logger(Log log) {
+            this.log = log;
+        }
+
+        public <T> void trace(String message, Object ...args) {
+            helper.trace(log, message, args);
+        }
+        public <T> void debug(String message, Object ...args) {
+            helper.debug(log, message, args);
+        }
+        public <T> void info(String message, Object ...args) {
+            helper.info(log, message, args);
+        }
+        public <T> void warn(String message, Object ...args) {
+            helper.warn(log, message, args);
+        }
+        public <T> void error(String message, Object ...args) {
+            helper.error(log, message, args);
+        }
+
+        public <T> void trace(Throwable e, String message, Object ...args) {
+            helper.trace(log, e, message, args);
+        }
+        public <T> void debug(Throwable e, String message, Object ...args) {
+            helper.debug(log, e, message, args);
+        }
+        public <T> void info(Throwable e, String message, Object ...args) {
+            helper.info(log, e, message, args);
+        }
+        public <T> void warn(Throwable e, String message, Object ...args) {
+            helper.warn(log, e, message, args);
+        }
+        public <T> void error(Throwable e, String message, Object ...args) {
+            helper.error(log, e, message, args);
+        }
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala?rev=1461839&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala Wed Mar 27 20:20:31 2013
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.mqtt
+
+import scala.Function1
+import org.apache.activemq.apollo.util.Log
+import scala.runtime.BoxedUnit
+
+abstract class Fn1[-T1,+R] {
+  def apply(v1: T1): R
+}
+
+class UnitFn1[-T1] extends Fn1[T1, BoxedUnit] {
+  def call(v1: T1) = {}
+  def apply(v1: T1) = {
+    call(v1)
+    BoxedUnit.UNIT;
+  }
+}
+
+abstract class Fn2[-T1,-T2,+R] {
+  def apply(v1: T1, v2: T2): R
+}
+
+class UnitFn2[-T1,-T2] extends Fn2[T1,T2, BoxedUnit] {
+  def call (v1: T1, v2: T2) = {}
+  def apply(v1: T1, v2: T2) = {
+    call(v1, v2)
+    BoxedUnit.UNIT;
+  }
+}
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: chirino
+ * Date: 3/26/13
+ * Time: 3:27 PM
+ * To change this template use File | Settings | File Templates.
+ */
+object ScalaSupportHelper {
+  def toScala[T1,R](func:Fn1[T1,R]):Function1[T1,R] = (v1:T1) => { func.apply(v1) }
+  def toScala[T1,T2,R](func:Fn2[T1,T2,R]):Function2[T1,T2,R] = (v1:T1, v2:T2) => { func.apply(v1,v2) }
+  def none[T]:Option[T] = None
+  def some[T](t:T):Option[T] = Some(t)
+  def toList[T](args:Array[T]):List[T] = List(args:_*)
+
+  def trace(log:Log, message:String, args:Array[Object]) = log.trace(message, args:_*)
+  def debug(log:Log, message:String, args:Array[Object]) = log.debug(message, args:_*)
+  def info (log:Log, message:String, args:Array[Object]) = log.info (message, args:_*)
+  def warn (log:Log, message:String, args:Array[Object]) = log.warn (message, args:_*)
+  def error(log:Log, message:String, args:Array[Object]) = log.error(message, args:_*)
+
+  def trace(log:Log, e:Throwable, message:String, args:Array[Object]) = log.trace(e, message, args:_*)
+  def debug(log:Log, e:Throwable, message:String, args:Array[Object]) = log.debug(e, message, args:_*)
+  def info (log:Log, e:Throwable, message:String, args:Array[Object]) = log.info (e, message, args:_*)
+  def warn (log:Log, e:Throwable, message:String, args:Array[Object]) = log.warn (e, message, args:_*)
+  def error(log:Log, e:Throwable, message:String, args:Array[Object]) = log.error(e, message, args:_*)
+}

Copied: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.java (from r1459409, activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.java?p2=activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.java&p1=activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.scala&r1=1459409&r2=1461839&rev=1461839&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.java Wed Mar 27 20:20:31 2013
@@ -14,15 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.mqtt.dto
+package org.apache.activemq.apollo.mqtt.dto;
 
-import org.apache.activemq.apollo.util.DtoModule
+import org.apache.activemq.apollo.util.DtoModule;
 
 /**
+ * <p>
+ * </p>
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Module extends DtoModule {
-  def dto_package = "org.apache.activemq.apollo.mqtt.dto"
+public class Module implements DtoModule {
 
-  def extension_classes = Array(classOf[MqttDTO], classOf[MqttConnectionStatusDTO])
-}
\ No newline at end of file
+    @Override
+    public String dto_package() {
+        return "org.apache.activemq.apollo.mqtt.dto";
+    }
+
+    @Override
+    public Class<?>[] extension_classes() {
+        return new Class<?>[]{ MqttDTO.class, MqttConnectionStatusDTO.class };
+    }
+}

Modified: activemq/activemq-apollo/trunk/apollo-scala/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-scala/pom.xml?rev=1461839&r1=1461838&r2=1461839&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-scala/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-scala/pom.xml Wed Mar 27 20:20:31 2013
@@ -32,12 +32,7 @@
 
   <name>${project.artifactId}</name>
   <description>Holds the common Maven settings for Scala based modules.</description>
-  
-  <properties>
-    <scala-plugin-main-goal>compile</scala-plugin-main-goal>
-    <scala-plugin-test-goal>testCompile</scala-plugin-test-goal>
-  </properties>
-  
+
   <dependencies>
 
     <!-- Scala Support: this needs to be copied into every scala project -->
@@ -81,9 +76,10 @@
         <executions>
           <execution>
             <id>compile</id>
-            <phase>compile</phase>
+            <phase>process-resources</phase>
             <goals>
-              <goal>${scala-plugin-main-goal}</goal>
+              <goal>add-source</goal>
+              <goal>compile</goal>
             </goals>
             <configuration>
               <testSourceDir>dontcompile</testSourceDir>
@@ -106,7 +102,7 @@
             <id>test</id>
             <phase>test-compile</phase>
             <goals>
-              <goal>${scala-plugin-test-goal}</goal>
+              <goal>testCompile</goal>
             </goals>
             <configuration>
               <!-- <displayCmd>true</displayCmd> -->

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.scala?rev=1461839&r1=1461838&r2=1461839&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.scala Wed Mar 27 20:20:31 2013
@@ -18,6 +18,7 @@ package org.apache.activemq.apollo.util.
 
 object Path {
   def apply(value:String*):Path = Path(value.toList.map(LiteralPart(_)))
+  def create(value:String):Path = Path(LiteralPart(value)::Nil)
 }
 
 /**