You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/19 23:41:39 UTC
svn commit: r1400304 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt:
MQTTInactivityMonitor.java MQTTProtocolConverter.java MQTTWireFormat.java
Author: tabish
Date: Fri Oct 19 21:41:39 2012
New Revision: 1400304
URL: http://svn.apache.org/viewvc?rev=1400304&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4117
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java?rev=1400304&r1=1400303&r2=1400304&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java Fri Oct 19 21:41:39 2012
@@ -25,7 +25,7 @@ import java.util.concurrent.ThreadPoolEx
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.thread.SchedulerTimerTask;
@@ -42,22 +42,19 @@ public class MQTTInactivityMonitor exten
private static final Logger LOG = LoggerFactory.getLogger(MQTTInactivityMonitor.class);
+ private static final long DEFAULT_CHECK_TIME_MILLS = 30000;
+
private static ThreadPoolExecutor ASYNC_TASKS;
private static int CHECKER_COUNTER;
- private static long DEFAULT_CHECK_TIME_MILLS = 30000;
private static Timer READ_CHECK_TIMER;
private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
-
- private final AtomicBoolean commandSent = new AtomicBoolean(false);
- private final AtomicBoolean inSend = new AtomicBoolean(false);
private final AtomicBoolean failed = new AtomicBoolean(false);
-
private final AtomicBoolean commandReceived = new AtomicBoolean(true);
private final AtomicBoolean inReceive = new AtomicBoolean(false);
private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
- private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
+ private final ReentrantLock sendLock = new ReentrantLock();
private SchedulerTimerTask readCheckerTask;
private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
@@ -65,7 +62,6 @@ public class MQTTInactivityMonitor exten
private boolean keepAliveResponseRequired;
private MQTTProtocolConverter protocolConverter;
-
private final Runnable readChecker = new Runnable() {
long lastRunTime;
@@ -96,7 +92,6 @@ public class MQTTInactivityMonitor exten
return elapsed > (readCheckTime * 9 / 10);
}
-
public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) {
super(next);
}
@@ -111,7 +106,6 @@ public class MQTTInactivityMonitor exten
next.stop();
}
-
final void readCheck() {
int currentCounter = next.getReceiveCounter();
int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
@@ -132,8 +126,6 @@ public class MQTTInactivityMonitor exten
}
onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
}
-
- ;
});
} else {
if (LOG.isTraceEnabled()) {
@@ -143,7 +135,6 @@ public class MQTTInactivityMonitor exten
commandReceived.set(false);
}
-
public void onCommand(Object command) {
commandReceived.set(true);
inReceive.set(true);
@@ -151,14 +142,14 @@ public class MQTTInactivityMonitor exten
if (command.getClass() == KeepAliveInfo.class) {
KeepAliveInfo info = (KeepAliveInfo) command;
if (info.isResponseRequired()) {
- sendLock.readLock().lock();
+ sendLock.lock();
try {
info.setResponseRequired(false);
oneway(info);
} catch (IOException e) {
onException(e);
} finally {
- sendLock.readLock().unlock();
+ sendLock.unlock();
}
}
} else {
@@ -171,17 +162,12 @@ public class MQTTInactivityMonitor exten
public void oneway(Object o) throws IOException {
// To prevent the inactivity monitor from sending a message while we
- // are performing a send we take a read lock. The inactivity monitor
- // sends its Heart-beat commands under a write lock. This means that
- // the MutexTransport is still responsible for synchronizing sends
- this.sendLock.readLock().lock();
- inSend.set(true);
+ // are performing a send we take the lock.
+ this.sendLock.lock();
try {
doOnewaySend(o);
} finally {
- commandSent.set(true);
- inSend.set(false);
- this.sendLock.readLock().unlock();
+ this.sendLock.unlock();
}
}
@@ -200,7 +186,6 @@ public class MQTTInactivityMonitor exten
}
}
-
public long getReadCheckTime() {
return readCheckTime;
}
@@ -209,7 +194,6 @@ public class MQTTInactivityMonitor exten
this.readCheckTime = readCheckTime;
}
-
public long getInitialDelayTime() {
return initialDelayTime;
}
@@ -239,16 +223,20 @@ public class MQTTInactivityMonitor exten
}
synchronized void startMonitorThread() {
- if (monitorStarted.get()) {
+
+ // Not yet configured if this isn't set yet.
+ if (protocolConverter == null) {
return;
}
+ if (monitorStarted.get()) {
+ return;
+ }
if (readCheckTime > 0) {
readCheckerTask = new SchedulerTimerTask(readChecker);
}
-
if (readCheckTime > 0) {
monitorStarted.set(true);
synchronized (AbstractInactivityMonitor.class) {
@@ -264,7 +252,6 @@ public class MQTTInactivityMonitor exten
}
}
-
synchronized void stopMonitorThread() {
if (monitorStarted.compareAndSet(true, false)) {
if (readCheckerTask != null) {
@@ -293,9 +280,8 @@ public class MQTTInactivityMonitor exten
};
private ThreadPoolExecutor createExecutor() {
- ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
+ ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
exec.allowCoreThreadTimeOut(true);
return exec;
}
}
-
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java?rev=1400304&r1=1400303&r2=1400304&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java Fri Oct 19 21:41:39 2012
@@ -26,8 +26,30 @@ import java.util.zip.Inflater;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
+
import org.apache.activemq.broker.BrokerContext;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
@@ -38,7 +60,21 @@ import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
-import org.fusesource.mqtt.codec.*;
+import org.fusesource.mqtt.codec.CONNACK;
+import org.fusesource.mqtt.codec.CONNECT;
+import org.fusesource.mqtt.codec.DISCONNECT;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.codec.PINGREQ;
+import org.fusesource.mqtt.codec.PINGRESP;
+import org.fusesource.mqtt.codec.PUBACK;
+import org.fusesource.mqtt.codec.PUBCOMP;
+import org.fusesource.mqtt.codec.PUBLISH;
+import org.fusesource.mqtt.codec.PUBREC;
+import org.fusesource.mqtt.codec.PUBREL;
+import org.fusesource.mqtt.codec.SUBACK;
+import org.fusesource.mqtt.codec.SUBSCRIBE;
+import org.fusesource.mqtt.codec.UNSUBACK;
+import org.fusesource.mqtt.codec.UNSUBSCRIBE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +85,6 @@ class MQTTProtocolConverter {
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
-
private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
private final SessionId sessionId = new SessionId(connectionId, -1);
private final ProducerId producerId = new ProducerId(sessionId, 1);
@@ -83,7 +118,6 @@ class MQTTProtocolConverter {
}
}
-
void sendToActiveMQ(Command command, ResponseHandler handler) {
command.setCommandId(generateCommandId());
if (handler != null) {
@@ -101,13 +135,11 @@ class MQTTProtocolConverter {
}
}
-
/**
* Convert a MQTT command
*/
public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException {
-
switch (frame.messageType()) {
case PINGREQ.TYPE: {
mqttTransport.sendToMQTT(PING_RESP_FRAME);
@@ -152,13 +184,12 @@ class MQTTProtocolConverter {
onMQTTPubComp(new PUBCOMP().decode(frame));
break;
}
- default:
+ default: {
handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame);
+ }
}
-
}
-
void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException {
if (connected.get()) {
@@ -182,7 +213,6 @@ class MQTTProtocolConverter {
configureInactivityMonitor(connect.keepAlive());
-
connectionInfo.setConnectionId(connectionId);
if (clientId != null && !clientId.isEmpty()) {
connectionInfo.setClientId(clientId);
@@ -232,14 +262,12 @@ class MQTTProtocolConverter {
}
});
-
}
});
}
void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException {
checkConnected();
- SUBACK result = new SUBACK();
Topic[] topics = command.topics();
if (topics != null) {
byte[] qos = new byte[topics.length];
@@ -261,9 +289,6 @@ class MQTTProtocolConverter {
QoS onSubscribe(SUBSCRIBE command, Topic topic) throws MQTTProtocolException {
ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
- if (destination == null) {
- throw new MQTTProtocolException("Invalid Destination.");
- }
ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
ConsumerInfo consumerInfo = new ConsumerInfo(id);
@@ -277,7 +302,6 @@ class MQTTProtocolConverter {
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
-
subscriptionsByConsumerId.put(id, mqttSubscription);
mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
@@ -295,7 +319,6 @@ class MQTTProtocolConverter {
UNSUBACK ack = new UNSUBACK();
ack.messageId(command.messageId());
sendToMQTT(ack.encode());
-
}
void onUnSubscribe(UTF8Buffer topicName) {
@@ -310,12 +333,9 @@ class MQTTProtocolConverter {
}
}
-
/**
* Dispatch a ActiveMQ command
*/
-
-
public void onActiveMQCommand(Command command) throws Exception {
if (command.isResponse()) {
Response response = (Response) command;
@@ -406,7 +426,6 @@ class MQTTProtocolConverter {
}
}
-
ActiveMQMessage convertMessage(PUBLISH command) throws JMSException {
ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
@@ -456,43 +475,37 @@ class MQTTProtocolConverter {
}
result.topicName(topicName);
-
if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
- ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
- msg.setReadOnlyBody(true);
- String messageText = msg.getText();
- if (messageText != null) {
- result.payload(new Buffer(messageText.getBytes("UTF-8")));
- }
-
-
+ ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
+ msg.setReadOnlyBody(true);
+ String messageText = msg.getText();
+ if (messageText != null) {
+ result.payload(new Buffer(messageText.getBytes("UTF-8")));
+ }
} else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
-
ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy();
msg.setReadOnlyBody(true);
byte[] data = new byte[(int) msg.getBodyLength()];
msg.readBytes(data);
result.payload(new Buffer(data));
- } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE){
+ } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
msg.setReadOnlyBody(true);
- Map map = msg.getContentMap();
- if (map != null){
+ Map<String, Object> map = msg.getContentMap();
+ if (map != null) {
result.payload(new Buffer(map.toString().getBytes("UTF-8")));
}
- }
-
- else {
+ } else {
ByteSequence byteSequence = message.getContent();
if (byteSequence != null && byteSequence.getLength() > 0) {
- if (message.isCompressed()){
+ if (message.isCompressed()) {
Inflater inflater = new Inflater();
- inflater.setInput(byteSequence.data,byteSequence.offset,byteSequence.length);
- byte[] data = new byte[4096];
+ inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
+ byte[] data = new byte[4096];
int read;
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
- while((read = inflater.inflate(data)) != 0){
- bytesOut.write(data,0,read);
+ while ((read = inflater.inflate(data)) != 0) {
+ bytesOut.write(data, 0, read);
}
byteSequence = bytesOut.toByteSequence();
}
@@ -502,7 +515,6 @@ class MQTTProtocolConverter {
return result;
}
-
public MQTTTransport getMQTTTransport() {
return mqttTransport;
}
@@ -522,22 +534,18 @@ class MQTTProtocolConverter {
} catch (Exception e) {
LOG.warn("Failed to publish Will Message " + connect.willMessage());
}
-
}
}
}
-
void configureInactivityMonitor(short heartBeat) {
try {
-
int heartBeatMS = heartBeat * 1000;
MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor();
monitor.setProtocolConverter(this);
monitor.setReadCheckTime(heartBeatMS);
monitor.setInitialDelayTime(heartBeatMS);
monitor.startMonitorThread();
-
} catch (Exception ex) {
LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
}
@@ -545,7 +553,6 @@ class MQTTProtocolConverter {
LOG.debug(getClientId() + " MQTT Connection using heart beat of " + heartBeat + " secs");
}
-
void handleException(Throwable exception, MQTTFrame command) {
LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
if (LOG.isDebugEnabled()) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java?rev=1400304&r1=1400303&r2=1400304&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mqtt/MQTTWireFormat.java Fri Oct 19 21:41:39 2012
@@ -35,10 +35,8 @@ import org.fusesource.mqtt.codec.MQTTFra
*/
public class MQTTWireFormat implements WireFormat {
-
static final int MAX_MESSAGE_LENGTH = 1024 * 1024 * 256;
- private boolean encodingEnabled = false;
private int version = 1;
public ByteSequence marshal(Object command) throws IOException {
@@ -119,6 +117,4 @@ public class MQTTWireFormat implements W
public int getVersion() {
return this.version;
}
-
-
}