You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/03/28 00:58:59 UTC

svn commit: r1461905 [2/2] - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ apollo-mqtt/src/main/scala/org/apache/activemq/apo...

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java?rev=1461905&r1=1461904&r2=1461905&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java Wed Mar 27 23:58:58 2013
@@ -24,8 +24,7 @@ import org.apache.activemq.apollo.broker
 import org.apache.activemq.apollo.filter.FilterException;
 import org.apache.activemq.apollo.mqtt.MqttSessionManager.HostState;
 import org.apache.activemq.apollo.mqtt.MqttSessionManager.SessionState;
-import org.apache.activemq.apollo.util.LRUCache;
-import org.apache.activemq.apollo.util.LongCounter;
+import org.apache.activemq.apollo.util.*;
 import org.apache.activemq.apollo.util.path.Path$;
 import org.apache.activemq.apollo.util.path.PathMap;
 import org.apache.activemq.apollo.util.path.PathParser;
@@ -37,7 +36,6 @@ import org.fusesource.mqtt.client.Topic;
 import org.fusesource.mqtt.codec.*;
 import scala.Option;
 import scala.Tuple2;
-import scala.runtime.BoxedUnit;
 
 import java.net.ProtocolException;
 import java.util.*;
@@ -45,6 +43,8 @@ import java.util.*;
 import static org.fusesource.hawtdispatch.Dispatch.NOOP;
 import static org.fusesource.hawtdispatch.Dispatch.createQueue;
 
+import static org.apache.activemq.apollo.mqtt.MqttProtocolHandler.*;
+
 /**
  * An MqttSession can be switch from one connection/protocol handler to another,
  * but it will only be associated with one at a time. An MqttSession tracks
@@ -54,14 +54,6 @@ import static org.fusesource.hawtdispatc
  */
 public class MqttSession {
 
-    public static final ScalaSupport.Logger log = new ScalaSupport.Logger(MqttProtocolHandler$.MODULE$);
-
-    public static <T> T received(T value) {
-        log.trace("received: %s", value);
-        return value;
-    }
-
-
     public final HostState host_state;
     public final UTF8Buffer client_id;
     public final SessionState session_state;
@@ -83,7 +75,7 @@ public class MqttSession {
     boolean publish_body = false;
 
     public VirtualHost host() {
-        return host_state.host();
+        return host_state.host;
     }
 
     public void connect(final MqttProtocolHandler next) {
@@ -142,23 +134,23 @@ public class MqttSession {
     public void attach() {
         queue.assertExecuting();
         final MqttProtocolHandler h = handler;
-        clean_session = h.connect_message().cleanSession();
-        security_context = h.security_context();
-        h.command_handler_$eq(ScalaSupport.toScala(new UnitFn1<Object>() {
+        clean_session = h.connect_message.cleanSession();
+        security_context = h.security_context;
+        h.command_handler = new UnitFn1<Object>() {
             @Override
             public void call(Object v1) {
                 on_transport_command(v1);
             }
-        }));
+        };
 
         destination_parser = h.destination_parser();
-        mqtt_consumer().consumer_sink.downstream_$eq(ScalaSupport.some(h.sink_manager().open()));
+        mqtt_consumer().consumer_sink.downstream_$eq(Scala2Java.some(h.sink_manager.open()));
 
         final Task ack_connect = new Task() {
             @Override
             public void run() {
                 queue.assertExecuting();
-                connect_message = h.connect_message();
+                connect_message = h.connect_message;
                 CONNACK connack = new CONNACK();
                 connack.code(CONNACK.Code.CONNECTION_ACCEPTED);
                 send(connack);
@@ -167,10 +159,10 @@ public class MqttSession {
 
         if (!clean_session) {
             // Setup the previous subscriptions..
-            session_state.strategy().create(host().store(), client_id);
-            if (!session_state.subscriptions().isEmpty()) {
+            session_state.strategy.create(host().store(), client_id);
+            if (!session_state.subscriptions.isEmpty()) {
                 h._suspend_read("subscribing");
-                ArrayList<Topic> topics = ScalaSupport.map(session_state.subscriptions().values(), new Fn1<Tuple2<Topic, BindAddress>, Topic>() {
+                ArrayList<Topic> topics = Scala2Java.map(session_state.subscriptions.values(), new Fn1<Tuple2<Topic, BindAddress>, Topic>() {
                     @Override
                     public Topic apply(Tuple2<Topic, BindAddress> v1) {
                         return v1._1();
@@ -189,10 +181,10 @@ public class MqttSession {
         } else {
             // do we need to clear the received ids?
             // durable_session_state.received_message_ids.clear()
-            session_state.subscriptions().clear();
-            if (session_state.durable_sub() != null) {
-                final DestinationAddress[] addresses = new DestinationAddress[]{session_state.durable_sub()};
-                session_state.durable_sub_$eq(null);
+            session_state.subscriptions.clear();
+            if (session_state.durable_sub != null) {
+                final DestinationAddress[] addresses = new DestinationAddress[]{session_state.durable_sub};
+                session_state.durable_sub = null;
                 host().dispatch_queue().execute(new Task() {
                     @Override
                     public void run() {
@@ -201,7 +193,7 @@ public class MqttSession {
                 });
             }
 
-            session_state.strategy().destroy(new Task() {
+            session_state.strategy.destroy(new Task() {
                 @Override
                 public void run() {
                     ack_connect.run();
@@ -237,10 +229,10 @@ public class MqttSession {
                 });
                 mqtt_consumer().addresses.clear();
             }
-            session_state.subscriptions().clear();
+            session_state.subscriptions.clear();
         } else {
-            if (session_state.durable_sub() != null) {
-                final BindAddress[] addresses = new BindAddress[]{session_state.durable_sub()};
+            if (session_state.durable_sub != null) {
+                final BindAddress[] addresses = new BindAddress[]{session_state.durable_sub};
                 host().dispatch_queue().execute(new Runnable() {
                     @Override
                     public void run() {
@@ -248,27 +240,27 @@ public class MqttSession {
                     }
                 });
                 mqtt_consumer().addresses.clear();
-                session_state.durable_sub_$eq(null);
+                session_state.durable_sub = null;
             }
         }
 
         for (Request request : in_flight_publishes.values()) {
-            if (request.ack() != null) {
-                request.ack().apply(
-                        request.delivered() ? Delivered$.MODULE$ : Undelivered$.MODULE$
+            if (request.ack != null) {
+                request.ack.apply(
+                        request.delivered ? Delivered$.MODULE$ : Undelivered$.MODULE$
                 );
             }
         }
         in_flight_publishes.clear();
 
-        handler.sink_manager().close(mqtt_consumer().consumer_sink.downstream().get(), ScalaSupport.<Request>noopFn1());
-        mqtt_consumer().consumer_sink.downstream_$eq(ScalaSupport.<Sink<Request>>none());
+        handler.sink_manager.close(mqtt_consumer().consumer_sink.downstream().get(), Scala2Java.<Request>noopFn1());
+        mqtt_consumer().consumer_sink.downstream_$eq(Scala2Java.<Sink<Request>>none());
         handler.on_transport_disconnected();
     }
 
     public SimpleAddress decode_destination(UTF8Buffer value) {
 
-        SimpleAddress rc = destination_parser.decode_single_destination(value.toString(), ScalaSupport.toScala(new Fn1<String, SimpleAddress>() {
+        SimpleAddress rc = destination_parser.decode_single_destination(value.toString(), Scala2Java.toScala(new Fn1<String, SimpleAddress>() {
             public SimpleAddress apply(String name) {
                 return new SimpleAddress("topic", destination_parser.decode_path(name));
             }
@@ -291,15 +283,15 @@ public class MqttSession {
 
     public void send(MessageSupport.Message message) {
         queue.assertExecuting();
-        handler.connection_sink().offer(new Request((short) 0, message, null));
+        handler.connection_sink.offer(new Request((short) 0, message, null));
     }
 
     public void publish_completed(short id) {
         queue.assertExecuting();
         Request request = in_flight_publishes.remove(id);
         if (request != null) {
-            if (request.ack() != null) {
-                request.ack().apply(Consumed$.MODULE$);
+            if (request.ack != null) {
+                request.ack.apply(Consumed$.MODULE$);
             }
         } else {
             // It's possible that on a reconnect, we get an ACK
@@ -329,8 +321,8 @@ public class MqttSession {
                         final PUBREL ack = received(new PUBREL().decode(command));
                         // TODO: perhaps persist the processed list.. otherwise
                         // we can't filter out dups after a broker restart.
-                        session_state.received_message_ids().remove(ack.messageId());
-                        session_state.strategy().update(new Task() {
+                        session_state.received_message_ids.remove(ack.messageId());
+                        session_state.strategy.update(new Task() {
                             @Override
                             public void run() {
                                 send(new PUBCOMP().messageId(ack.messageId()));
@@ -437,12 +429,12 @@ public class MqttSession {
 
         @Override
         public int send_buffer_size() {
-            return handler.codec().getReadBufferSize();
+            return handler.codec.getReadBufferSize();
         }
 
         @Override
         public Option<BrokerConnection> connection() {
-            return ScalaSupport.some(handler.connection());
+            return Scala2Java.some(handler.connection());
         }
 
         @Override
@@ -453,14 +445,14 @@ public class MqttSession {
 
     public void on_mqtt_publish(final PUBLISH publish) {
 
-        if ((publish.qos() == QoS.EXACTLY_ONCE) && session_state.received_message_ids().contains(publish.messageId())) {
+        if ((publish.qos() == QoS.EXACTLY_ONCE) && session_state.received_message_ids.contains(publish.messageId())) {
             PUBREC response = new PUBREC();
             response.messageId(publish.messageId());
             send(response);
             return;
         }
 
-        handler.messages_received().incrementAndGet();
+        handler.messages_received.incrementAndGet();
 
         queue.assertExecuting();
         MqttProducerRoute route = producerRoutes.get(publish.topicName());
@@ -526,8 +518,8 @@ public class MqttSession {
                 @Override
                 public void run() {
                     // TODO: perhaps persist the processed list..
-                    session_state.received_message_ids().add(publish.messageId());
-                    session_state.strategy().update(new Task() {
+                    session_state.received_message_ids.add(publish.messageId());
+                    session_state.strategy.update(new Task() {
                         @Override
                         public void run() {
                             PUBREC response = new PUBREC();
@@ -555,7 +547,7 @@ public class MqttSession {
             delivery.message_$eq(new RawMessage(publish.payload()));
             delivery.persistent_$eq(publish.qos().ordinal() > 0);
             delivery.size_$eq(publish.payload().length);
-            delivery.ack_$eq(ScalaSupport.toScala(ack));
+            delivery.ack_$eq(Scala2Java.toScala(ack));
             if (publish.retain()) {
                 if (delivery.size() == 0) {
                     delivery.retain_$eq(RetainRemove$.MODULE$);
@@ -599,7 +591,7 @@ public class MqttSession {
 
                     @Override
                     public Option<BrokerConnection> connection() {
-                        return handler != null ? ScalaSupport.some(handler.connection()) : ScalaSupport.<BrokerConnection>none();
+                        return handler != null ? Scala2Java.some(handler.connection()) : Scala2Java.<BrokerConnection>none();
                     }
 
                     @Override
@@ -629,7 +621,7 @@ public class MqttSession {
                                             delivery.retain_$eq(RetainSet$.MODULE$);
                                         }
                                     }
-                                    delivery.ack_$eq(ScalaSupport.toScala(new UnitFn2<DeliveryResult, StoreUOW>() {
+                                    delivery.ack_$eq(Scala2Java.toScala(new UnitFn2<DeliveryResult, StoreUOW>() {
                                         @Override
                                         public void call(DeliveryResult x, StoreUOW y) {
                                             host().dispatch_queue().execute(new Task() {
@@ -641,7 +633,7 @@ public class MqttSession {
                                             complete_close.run();
                                         }
                                     }));
-                                    handler.messages_received().incrementAndGet();
+                                    handler.messages_received.incrementAndGet();
                                     prodcuer.offer(delivery);
                                 }
                             }
@@ -664,7 +656,7 @@ public class MqttSession {
                 queue.execute(new Task() {
                     @Override
                     public void run() {
-                        session_state.strategy().update(new Task() {
+                        session_state.strategy.update(new Task() {
                             @Override
                             public void run() {
                                 SUBACK suback = new SUBACK();
@@ -688,11 +680,11 @@ public class MqttSession {
     }
 
     public void subscribe(Collection<Topic> topics, final Task on_subscribed) {
-        final ArrayList<BindAddress> addresses = ScalaSupport.map(topics, new Fn1<Topic, BindAddress>() {
+        final ArrayList<BindAddress> addresses = Scala2Java.map(topics, new Fn1<Topic, BindAddress>() {
             @Override
             public BindAddress apply(Topic topic) {
                 BindAddress address = decode_destination(topic.name());
-                session_state.subscriptions().put(topic.name(), new Tuple2<Topic, BindAddress>(topic, address));
+                session_state.subscriptions.put(topic.name(), new Tuple2<Topic, BindAddress>(topic, address));
                 mqtt_consumer().addresses.put(address, topic.qos());
                 if (PathParser.containsWildCards(address.path())) {
                     mqtt_consumer().wildcards.put(address.path(), topic.qos());
@@ -702,12 +694,12 @@ public class MqttSession {
         });
 
 
-        handler.subscription_count_$eq(mqtt_consumer().addresses.size());
+        handler.subscription_count = mqtt_consumer().addresses.size();
 
         if (!clean_session) {
             Set<BindAddress> bindAddressSet = mqtt_consumer().addresses.keySet();
             SubscriptionAddress durable_sub = new SubscriptionAddress(Path$.MODULE$.create(client_id.toString()), null, bindAddressSet.toArray(new BindAddress[bindAddressSet.size()]));
-            session_state.durable_sub_$eq(durable_sub);
+            session_state.durable_sub = durable_sub;
             addresses.clear();
             addresses.add(durable_sub);
         }
@@ -716,7 +708,7 @@ public class MqttSession {
             @Override
             public void run() {
                 for (BindAddress address : addresses) {
-                    host().router().bind(new BindAddress[]{address}, mqtt_consumer(), security_context, ScalaSupport.<Option<String>>noopFn1());
+                    host().router().bind(new BindAddress[]{address}, mqtt_consumer(), security_context, Scala2Java.<Option<String>>noopFn1());
                 }
                 on_subscribed.run();
             }
@@ -725,10 +717,10 @@ public class MqttSession {
 
     public void on_mqtt_unsubscribe(final UNSUBSCRIBE unsubscribe) {
 
-        ArrayList<BindAddress> addressesList = ScalaSupport.flatMap(Arrays.asList(unsubscribe.topics()), new Fn1<UTF8Buffer, Option<BindAddress>>() {
+        ArrayList<BindAddress> addressesList = Scala2Java.flatMap(Arrays.asList(unsubscribe.topics()), new Fn1<UTF8Buffer, Option<BindAddress>>() {
             @Override
             public Option<BindAddress> apply(UTF8Buffer topicName) {
-                Tuple2<Topic, BindAddress> removed = session_state.subscriptions().remove(topicName);
+                Tuple2<Topic, BindAddress> removed = session_state.subscriptions.remove(topicName);
                 if (removed != null) {
                     Topic topic = removed._1();
                     BindAddress address = removed._2();
@@ -736,20 +728,20 @@ public class MqttSession {
                     if (PathParser.containsWildCards(address.path())) {
                         mqtt_consumer().wildcards.remove(address.path(), topic.qos());
                     }
-                    return ScalaSupport.some(address);
+                    return Scala2Java.some(address);
                 } else {
-                    return ScalaSupport.none();
+                    return Scala2Java.none();
                 }
 
             }
         });
         final BindAddress[] addresses = addressesList.toArray(new BindAddress[addressesList.size()]);
 
-        handler.subscription_count_$eq(mqtt_consumer().addresses.size());
+        handler.subscription_count = mqtt_consumer().addresses.size();
 
         if (!clean_session) {
             Set<BindAddress> bindAddressSet = mqtt_consumer().addresses.keySet();
-            session_state.durable_sub_$eq(new SubscriptionAddress(Path$.MODULE$.create(client_id.toString()), null, bindAddressSet.toArray(new BindAddress[bindAddressSet.size()])));
+            session_state.durable_sub = new SubscriptionAddress(Path$.MODULE$.create(client_id.toString()), null, bindAddressSet.toArray(new BindAddress[bindAddressSet.size()]));
         }
 
         host().dispatch_queue().execute(new Task() {
@@ -759,16 +751,16 @@ public class MqttSession {
                     host().router().unbind(addresses, mqtt_consumer(), false, security_context);
                 } else {
                     if (mqtt_consumer().addresses.isEmpty()) {
-                        host().router().unbind(new BindAddress[]{session_state.durable_sub()}, mqtt_consumer(), true, security_context);
-                        session_state.durable_sub_$eq(null);
+                        host().router().unbind(new BindAddress[]{session_state.durable_sub}, mqtt_consumer(), true, security_context);
+                        session_state.durable_sub = null;
                     } else {
-                        host().router().bind(new BindAddress[]{session_state.durable_sub()}, mqtt_consumer(), security_context, ScalaSupport.<Option<String>>noopFn1());
+                        host().router().bind(new BindAddress[]{session_state.durable_sub}, mqtt_consumer(), security_context, Scala2Java.<Option<String>>noopFn1());
                     }
                 }
                 queue.execute(new Task() {
                     @Override
                     public void run() {
-                        session_state.strategy().update(new Task() {
+                        session_state.strategy.update(new Task() {
                             @Override
                             public void run() {
                                 UNSUBACK ack = new UNSUBACK();
@@ -847,7 +839,7 @@ public class MqttSession {
         MutableSink<Request> consumer_sink = new MutableSink<Request>();
 
         {
-            consumer_sink.downstream_$eq(ScalaSupport.<Sink<Request>>none());
+            consumer_sink.downstream_$eq(Scala2Java.<Sink<Request>>none());
         }
 
         public LongCounter next_seq_id = new LongCounter(0);
@@ -862,7 +854,7 @@ public class MqttSession {
                             (value & 0x7FFF)); // the lower 15 bits come for the original seq id.
         }
 
-        CreditWindowFilter<Tuple2<Session<Delivery>, Delivery>> credit_window_filter = new CreditWindowFilter<Tuple2<Session<Delivery>, Delivery>>(consumer_sink.flatMap(ScalaSupport.toScala(new Fn1<Tuple2<Session<Delivery>, Delivery>, Option<Request>>() {
+        CreditWindowFilter<Tuple2<Session<Delivery>, Delivery>> credit_window_filter = new CreditWindowFilter<Tuple2<Session<Delivery>, Delivery>>(consumer_sink.flatMap(Scala2Java.toScala(new Fn1<Tuple2<Session<Delivery>, Delivery>, Option<Request>>() {
             public Option<Request> apply(Tuple2<Session<Delivery>, Delivery> event) {
                 queue.assertExecuting();
                 Session<Delivery> session = event._1();
@@ -875,12 +867,12 @@ public class MqttSession {
 
                 QoS qos = addresses.get(topic);
                 if (qos == null) {
-                    qos = ScalaSupport.<QoS>head(wildcards.get(topic.path()));
+                    qos = Scala2Java.<QoS>head(wildcards.get(topic.path()));
                 }
 
                 if (qos == null) {
                     acked(delivery, Consumed$.MODULE$);
-                    return ScalaSupport.none();
+                    return Scala2Java.none();
                 } else {
                     PUBLISH publish = new PUBLISH();
                     publish.topicName(new UTF8Buffer(destination_parser.encode_destination(delivery.sender().head())));
@@ -902,7 +894,7 @@ public class MqttSession {
                         }
                     }
 
-                    handler.messages_sent().incrementAndGet();
+                    handler.messages_sent.incrementAndGet();
 
                     UnitFn1<DeliveryResult> ack = new UnitFn1<DeliveryResult>() {
                         @Override
@@ -925,7 +917,7 @@ public class MqttSession {
 
                             // A reconnecting client could have acked before
                             // we get dispatched by the durable sub.
-                            if (prev.message() == null) {
+                            if (prev.message == null) {
                                 in_flight_publishes.remove(id);
                                 acked(delivery, Consumed$.MODULE$);
                             } else {
@@ -935,18 +927,18 @@ public class MqttSession {
                                 handler.async_die("Client not acking regularly.", null);
                             }
                         }
-                        return ScalaSupport.some(request);
+                        return Scala2Java.some(request);
 
                     } else {
                         // This callback gets executed once the message
                         // sent to the transport.
                         publish.qos(QoS.AT_MOST_ONCE);
-                        return ScalaSupport.some(new Request((short) 0, publish, ack));
+                        return Scala2Java.some(new Request((short) 0, publish, ack));
                     }
 
                 }
             }
-        })), SessionDeliverySizer$.MODULE$);
+        })), SessionDeliverySizer.INSTANCE);
 
 
         public void acked(Delivery delivery, DeliveryResult result) {
@@ -958,7 +950,7 @@ public class MqttSession {
         }
 
         {
-            credit_window_filter.credit(handler.codec().getWriteBufferSize() * 2, 1);
+            credit_window_filter.credit(handler.codec.getWriteBufferSize() * 2, 1);
         }
 
         SessionSinkMux<Delivery> session_manager = new SessionSinkMux<Delivery>(credit_window_filter, queue, Delivery$.MODULE$, Integer.MAX_VALUE / 2, receive_buffer_size()) {
@@ -984,7 +976,7 @@ public class MqttSession {
 
         @Override
         public Option<BrokerConnection> connection() {
-            return handler != null ? ScalaSupport.some(handler.connection()) : ScalaSupport.<BrokerConnection>none();
+            return handler != null ? Scala2Java.some(handler.connection()) : Scala2Java.<BrokerConnection>none();
         }
 
         @Override
@@ -1051,7 +1043,7 @@ public class MqttSession {
             }
 
             public void dispose() {
-                session_manager.close(downstream(), ScalaSupport.toScala(new UnitFn1<Delivery>() {
+                session_manager.close(downstream(), Scala2Java.toScala(new UnitFn1<Delivery>() {
                     @Override
                     public void call(Delivery delivery) {
                         // We have been closed so we have to nak any deliveries.

Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSessionManager.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSessionManager.java?rev=1461905&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSessionManager.java (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSessionManager.java Wed Mar 27 23:58:58 2013
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.mqtt;
+
+import org.apache.activemq.apollo.broker.BindAddress;
+import org.apache.activemq.apollo.broker.SimpleAddress;
+import org.apache.activemq.apollo.broker.SubscriptionAddress;
+import org.apache.activemq.apollo.broker.VirtualHost;
+import org.apache.activemq.apollo.broker.store.Store;
+import org.apache.activemq.apollo.broker.store.StoreUOW;
+import org.apache.activemq.apollo.util.Fn0;
+import org.apache.activemq.apollo.util.Scala2Java;
+import org.apache.activemq.apollo.util.UnitFn0;
+import org.apache.activemq.apollo.util.UnitFn1;
+import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.hawtbuf.proto.InvalidProtocolBufferException;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Task;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import scala.Tuple2;
+import scala.collection.Seq;
+
+import java.util.HashMap;
+import java.util.HashSet;
+
+import static org.fusesource.hawtdispatch.Dispatch.createQueue;
+
+/**
+ * Tracks active sessions so that we can ensure that a given
+ * session id is only associated with one connection
+ * at a time.  If a client tries to establish a 2nd
+ * connection, the first one will be closed before the session
+ * is switched to the new connection.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class MqttSessionManager {
+
+    public static final Scala2Java.Logger log = MqttProtocolHandler.log;
+
+    static DispatchQueue queue = createQueue("session manager");
+
+    interface StorageStrategy {
+        void update(Task cb);
+        void destroy(Task cb);
+        void create(Store store, UTF8Buffer client_id);
+    }
+
+    static class SessionState {
+        SubscriptionAddress durable_sub = null;
+        java.util.HashMap<UTF8Buffer, Tuple2<Topic, BindAddress>> subscriptions = new java.util.HashMap<UTF8Buffer, Tuple2<Topic, BindAddress>>();
+        HashSet<Short> received_message_ids = new HashSet<Short>();
+        StorageStrategy strategy = new NoopStrategy();
+
+        class NoopStrategy implements StorageStrategy {
+
+            @Override
+            public void create(Store store, UTF8Buffer client_id) {
+                if (store != null) {
+                    strategy = new StoreStrategy(store, client_id);
+                }
+            }
+
+            @Override
+            public void update(Task cb) {
+                cb.run();
+            }
+
+            @Override
+            public void destroy(Task cb) {
+                cb.run();
+            }
+        }
+
+        class StoreStrategy implements StorageStrategy {
+
+            public final Store store;
+            public final UTF8Buffer client_id;
+            public final UTF8Buffer session_key;
+
+            public StoreStrategy(Store store, UTF8Buffer client_id) {
+                this.store = store;
+                this.client_id = client_id;
+                this.session_key = new UTF8Buffer("mqtt:" + client_id);
+            }
+
+            @Override
+            public void create(Store store, UTF8Buffer client_id) {
+            }
+
+            @Override
+            public void update(final Task cb) {
+                StoreUOW uow = store.create_uow();
+                SessionPB.Bean session_pb = new SessionPB.Bean();
+                session_pb.setClientId(client_id);
+                for (Short id : received_message_ids) {
+                    session_pb.addReceivedMessageIds(id.intValue());
+                }
+                for (Tuple2<Topic, BindAddress> entry : subscriptions.values()) {
+                    Topic topic = entry._1();
+                    BindAddress address = entry._2();
+                    TopicPB.Bean topic_pb = new TopicPB.Bean();
+                    topic_pb.setName(topic.name());
+                    topic_pb.setQos(topic.qos().ordinal());
+                    topic_pb.setAddress(new UTF8Buffer(address.toString()));
+                    session_pb.addSubscriptions(topic_pb);
+                }
+                uow.put(session_key, session_pb.freeze().toUnframedBuffer());
+
+                final DispatchQueue current = Dispatch.getCurrentQueue();
+                uow.on_complete(Scala2Java.toScala(new UnitFn0() {
+                    @Override
+                    public void call() {
+                        current.execute(new Task() {
+                            @Override
+                            public void run() {
+                                cb.run();
+                            }
+                        });
+                    }
+                }));
+                uow.release();
+            }
+
+            /**
+             * Puts a {@code null} value under {@code session_key} in the store. Once the unit
+             * of work completes, the enclosing session state is switched to a
+             * {@code NoopStrategy} and {@code cb} is run.
+             *
+             * @param cb task executed on the dispatch queue that was current when this method was called
+             */
+            @Override
+            public void destroy(final Task cb) {
+                StoreUOW unit = store.create_uow();
+                unit.put(session_key, null);
+
+                final DispatchQueue origin = Dispatch.getCurrentQueue();
+                // Swap the strategy first, then notify the caller, both on the caller's queue.
+                final Task finish = new Task() {
+                    @Override
+                    public void run() {
+                        strategy = new NoopStrategy();
+                        cb.run();
+                    }
+                };
+                unit.on_complete(Scala2Java.toScala(new UnitFn0() {
+                    @Override
+                    public void call() {
+                        origin.execute(finish);
+                    }
+                }));
+                unit.release();
+            }
+
+        }
+    }
+
+
+    /**
+     * MQTT state kept for one virtual host: the session states (persisted ones are read
+     * from the host's store on first access) and the {@link MqttSession} instances, both
+     * keyed by client id.
+     */
+    static public class HostState {
+
+        public final VirtualHost host;
+        // Session state by client id; filled from the store by on_load.
+        public final HashMap<UTF8Buffer, SessionState> session_states = new HashMap<UTF8Buffer, SessionState>();
+        // MqttSession instances by client id.
+        public final HashMap<UTF8Buffer, MqttSession> sessions = new HashMap<UTF8Buffer, MqttSession>();
+        // Set once the store has been read, or immediately when the host has no store.
+        public boolean loaded = false;
+
+        public HostState(VirtualHost host) {
+            this.host = host;
+        }
+
+        /**
+         * Runs {@code func} once the persisted sessions of this host are available.
+         * <p>
+         * If loading already happened, or the host has no store, {@code func} runs
+         * immediately. Otherwise the shared queue is suspended while the store is asked
+         * for every entry whose key starts with "mqtt:", and {@code func} runs on the
+         * queue after the entries have been turned into session states.
+         *
+         * @param func the task to run after loading
+         */
+        public void on_load(final Task func) {
+            if (loaded) {
+                func.run();
+            } else {
+                if (host.store() != null) {
+                    // We load all the persisted sessions from the host's store when we are first accessed.
+                    queue.suspend();
+                    host.store().get_prefixed_map_entries(new AsciiBuffer("mqtt:"), Scala2Java.toScala(new UnitFn1<Seq<Tuple2<Buffer, Buffer>>>() {
+                        @Override
+                        public void call(final Seq<Tuple2<Buffer, Buffer>> entries) {
+                            queue.resume();
+                            queue.execute(new Task() {
+                                @Override
+                                public void run() {
+                                    for (Tuple2<Buffer, Buffer> entry : Scala2Java.toIterable(entries)) {
+                                        load_session(entry);
+                                    }
+                                    loaded = true;
+                                    func.run();
+                                }
+                            });
+                        }
+                    }));
+
+                } else {
+                    loaded = true;
+                    func.run();
+                }
+            }
+        }
+
+        /**
+         * Rebuilds one session state from a stored entry and registers it in
+         * {@code session_states}. An entry that cannot be decoded is logged and skipped
+         * so that a single bad record cannot abort the whole load and leave
+         * {@code loaded} unset.
+         *
+         * @param entry a (key, value) pair returned by the store; only the value is read
+         */
+        private void load_session(Tuple2<Buffer, Buffer> entry) {
+            try {
+                Buffer value = entry._2();
+                SessionPB.Buffer session_pb = SessionPB.FACTORY.parseUnframed(value);
+                SessionState session_state = new SessionState();
+                session_state.strategy.create(host.store(), session_pb.getClientId());
+                if (session_pb.hasReceivedMessageIds()) {
+                    for (Integer i : session_pb.getReceivedMessageIdsList()) {
+                        session_state.received_message_ids.add(i.shortValue());
+                    }
+                }
+                if (session_pb.hasSubscriptions()) {
+                    for (TopicPB.Getter sub : session_pb.getSubscriptionsList()) {
+                        SimpleAddress address = SimpleAddress.apply(sub.getAddress().toString());
+                        Topic topic = new Topic(sub.getName(), QoS.values()[sub.getQos()]);
+                        session_state.subscriptions.put(sub.getName(), new Tuple2<Topic, BindAddress>(topic, address));
+                    }
+                }
+                // Registered last, so a failure above never leaves a half-built state in the map.
+                session_states.put(session_pb.getClientId(), session_state);
+            } catch (InvalidProtocolBufferException e) {
+                log.warn(e, "Could not load a stored MQTT session");
+            } catch (RuntimeException e) {
+                // For example a stored QoS index outside QoS.values(), or a missing field.
+                log.warn(e, "Could not load a stored MQTT session");
+            }
+        }
+    }
+
+    /**
+     * Connects {@code handler} to the session for {@code client_id} on {@code host}.
+     * The work runs on the shared queue after the host's stored sessions are loaded.
+     * An existing session is reused; otherwise a new one is created and registered.
+     *
+     * @param host      the virtual host the client connected to
+     * @param client_id the MQTT client id
+     * @param handler   the protocol handler of the connecting client
+     */
+    static public void attach(final VirtualHost host, final UTF8Buffer client_id, final MqttProtocolHandler handler) {
+        queue.execute(new Task() {
+            @Override
+            public void run() {
+                final Fn0<HostState> factory = new Fn0<HostState>() {
+                    @Override
+                    public HostState apply() {
+                        return new HostState(host);
+                    }
+                };
+                final HostState host_state = host.plugin_state(Scala2Java.toScala(factory), HostState.class);
+                host_state.on_load(new Task() {
+                    @Override
+                    public void run() {
+                        MqttSession existing = host_state.sessions.get(client_id);
+                        if (existing != null) {
+                            existing.connect(handler);
+                            return;
+                        }
+
+                        // A clean-session connect takes the stored state out of the map;
+                        // otherwise the state is looked up and kept (or added) in the map.
+                        final boolean clean = handler.connect_message.cleanSession();
+                        SessionState state = clean
+                                ? host_state.session_states.remove(client_id)
+                                : host_state.session_states.get(client_id);
+                        if (state == null) {
+                            state = new SessionState();
+                            if (!clean) {
+                                host_state.session_states.put(client_id, state);
+                            }
+                        }
+
+                        MqttSession created = new MqttSession(host_state, client_id, state);
+                        created.connect(handler);
+                        host_state.sessions.put(client_id, created);
+                    }
+                });
+            }
+        });
+    }
+
+    /**
+     * Disconnects {@code handler} from the session registered for {@code client_id},
+     * if there is one. The lookup and the call run on the shared queue.
+     *
+     * @param host_state the per-host state holding the session map
+     * @param client_id  the MQTT client id
+     * @param handler    the protocol handler being disconnected
+     */
+    static public void disconnect(final HostState host_state, final UTF8Buffer client_id, final MqttProtocolHandler handler) {
+        queue.execute(new Task() {
+            @Override
+            public void run() {
+                final MqttSession session = host_state.sessions.get(client_id);
+                if (session == null) {
+                    // Nothing registered for this client id.
+                    return;
+                }
+                session.disconnect(handler);
+            }
+        });
+    }
+
+    /**
+     * Removes the session registered for {@code client_id} from {@code host_state}.
+     * The removal runs on the shared queue.
+     *
+     * @param host_state the per-host state holding the session map
+     * @param client_id  the MQTT client id whose session entry is dropped
+     */
+    static public void remove(final HostState host_state, final UTF8Buffer client_id) {
+        final Task removal = new Task() {
+            @Override
+            public void run() {
+                host_state.sessions.remove(client_id);
+            }
+        };
+        queue.execute(removal);
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/Request.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/Request.java?rev=1461905&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/Request.java (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/Request.java Wed Mar 27 23:58:58 2013
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.mqtt;
+
+import org.apache.activemq.apollo.broker.DeliveryResult;
+import org.apache.activemq.apollo.util.UnitFn1;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.codec.MessageSupport;
+
+/**
+ * Pairs an MQTT message with its message id, the callback used to report the
+ * delivery result, and the encoded frame of the message.
+ */
+public class Request {
+
+    /** Message id of this request. */
+    public final short id;
+    /** The message itself; may be {@code null}. */
+    public final MessageSupport.Message message;
+    /** Callback that receives the delivery result. */
+    public final UnitFn1<DeliveryResult> ack;
+
+    /** Encoded form of {@link #message}, or {@code null} when there is no message. */
+    MQTTFrame frame;
+    /** Not assigned by the constructor, so it starts out {@code false}. */
+    boolean delivered;
+
+    /**
+     * @param id      the message id
+     * @param message the message to encode, or {@code null}
+     * @param ack     the callback for the delivery result
+     */
+    public Request(short id, MessageSupport.Message message, UnitFn1<DeliveryResult> ack) {
+        this.id = id;
+        this.message = message;
+        this.ack = ack;
+        if (message != null) {
+            this.frame = message.encode();
+        } else {
+            this.frame = null;
+        }
+    }
+}
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/SessionDeliverySizer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/SessionDeliverySizer.java?rev=1461905&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/SessionDeliverySizer.java (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/SessionDeliverySizer.java Wed Mar 27 23:58:58 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.mqtt;
+
+import org.apache.activemq.apollo.broker.Delivery;
+import org.apache.activemq.apollo.broker.Delivery$;
+import org.apache.activemq.apollo.broker.Session;
+import org.apache.activemq.apollo.broker.Sizer;
+import scala.Tuple2;
+
+/**
+ * Sizer for (session, delivery) pairs. The size of a pair is the size of its
+ * delivery; the session element is not consulted.
+ */
+class SessionDeliverySizer implements Sizer<Tuple2<Session<Delivery>, Delivery>> {
+
+    /** Shared instance. */
+    public static final SessionDeliverySizer INSTANCE = new SessionDeliverySizer();
+
+    /**
+     * @param value the (session, delivery) pair to measure
+     * @return the size reported by {@code Delivery$.MODULE$} for the delivery element
+     */
+    @Override
+    public int size(Tuple2<Session<Delivery>, Delivery> value) {
+        final Delivery delivery = value._2();
+        return Delivery$.MODULE$.size(delivery);
+    }
+}

Copied: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2Java.java (from r1461839, activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2Java.java?p2=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2Java.java&p1=activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java&r1=1461839&r2=1461905&rev=1461905&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2Java.java Wed Mar 27 23:58:58 2013
@@ -14,15 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.mqtt;
+package org.apache.activemq.apollo.util;
 
-import org.apache.activemq.apollo.util.Log;
-import scala.Function1;
-import scala.Function2;
-import scala.Option;
+import org.apache.activemq.apollo.util.*;
+import org.fusesource.hawtbuf.Buffer;
+import scala.*;
+import scala.collection.JavaConversions$;
+import scala.collection.Seq;
 import scala.collection.immutable.List;
 import scala.runtime.BoxedUnit;
 
+import java.lang.Boolean;
+import java.lang.Long;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -30,8 +33,31 @@ import java.util.Iterator;
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class ScalaSupport {
-    private static ScalaSupportHelper$ helper = ScalaSupportHelper$.MODULE$;
+public class Scala2Java {
+    private static Scala2JavaHelper$ helper = Scala2JavaHelper$.MODULE$;
+
+
+    /**
+     * Unboxes {@code value}, falling back to {@code defaultValue} when it is {@code null}.
+     *
+     * @param value        the boxed value, may be {@code null}
+     * @param defaultValue the result used for a {@code null} value
+     * @return the primitive value or the default
+     */
+    public static int get(Integer value, int defaultValue) {
+        if( value==null ) {
+            return defaultValue;
+        }
+        return value.intValue();
+    }
+    /**
+     * Unboxes {@code value}, falling back to {@code defaultValue} when it is {@code null}.
+     *
+     * @param value        the boxed value, may be {@code null}
+     * @param defaultValue the result used for a {@code null} value
+     * @return the primitive value or the default
+     */
+    public static long get(Long value, long defaultValue) {
+        if( value==null ) {
+            return defaultValue;
+        }
+        return value.longValue();
+    }
+    /**
+     * Unboxes {@code value}, falling back to {@code defaultValue} when it is {@code null}.
+     *
+     * @param value        the boxed value, may be {@code null}
+     * @param defaultValue the result used for a {@code null} value
+     * @return the primitive value or the default
+     */
+    public static boolean get(Boolean value, boolean defaultValue) {
+        if( value==null ) {
+            return defaultValue;
+        }
+        return value.booleanValue();
+    }
 
     static final Function1<Object,BoxedUnit> NOOP_FN1 = helper.toScala(new UnitFn1<Object>() {
         @Override
@@ -39,10 +65,20 @@ public class ScalaSupport {
         }
     });
 
+    /**
+     * Null-safe {@code toString}.
+     *
+     * @param o any object, may be {@code null}
+     * @return {@code o.toString()}, or {@code null} when {@code o} is {@code null}
+     */
+    public static String toString(Object o) {
+        if( o != null ) {
+            return o.toString();
+        }
+        return null;
+    }
+
     /**
      * Returns the shared {@code NOOP_FN1} instance, cast to the requested argument type.
      *
      * @param <T1> the argument type the caller wants the function typed with
      * @return the shared function, typed as {@code Function1<T1, BoxedUnit>}
      */
     public static <T1> Function1<T1, BoxedUnit> noopFn1() {
         // Unchecked cast of the single Function1<Object, BoxedUnit> constant.
         return (Function1<T1, BoxedUnit>) NOOP_FN1;
     }
 
+    /**
+     * Converts a Java-implemented {@link Fn0} into a Scala {@code Function0}.
+     *
+     * @param func the function to convert, may be {@code null}
+     * @return the converted function, or {@code null} when {@code func} is {@code null}
+     */
+    public static <R> Function0<R> toScala(Fn0<R> func) {
+        if( func != null ) {
+            return helper.toScala(func);
+        }
+        return null;
+    }
 
     public static <T1, R> Function1<T1, R> toScala(Fn1<T1, R> func) {
         if( func == null ) {
@@ -98,6 +134,9 @@ public class ScalaSupport {
         return helper.toList(s);
     }
 
+    /**
+     * Exposes a Scala {@code Seq} as a Java {@link Iterable} so it can be used in a for-each loop.
+     *
+     * @param entries the Scala sequence to adapt
+     * @return the result of {@code JavaConversions.asJavaIterable} for {@code entries}
+     */
+    public static <T> Iterable<T> toIterable(Seq<T> entries) {
+        final JavaConversions$ conversions = JavaConversions$.MODULE$;
+        return conversions.asJavaIterable(entries);
+    }
 
     public static class Logger {
         final Log log;

Copied: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2JavaHelper.scala (from r1461839, activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2JavaHelper.scala?p2=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2JavaHelper.scala&p1=activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala&r1=1461839&r2=1461905&rev=1461905&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2JavaHelper.scala Wed Mar 27 23:58:58 2013
@@ -14,18 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.mqtt
+package org.apache.activemq.apollo.util
 
-import scala.Function1
 import org.apache.activemq.apollo.util.Log
+import scala.Function1
 import scala.runtime.BoxedUnit
 
+/**
+ * A zero-argument function type meant to be subclassed; `apply` produces the result.
+ */
+abstract class Fn0[+R] {
+  /** Computes and returns the result. */
+  def apply(): R
+}
+
+/**
+ * An [[Fn0]] that returns no value: subclasses override `call`, and `apply`
+ * invokes it and then returns `BoxedUnit.UNIT`.
+ */
+class UnitFn0 extends Fn0[BoxedUnit] {
+  /** The action to perform; the default implementation does nothing. */
+  def call(): Unit = ()
+
+  /** Runs `call` and returns the unit value. */
+  def apply(): BoxedUnit = {
+    call()
+    BoxedUnit.UNIT
+  }
+}
+
 /**
  * A one-argument function type meant to be subclassed; `apply` maps the argument to the result.
  */
 abstract class Fn1[-T1,+R] {
   /** Applies this function to `v1`. */
   def apply(v1: T1): R
 }
 
-class UnitFn1[-T1] extends Fn1[T1, BoxedUnit] {
-  def call(v1: T1) = {}
+abstract class UnitFn1[-T1] extends Fn1[T1, BoxedUnit] {
+  def call(v1: T1)
   def apply(v1: T1) = {
     call(v1)
     BoxedUnit.UNIT;
@@ -51,7 +63,8 @@ class UnitFn2[-T1,-T2] extends Fn2[T1,T2
  * Time: 3:27 PM
  * To change this template use File | Settings | File Templates.
  */
-object ScalaSupportHelper {
+object Scala2JavaHelper {
+  def toScala[R](func:Fn0[R]):Function0[R] = () => { func.apply() }
   def toScala[T1,R](func:Fn1[T1,R]):Function1[T1,R] = (v1:T1) => { func.apply(v1) }
   def toScala[T1,T2,R](func:Fn2[T1,T2,R]):Function2[T1,T2,R] = (v1:T1, v2:T2) => { func.apply(v1,v2) }
   def none[T]:Option[T] = None