You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/03/28 00:58:59 UTC
svn commit: r1461905 [2/2] - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/
apollo-mqtt/src/main/scala/org/apache/activemq/apo...
Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java?rev=1461905&r1=1461904&r2=1461905&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSession.java Wed Mar 27 23:58:58 2013
@@ -24,8 +24,7 @@ import org.apache.activemq.apollo.broker
import org.apache.activemq.apollo.filter.FilterException;
import org.apache.activemq.apollo.mqtt.MqttSessionManager.HostState;
import org.apache.activemq.apollo.mqtt.MqttSessionManager.SessionState;
-import org.apache.activemq.apollo.util.LRUCache;
-import org.apache.activemq.apollo.util.LongCounter;
+import org.apache.activemq.apollo.util.*;
import org.apache.activemq.apollo.util.path.Path$;
import org.apache.activemq.apollo.util.path.PathMap;
import org.apache.activemq.apollo.util.path.PathParser;
@@ -37,7 +36,6 @@ import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.codec.*;
import scala.Option;
import scala.Tuple2;
-import scala.runtime.BoxedUnit;
import java.net.ProtocolException;
import java.util.*;
@@ -45,6 +43,8 @@ import java.util.*;
import static org.fusesource.hawtdispatch.Dispatch.NOOP;
import static org.fusesource.hawtdispatch.Dispatch.createQueue;
+import static org.apache.activemq.apollo.mqtt.MqttProtocolHandler.*;
+
/**
* An MqttSession can be switch from one connection/protocol handler to another,
* but it will only be associated with one at a time. An MqttSession tracks
@@ -54,14 +54,6 @@ import static org.fusesource.hawtdispatc
*/
public class MqttSession {
- public static final ScalaSupport.Logger log = new ScalaSupport.Logger(MqttProtocolHandler$.MODULE$);
-
- public static <T> T received(T value) {
- log.trace("received: %s", value);
- return value;
- }
-
-
public final HostState host_state;
public final UTF8Buffer client_id;
public final SessionState session_state;
@@ -83,7 +75,7 @@ public class MqttSession {
boolean publish_body = false;
public VirtualHost host() {
- return host_state.host();
+ return host_state.host;
}
public void connect(final MqttProtocolHandler next) {
@@ -142,23 +134,23 @@ public class MqttSession {
public void attach() {
queue.assertExecuting();
final MqttProtocolHandler h = handler;
- clean_session = h.connect_message().cleanSession();
- security_context = h.security_context();
- h.command_handler_$eq(ScalaSupport.toScala(new UnitFn1<Object>() {
+ clean_session = h.connect_message.cleanSession();
+ security_context = h.security_context;
+ h.command_handler = new UnitFn1<Object>() {
@Override
public void call(Object v1) {
on_transport_command(v1);
}
- }));
+ };
destination_parser = h.destination_parser();
- mqtt_consumer().consumer_sink.downstream_$eq(ScalaSupport.some(h.sink_manager().open()));
+ mqtt_consumer().consumer_sink.downstream_$eq(Scala2Java.some(h.sink_manager.open()));
final Task ack_connect = new Task() {
@Override
public void run() {
queue.assertExecuting();
- connect_message = h.connect_message();
+ connect_message = h.connect_message;
CONNACK connack = new CONNACK();
connack.code(CONNACK.Code.CONNECTION_ACCEPTED);
send(connack);
@@ -167,10 +159,10 @@ public class MqttSession {
if (!clean_session) {
// Setup the previous subscriptions..
- session_state.strategy().create(host().store(), client_id);
- if (!session_state.subscriptions().isEmpty()) {
+ session_state.strategy.create(host().store(), client_id);
+ if (!session_state.subscriptions.isEmpty()) {
h._suspend_read("subscribing");
- ArrayList<Topic> topics = ScalaSupport.map(session_state.subscriptions().values(), new Fn1<Tuple2<Topic, BindAddress>, Topic>() {
+ ArrayList<Topic> topics = Scala2Java.map(session_state.subscriptions.values(), new Fn1<Tuple2<Topic, BindAddress>, Topic>() {
@Override
public Topic apply(Tuple2<Topic, BindAddress> v1) {
return v1._1();
@@ -189,10 +181,10 @@ public class MqttSession {
} else {
// do we need to clear the received ids?
// durable_session_state.received_message_ids.clear()
- session_state.subscriptions().clear();
- if (session_state.durable_sub() != null) {
- final DestinationAddress[] addresses = new DestinationAddress[]{session_state.durable_sub()};
- session_state.durable_sub_$eq(null);
+ session_state.subscriptions.clear();
+ if (session_state.durable_sub != null) {
+ final DestinationAddress[] addresses = new DestinationAddress[]{session_state.durable_sub};
+ session_state.durable_sub = null;
host().dispatch_queue().execute(new Task() {
@Override
public void run() {
@@ -201,7 +193,7 @@ public class MqttSession {
});
}
- session_state.strategy().destroy(new Task() {
+ session_state.strategy.destroy(new Task() {
@Override
public void run() {
ack_connect.run();
@@ -237,10 +229,10 @@ public class MqttSession {
});
mqtt_consumer().addresses.clear();
}
- session_state.subscriptions().clear();
+ session_state.subscriptions.clear();
} else {
- if (session_state.durable_sub() != null) {
- final BindAddress[] addresses = new BindAddress[]{session_state.durable_sub()};
+ if (session_state.durable_sub != null) {
+ final BindAddress[] addresses = new BindAddress[]{session_state.durable_sub};
host().dispatch_queue().execute(new Runnable() {
@Override
public void run() {
@@ -248,27 +240,27 @@ public class MqttSession {
}
});
mqtt_consumer().addresses.clear();
- session_state.durable_sub_$eq(null);
+ session_state.durable_sub = null;
}
}
for (Request request : in_flight_publishes.values()) {
- if (request.ack() != null) {
- request.ack().apply(
- request.delivered() ? Delivered$.MODULE$ : Undelivered$.MODULE$
+ if (request.ack != null) {
+ request.ack.apply(
+ request.delivered ? Delivered$.MODULE$ : Undelivered$.MODULE$
);
}
}
in_flight_publishes.clear();
- handler.sink_manager().close(mqtt_consumer().consumer_sink.downstream().get(), ScalaSupport.<Request>noopFn1());
- mqtt_consumer().consumer_sink.downstream_$eq(ScalaSupport.<Sink<Request>>none());
+ handler.sink_manager.close(mqtt_consumer().consumer_sink.downstream().get(), Scala2Java.<Request>noopFn1());
+ mqtt_consumer().consumer_sink.downstream_$eq(Scala2Java.<Sink<Request>>none());
handler.on_transport_disconnected();
}
public SimpleAddress decode_destination(UTF8Buffer value) {
- SimpleAddress rc = destination_parser.decode_single_destination(value.toString(), ScalaSupport.toScala(new Fn1<String, SimpleAddress>() {
+ SimpleAddress rc = destination_parser.decode_single_destination(value.toString(), Scala2Java.toScala(new Fn1<String, SimpleAddress>() {
public SimpleAddress apply(String name) {
return new SimpleAddress("topic", destination_parser.decode_path(name));
}
@@ -291,15 +283,15 @@ public class MqttSession {
public void send(MessageSupport.Message message) {
queue.assertExecuting();
- handler.connection_sink().offer(new Request((short) 0, message, null));
+ handler.connection_sink.offer(new Request((short) 0, message, null));
}
public void publish_completed(short id) {
queue.assertExecuting();
Request request = in_flight_publishes.remove(id);
if (request != null) {
- if (request.ack() != null) {
- request.ack().apply(Consumed$.MODULE$);
+ if (request.ack != null) {
+ request.ack.apply(Consumed$.MODULE$);
}
} else {
// It's possible that on a reconnect, we get an ACK
@@ -329,8 +321,8 @@ public class MqttSession {
final PUBREL ack = received(new PUBREL().decode(command));
// TODO: perhaps persist the processed list.. otherwise
// we can't filter out dups after a broker restart.
- session_state.received_message_ids().remove(ack.messageId());
- session_state.strategy().update(new Task() {
+ session_state.received_message_ids.remove(ack.messageId());
+ session_state.strategy.update(new Task() {
@Override
public void run() {
send(new PUBCOMP().messageId(ack.messageId()));
@@ -437,12 +429,12 @@ public class MqttSession {
@Override
public int send_buffer_size() {
- return handler.codec().getReadBufferSize();
+ return handler.codec.getReadBufferSize();
}
@Override
public Option<BrokerConnection> connection() {
- return ScalaSupport.some(handler.connection());
+ return Scala2Java.some(handler.connection());
}
@Override
@@ -453,14 +445,14 @@ public class MqttSession {
public void on_mqtt_publish(final PUBLISH publish) {
- if ((publish.qos() == QoS.EXACTLY_ONCE) && session_state.received_message_ids().contains(publish.messageId())) {
+ if ((publish.qos() == QoS.EXACTLY_ONCE) && session_state.received_message_ids.contains(publish.messageId())) {
PUBREC response = new PUBREC();
response.messageId(publish.messageId());
send(response);
return;
}
- handler.messages_received().incrementAndGet();
+ handler.messages_received.incrementAndGet();
queue.assertExecuting();
MqttProducerRoute route = producerRoutes.get(publish.topicName());
@@ -526,8 +518,8 @@ public class MqttSession {
@Override
public void run() {
// TODO: perhaps persist the processed list..
- session_state.received_message_ids().add(publish.messageId());
- session_state.strategy().update(new Task() {
+ session_state.received_message_ids.add(publish.messageId());
+ session_state.strategy.update(new Task() {
@Override
public void run() {
PUBREC response = new PUBREC();
@@ -555,7 +547,7 @@ public class MqttSession {
delivery.message_$eq(new RawMessage(publish.payload()));
delivery.persistent_$eq(publish.qos().ordinal() > 0);
delivery.size_$eq(publish.payload().length);
- delivery.ack_$eq(ScalaSupport.toScala(ack));
+ delivery.ack_$eq(Scala2Java.toScala(ack));
if (publish.retain()) {
if (delivery.size() == 0) {
delivery.retain_$eq(RetainRemove$.MODULE$);
@@ -599,7 +591,7 @@ public class MqttSession {
@Override
public Option<BrokerConnection> connection() {
- return handler != null ? ScalaSupport.some(handler.connection()) : ScalaSupport.<BrokerConnection>none();
+ return handler != null ? Scala2Java.some(handler.connection()) : Scala2Java.<BrokerConnection>none();
}
@Override
@@ -629,7 +621,7 @@ public class MqttSession {
delivery.retain_$eq(RetainSet$.MODULE$);
}
}
- delivery.ack_$eq(ScalaSupport.toScala(new UnitFn2<DeliveryResult, StoreUOW>() {
+ delivery.ack_$eq(Scala2Java.toScala(new UnitFn2<DeliveryResult, StoreUOW>() {
@Override
public void call(DeliveryResult x, StoreUOW y) {
host().dispatch_queue().execute(new Task() {
@@ -641,7 +633,7 @@ public class MqttSession {
complete_close.run();
}
}));
- handler.messages_received().incrementAndGet();
+ handler.messages_received.incrementAndGet();
prodcuer.offer(delivery);
}
}
@@ -664,7 +656,7 @@ public class MqttSession {
queue.execute(new Task() {
@Override
public void run() {
- session_state.strategy().update(new Task() {
+ session_state.strategy.update(new Task() {
@Override
public void run() {
SUBACK suback = new SUBACK();
@@ -688,11 +680,11 @@ public class MqttSession {
}
public void subscribe(Collection<Topic> topics, final Task on_subscribed) {
- final ArrayList<BindAddress> addresses = ScalaSupport.map(topics, new Fn1<Topic, BindAddress>() {
+ final ArrayList<BindAddress> addresses = Scala2Java.map(topics, new Fn1<Topic, BindAddress>() {
@Override
public BindAddress apply(Topic topic) {
BindAddress address = decode_destination(topic.name());
- session_state.subscriptions().put(topic.name(), new Tuple2<Topic, BindAddress>(topic, address));
+ session_state.subscriptions.put(topic.name(), new Tuple2<Topic, BindAddress>(topic, address));
mqtt_consumer().addresses.put(address, topic.qos());
if (PathParser.containsWildCards(address.path())) {
mqtt_consumer().wildcards.put(address.path(), topic.qos());
@@ -702,12 +694,12 @@ public class MqttSession {
});
- handler.subscription_count_$eq(mqtt_consumer().addresses.size());
+ handler.subscription_count = mqtt_consumer().addresses.size();
if (!clean_session) {
Set<BindAddress> bindAddressSet = mqtt_consumer().addresses.keySet();
SubscriptionAddress durable_sub = new SubscriptionAddress(Path$.MODULE$.create(client_id.toString()), null, bindAddressSet.toArray(new BindAddress[bindAddressSet.size()]));
- session_state.durable_sub_$eq(durable_sub);
+ session_state.durable_sub = durable_sub;
addresses.clear();
addresses.add(durable_sub);
}
@@ -716,7 +708,7 @@ public class MqttSession {
@Override
public void run() {
for (BindAddress address : addresses) {
- host().router().bind(new BindAddress[]{address}, mqtt_consumer(), security_context, ScalaSupport.<Option<String>>noopFn1());
+ host().router().bind(new BindAddress[]{address}, mqtt_consumer(), security_context, Scala2Java.<Option<String>>noopFn1());
}
on_subscribed.run();
}
@@ -725,10 +717,10 @@ public class MqttSession {
public void on_mqtt_unsubscribe(final UNSUBSCRIBE unsubscribe) {
- ArrayList<BindAddress> addressesList = ScalaSupport.flatMap(Arrays.asList(unsubscribe.topics()), new Fn1<UTF8Buffer, Option<BindAddress>>() {
+ ArrayList<BindAddress> addressesList = Scala2Java.flatMap(Arrays.asList(unsubscribe.topics()), new Fn1<UTF8Buffer, Option<BindAddress>>() {
@Override
public Option<BindAddress> apply(UTF8Buffer topicName) {
- Tuple2<Topic, BindAddress> removed = session_state.subscriptions().remove(topicName);
+ Tuple2<Topic, BindAddress> removed = session_state.subscriptions.remove(topicName);
if (removed != null) {
Topic topic = removed._1();
BindAddress address = removed._2();
@@ -736,20 +728,20 @@ public class MqttSession {
if (PathParser.containsWildCards(address.path())) {
mqtt_consumer().wildcards.remove(address.path(), topic.qos());
}
- return ScalaSupport.some(address);
+ return Scala2Java.some(address);
} else {
- return ScalaSupport.none();
+ return Scala2Java.none();
}
}
});
final BindAddress[] addresses = addressesList.toArray(new BindAddress[addressesList.size()]);
- handler.subscription_count_$eq(mqtt_consumer().addresses.size());
+ handler.subscription_count = mqtt_consumer().addresses.size();
if (!clean_session) {
Set<BindAddress> bindAddressSet = mqtt_consumer().addresses.keySet();
- session_state.durable_sub_$eq(new SubscriptionAddress(Path$.MODULE$.create(client_id.toString()), null, bindAddressSet.toArray(new BindAddress[bindAddressSet.size()])));
+ session_state.durable_sub = new SubscriptionAddress(Path$.MODULE$.create(client_id.toString()), null, bindAddressSet.toArray(new BindAddress[bindAddressSet.size()]));
}
host().dispatch_queue().execute(new Task() {
@@ -759,16 +751,16 @@ public class MqttSession {
host().router().unbind(addresses, mqtt_consumer(), false, security_context);
} else {
if (mqtt_consumer().addresses.isEmpty()) {
- host().router().unbind(new BindAddress[]{session_state.durable_sub()}, mqtt_consumer(), true, security_context);
- session_state.durable_sub_$eq(null);
+ host().router().unbind(new BindAddress[]{session_state.durable_sub}, mqtt_consumer(), true, security_context);
+ session_state.durable_sub = null;
} else {
- host().router().bind(new BindAddress[]{session_state.durable_sub()}, mqtt_consumer(), security_context, ScalaSupport.<Option<String>>noopFn1());
+ host().router().bind(new BindAddress[]{session_state.durable_sub}, mqtt_consumer(), security_context, Scala2Java.<Option<String>>noopFn1());
}
}
queue.execute(new Task() {
@Override
public void run() {
- session_state.strategy().update(new Task() {
+ session_state.strategy.update(new Task() {
@Override
public void run() {
UNSUBACK ack = new UNSUBACK();
@@ -847,7 +839,7 @@ public class MqttSession {
MutableSink<Request> consumer_sink = new MutableSink<Request>();
{
- consumer_sink.downstream_$eq(ScalaSupport.<Sink<Request>>none());
+ consumer_sink.downstream_$eq(Scala2Java.<Sink<Request>>none());
}
public LongCounter next_seq_id = new LongCounter(0);
@@ -862,7 +854,7 @@ public class MqttSession {
(value & 0x7FFF)); // the lower 15 bits come for the original seq id.
}
- CreditWindowFilter<Tuple2<Session<Delivery>, Delivery>> credit_window_filter = new CreditWindowFilter<Tuple2<Session<Delivery>, Delivery>>(consumer_sink.flatMap(ScalaSupport.toScala(new Fn1<Tuple2<Session<Delivery>, Delivery>, Option<Request>>() {
+ CreditWindowFilter<Tuple2<Session<Delivery>, Delivery>> credit_window_filter = new CreditWindowFilter<Tuple2<Session<Delivery>, Delivery>>(consumer_sink.flatMap(Scala2Java.toScala(new Fn1<Tuple2<Session<Delivery>, Delivery>, Option<Request>>() {
public Option<Request> apply(Tuple2<Session<Delivery>, Delivery> event) {
queue.assertExecuting();
Session<Delivery> session = event._1();
@@ -875,12 +867,12 @@ public class MqttSession {
QoS qos = addresses.get(topic);
if (qos == null) {
- qos = ScalaSupport.<QoS>head(wildcards.get(topic.path()));
+ qos = Scala2Java.<QoS>head(wildcards.get(topic.path()));
}
if (qos == null) {
acked(delivery, Consumed$.MODULE$);
- return ScalaSupport.none();
+ return Scala2Java.none();
} else {
PUBLISH publish = new PUBLISH();
publish.topicName(new UTF8Buffer(destination_parser.encode_destination(delivery.sender().head())));
@@ -902,7 +894,7 @@ public class MqttSession {
}
}
- handler.messages_sent().incrementAndGet();
+ handler.messages_sent.incrementAndGet();
UnitFn1<DeliveryResult> ack = new UnitFn1<DeliveryResult>() {
@Override
@@ -925,7 +917,7 @@ public class MqttSession {
// A reconnecting client could have acked before
// we get dispatched by the durable sub.
- if (prev.message() == null) {
+ if (prev.message == null) {
in_flight_publishes.remove(id);
acked(delivery, Consumed$.MODULE$);
} else {
@@ -935,18 +927,18 @@ public class MqttSession {
handler.async_die("Client not acking regularly.", null);
}
}
- return ScalaSupport.some(request);
+ return Scala2Java.some(request);
} else {
// This callback gets executed once the message
// sent to the transport.
publish.qos(QoS.AT_MOST_ONCE);
- return ScalaSupport.some(new Request((short) 0, publish, ack));
+ return Scala2Java.some(new Request((short) 0, publish, ack));
}
}
}
- })), SessionDeliverySizer$.MODULE$);
+ })), SessionDeliverySizer.INSTANCE);
public void acked(Delivery delivery, DeliveryResult result) {
@@ -958,7 +950,7 @@ public class MqttSession {
}
{
- credit_window_filter.credit(handler.codec().getWriteBufferSize() * 2, 1);
+ credit_window_filter.credit(handler.codec.getWriteBufferSize() * 2, 1);
}
SessionSinkMux<Delivery> session_manager = new SessionSinkMux<Delivery>(credit_window_filter, queue, Delivery$.MODULE$, Integer.MAX_VALUE / 2, receive_buffer_size()) {
@@ -984,7 +976,7 @@ public class MqttSession {
@Override
public Option<BrokerConnection> connection() {
- return handler != null ? ScalaSupport.some(handler.connection()) : ScalaSupport.<BrokerConnection>none();
+ return handler != null ? Scala2Java.some(handler.connection()) : Scala2Java.<BrokerConnection>none();
}
@Override
@@ -1051,7 +1043,7 @@ public class MqttSession {
}
public void dispose() {
- session_manager.close(downstream(), ScalaSupport.toScala(new UnitFn1<Delivery>() {
+ session_manager.close(downstream(), Scala2Java.toScala(new UnitFn1<Delivery>() {
@Override
public void call(Delivery delivery) {
// We have been closed so we have to nak any deliveries.
Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSessionManager.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSessionManager.java?rev=1461905&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSessionManager.java (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttSessionManager.java Wed Mar 27 23:58:58 2013
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.mqtt;
+
+import org.apache.activemq.apollo.broker.BindAddress;
+import org.apache.activemq.apollo.broker.SimpleAddress;
+import org.apache.activemq.apollo.broker.SubscriptionAddress;
+import org.apache.activemq.apollo.broker.VirtualHost;
+import org.apache.activemq.apollo.broker.store.Store;
+import org.apache.activemq.apollo.broker.store.StoreUOW;
+import org.apache.activemq.apollo.util.Fn0;
+import org.apache.activemq.apollo.util.Scala2Java;
+import org.apache.activemq.apollo.util.UnitFn0;
+import org.apache.activemq.apollo.util.UnitFn1;
+import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.hawtbuf.proto.InvalidProtocolBufferException;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Task;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import scala.Tuple2;
+import scala.collection.Seq;
+
+import java.util.HashMap;
+import java.util.HashSet;
+
+import static org.fusesource.hawtdispatch.Dispatch.createQueue;
+
+/**
+ * Tracks active sessions so that we can ensure that a given
+ * session id is only associated with one connection
+ * at a time. If a client tries to establish a 2nd
+ * connection, the first one will be closed before the session
+ * is switched to the new connection.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class MqttSessionManager {
+
+ public static final Scala2Java.Logger log = MqttProtocolHandler.log;
+
+ static DispatchQueue queue = createQueue("session manager"); // attach/disconnect/remove all execute on this queue
+
+ interface StorageStrategy { // persistence hooks for a SessionState; cb is run once the operation is done
+ void update(Task cb);
+ void destroy(Task cb);
+ void create(Store store, UTF8Buffer client_id);
+ }
+
+ static class SessionState { // per-client subscriptions, received message ids and the active storage strategy
+ SubscriptionAddress durable_sub = null;
+ java.util.HashMap<UTF8Buffer, Tuple2<Topic, BindAddress>> subscriptions = new java.util.HashMap<UTF8Buffer, Tuple2<Topic, BindAddress>>();
+ HashSet<Short> received_message_ids = new HashSet<Short>();
+ StorageStrategy strategy = new NoopStrategy();
+
+ class NoopStrategy implements StorageStrategy { // no persistence: update/destroy just run the callback
+
+ @Override
+ public void create(Store store, UTF8Buffer client_id) {
+ if (store != null) {
+ strategy = new StoreStrategy(store, client_id); // switch this state to store-backed persistence
+ }
+ }
+
+ @Override
+ public void update(Task cb) {
+ cb.run();
+ }
+
+ @Override
+ public void destroy(Task cb) {
+ cb.run();
+ }
+ }
+
+ class StoreStrategy implements StorageStrategy { // persists the enclosing SessionState under the key "mqtt:" + client_id
+
+ public final Store store;
+ public final UTF8Buffer client_id;
+ public final UTF8Buffer session_key;
+
+ public StoreStrategy(Store store, UTF8Buffer client_id) {
+ this.store = store;
+ this.client_id = client_id;
+ this.session_key = new UTF8Buffer("mqtt:" + client_id);
+ }
+
+ @Override
+ public void create(Store store, UTF8Buffer client_id) { // no-op: this state is already store-backed
+ }
+
+ @Override
+ public void update(final Task cb) { // writes the received ids and subscriptions as a SessionPB, then runs cb
+ StoreUOW uow = store.create_uow();
+ SessionPB.Bean session_pb = new SessionPB.Bean();
+ session_pb.setClientId(client_id);
+ for (Short id : received_message_ids) {
+ session_pb.addReceivedMessageIds(id.intValue());
+ }
+ for (Tuple2<Topic, BindAddress> entry : subscriptions.values()) {
+ Topic topic = entry._1();
+ BindAddress address = entry._2();
+ TopicPB.Bean topic_pb = new TopicPB.Bean();
+ topic_pb.setName(topic.name());
+ topic_pb.setQos(topic.qos().ordinal()); // stored as the enum ordinal; on_load maps it back via QoS.values()
+ topic_pb.setAddress(new UTF8Buffer(address.toString()));
+ session_pb.addSubscriptions(topic_pb);
+ }
+ uow.put(session_key, session_pb.freeze().toUnframedBuffer());
+
+ final DispatchQueue current = Dispatch.getCurrentQueue(); // remember the caller's queue so cb is executed on it
+ uow.on_complete(Scala2Java.toScala(new UnitFn0() {
+ @Override
+ public void call() {
+ current.execute(new Task() {
+ @Override
+ public void run() {
+ cb.run();
+ }
+ });
+ }
+ }));
+ uow.release();
+ }
+
+ @Override
+ public void destroy(final Task cb) {
+ StoreUOW uow = store.create_uow();
+ uow.put(session_key, null); // presumably a null value removes the stored entry - TODO confirm against the Store API
+ final DispatchQueue current = Dispatch.getCurrentQueue();
+ uow.on_complete(Scala2Java.toScala(new UnitFn0() {
+ @Override
+ public void call() {
+ current.execute(new Task() {
+ @Override
+ public void run() {
+ strategy = new NoopStrategy(); // revert to the non-persistent strategy before notifying the caller
+ cb.run();
+ }
+ });
+ }
+ }));
+ uow.release();
+ }
+
+ }
+ }
+
+
+ static public class HostState { // per-VirtualHost registry of session states and currently assigned sessions
+
+ public final VirtualHost host;
+ public final HashMap<UTF8Buffer, SessionState> session_states = new HashMap<UTF8Buffer, SessionState>();
+ public final HashMap<UTF8Buffer, MqttSession> sessions = new HashMap<UTF8Buffer, MqttSession>();
+ public boolean loaded = false; // set once stored sessions were read, or when the host has no store
+
+ public HostState(VirtualHost host) {
+ this.host = host;
+ }
+
+ public void on_load(final Task func) { // runs func after the persisted sessions are loaded; loads them on first use
+ if (loaded) {
+ func.run();
+ } else {
+ if (host.store() != null) {
+ // We load all the persisted sessions from the host's store when we are first accessed.
+ queue.suspend(); // paired with the resume() in the store callback below
+ host.store().get_prefixed_map_entries(new AsciiBuffer("mqtt:"), Scala2Java.toScala(new UnitFn1<Seq<Tuple2<Buffer, Buffer>>>() {
+ @Override
+ public void call(final Seq<Tuple2<Buffer, Buffer>> entries) {
+ queue.resume();
+ queue.execute(new Task() {
+ @Override
+ public void run() {
+ for (Tuple2<Buffer, Buffer> entry : Scala2Java.toIterable(entries)) {
+ try {
+ Buffer value = entry._2();
+ SessionPB.Buffer session_pb = SessionPB.FACTORY.parseUnframed(value);
+ SessionState session_state = new SessionState();
+ session_state.strategy.create(host.store(), session_pb.getClientId());
+ if (session_pb.hasReceivedMessageIds()) {
+ for (Integer i : session_pb.getReceivedMessageIdsList()) {
+ session_state.received_message_ids.add(i.shortValue());
+ }
+ }
+ if (session_pb.hasSubscriptions()) {
+ for (TopicPB.Getter sub : session_pb.getSubscriptionsList()) {
+ SimpleAddress address = SimpleAddress.apply(sub.getAddress().toString());
+ Topic topic = new Topic(sub.getName(), QoS.values()[sub.getQos()]); // NOTE(review): an out-of-range stored qos throws here and is not caught by the catch below
+ session_state.subscriptions.put(sub.getName(), new Tuple2<Topic, BindAddress>(topic, address));
+
+ }
+ }
+ session_states.put(session_pb.getClientId(), session_state);
+ } catch (InvalidProtocolBufferException e) {
+ log.warn(e, "Could not load a stored MQTT session"); // skip the unreadable entry and keep loading the rest
+ }
+ }
+ loaded = true;
+ func.run();
+ }
+ });
+ }
+ }));
+
+ } else {
+ loaded = true;
+ func.run();
+ }
+ }
+ }
+ }
+
+ static public void attach(final VirtualHost host, final UTF8Buffer client_id, final MqttProtocolHandler handler) { // connects handler to the session for client_id, creating the session if none is assigned
+ queue.execute(new Task() {
+ @Override
+ public void run() {
+ final HostState host_state = host.plugin_state(
+ Scala2Java.toScala(new Fn0<HostState>(){
+ @Override
+ public HostState apply() {
+ return new HostState(host);
+ }
+ }),
+ HostState.class);
+ host_state.on_load(new Task() {
+ @Override
+ public void run() {
+ MqttSession assignment = host_state.sessions.get(client_id);
+ if (assignment != null) {
+ assignment.connect(handler); // an existing session is handed the new handler
+ } else {
+ SessionState state;
+ if (handler.connect_message.cleanSession()) { // clean session: the state is taken out of session_states
+ state = host_state.session_states.remove(client_id);
+ if (state == null) {
+ state = new SessionState();
+ }
+ } else { // otherwise the state is reused, or created and remembered
+ state = host_state.session_states.get(client_id);
+ if (state == null) {
+ state = new SessionState();
+ host_state.session_states.put(client_id, state);
+ }
+ }
+ assignment = new MqttSession(host_state, client_id, state);
+ assignment.connect(handler);
+ host_state.sessions.put(client_id, assignment);
+ }
+ }
+ });
+ }
+ });
+ }
+
+ static public void disconnect(final HostState host_state, final UTF8Buffer client_id, final MqttProtocolHandler handler) { // asks the assigned session, if any, to disconnect handler
+ queue.execute(new Task() {
+ @Override
+ public void run() {
+ MqttSession assignment = host_state.sessions.get(client_id);
+ if (assignment != null) {
+ assignment.disconnect(handler);
+ }
+ }
+ });
+ }
+
+ static public void remove(final HostState host_state, final UTF8Buffer client_id) { // drops the session assignment for client_id; session_states is left untouched
+ queue.execute(new Task() {
+ @Override
+ public void run() {
+ host_state.sessions.remove(client_id);
+ }
+ });
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/Request.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/Request.java?rev=1461905&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/Request.java (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/Request.java Wed Mar 27 23:58:58 2013
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.mqtt;
+
+import org.apache.activemq.apollo.broker.DeliveryResult;
+import org.apache.activemq.apollo.util.UnitFn1;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.codec.MessageSupport;
+
+/** Carries an MQTT message together with its id, its encoded frame and an optional ack callback.
+ */
+public class Request {
+
+ public final short id;
+ public final MessageSupport.Message message;
+ public final UnitFn1<DeliveryResult> ack; // may be null (MqttSession.send passes null); users null-check before applying
+
+ MQTTFrame frame; // encoded form of message, or null when message is null
+ boolean delivered; // MqttSession reads this to pick Delivered vs Undelivered when acking in-flight requests
+
+ public Request(short id, MessageSupport.Message message, UnitFn1<DeliveryResult> ack) {
+ this.id = id;
+ this.message = message;
+ this.ack = ack;
+ frame = message==null ? null : message.encode(); // encode eagerly at construction, tolerating a null message
+ }
+}
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/SessionDeliverySizer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/SessionDeliverySizer.java?rev=1461905&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/SessionDeliverySizer.java (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/SessionDeliverySizer.java Wed Mar 27 23:58:58 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.mqtt;
+
+import org.apache.activemq.apollo.broker.Delivery;
+import org.apache.activemq.apollo.broker.Delivery$;
+import org.apache.activemq.apollo.broker.Session;
+import org.apache.activemq.apollo.broker.Sizer;
+import scala.Tuple2;
+
+/** Sizer for (session, delivery) pairs: the reported size is that of the delivery element alone; the session element is ignored.
+ */
+class SessionDeliverySizer implements Sizer<Tuple2<Session<Delivery>, Delivery>> {
+
+ public static final SessionDeliverySizer INSTANCE = new SessionDeliverySizer(); // shared instance; the class declares no instance fields
+
+ @Override
+ public int size(Tuple2<Session<Delivery>, Delivery> value) {
+ return Delivery$.MODULE$.size(value._2()); // delegate to the Scala Delivery companion object, passing only the tuple's second element
+ }
+}
Copied: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2Java.java (from r1461839, activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2Java.java?p2=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2Java.java&p1=activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java&r1=1461839&r2=1461905&rev=1461905&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2Java.java Wed Mar 27 23:58:58 2013
@@ -14,15 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.mqtt;
+package org.apache.activemq.apollo.util;
-import org.apache.activemq.apollo.util.Log;
-import scala.Function1;
-import scala.Function2;
-import scala.Option;
+import org.apache.activemq.apollo.util.*;
+import org.fusesource.hawtbuf.Buffer;
+import scala.*;
+import scala.collection.JavaConversions$;
+import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.runtime.BoxedUnit;
+import java.lang.Boolean;
+import java.lang.Long;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@@ -30,8 +33,31 @@ import java.util.Iterator;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class ScalaSupport {
- private static ScalaSupportHelper$ helper = ScalaSupportHelper$.MODULE$;
+public class Scala2Java {
+ private static Scala2JavaHelper$ helper = Scala2JavaHelper$.MODULE$;
+
+
+ public static int get(Integer value, int defaultValue) { // unboxes value, substituting defaultValue when value is null
+ if( value!=null ) {
+ return value.intValue();
+ } else {
+ return defaultValue; // null input: fall back instead of throwing on unboxing
+ }
+ }
+ public static long get(Long value, long defaultValue) { // unboxes value, substituting defaultValue when value is null
+ if( value!=null ) {
+ return value.longValue(); // Long is java.lang.Long here: the file imports it explicitly alongside scala.*
+ } else {
+ return defaultValue; // null input: fall back instead of throwing on unboxing
+ }
+ }
+ public static boolean get(Boolean value, boolean defaultValue) { // unboxes value, substituting defaultValue when value is null
+ if( value!=null ) {
+ return value.booleanValue(); // Boolean is java.lang.Boolean here: the file imports it explicitly alongside scala.*
+ } else {
+ return defaultValue; // null input: fall back instead of throwing on unboxing
+ }
+ }
static final Function1<Object,BoxedUnit> NOOP_FN1 = helper.toScala(new UnitFn1<Object>() {
@Override
@@ -39,10 +65,20 @@ public class ScalaSupport {
}
});
+ public static String toString(Object o) { // null-safe toString: a null argument yields null, not the string "null"
+ return o == null ? null : o.toString();
+ }
+
public static <T1> Function1<T1, BoxedUnit> noopFn1() {
return (Function1<T1, BoxedUnit>) NOOP_FN1;
}
+ public static <R> Function0<R> toScala(Fn0<R> func) { // adapts an Fn0 to a scala.Function0 through the Scala helper object
+ if( func == null ) {
+ return null; // null in, null out: the helper is never called with a null function
+ }
+ return helper.toScala(func);
+ }
public static <T1, R> Function1<T1, R> toScala(Fn1<T1, R> func) {
if( func == null ) {
@@ -98,6 +134,9 @@ public class ScalaSupport {
return helper.toList(s);
}
+ public static <T> Iterable<T> toIterable(Seq<T> entries) { // exposes a Scala Seq as a java.lang.Iterable
+ return JavaConversions$.MODULE$.asJavaIterable(entries); // no null check here: entries is handed straight to JavaConversions
+ }
public static class Logger {
final Log log;
Copied: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2JavaHelper.scala (from r1461839, activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2JavaHelper.scala?p2=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2JavaHelper.scala&p1=activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala&r1=1461839&r2=1461905&rev=1461905&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/ScalaSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Scala2JavaHelper.scala Wed Mar 27 23:58:58 2013
@@ -14,18 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.mqtt
+package org.apache.activemq.apollo.util
-import scala.Function1
import org.apache.activemq.apollo.util.Log
+import scala.Function1
import scala.runtime.BoxedUnit
+abstract class Fn0[+R] { // zero-argument function type; Scala2JavaHelper.toScala wraps it in a scala.Function0
+ def apply(): R // produces the result; implemented by subclasses
+}
+
+class UnitFn0 extends Fn0[BoxedUnit] { // Fn0 variant for side-effect-only callbacks: subclasses put their work in call()
+ def call() = {} // default is a no-op. NOTE(review): UnitFn1.call in this file is abstract; confirm the concrete default is intended here
+ def apply() = {
+ call() // run the side effect first
+ BoxedUnit.UNIT; // then return the unit value so the result type matches Fn0[BoxedUnit]
+ }
+}
+
abstract class Fn1[-T1,+R] {
def apply(v1: T1): R
}
-class UnitFn1[-T1] extends Fn1[T1, BoxedUnit] {
- def call(v1: T1) = {}
+abstract class UnitFn1[-T1] extends Fn1[T1, BoxedUnit] {
+ def call(v1: T1)
def apply(v1: T1) = {
call(v1)
BoxedUnit.UNIT;
@@ -51,7 +63,8 @@ class UnitFn2[-T1,-T2] extends Fn2[T1,T2
* Time: 3:27 PM
* To change this template use File | Settings | File Templates.
*/
-object ScalaSupportHelper {
+object Scala2JavaHelper {
+ def toScala[R](func:Fn0[R]):Function0[R] = () => { func.apply() }
def toScala[T1,R](func:Fn1[T1,R]):Function1[T1,R] = (v1:T1) => { func.apply(v1) }
def toScala[T1,T2,R](func:Fn2[T1,T2,R]):Function2[T1,T2,R] = (v1:T1, v2:T2) => { func.apply(v1,v2) }
def none[T]:Option[T] = None