You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Curt Jutzi (JIRA)" <ji...@apache.org> on 2014/10/10 23:56:35 UTC
[jira] [Comment Edited] (AMQ-5387) NIO Fails (for MQTT at least) on
back-to-back Transport frames
[ https://issues.apache.org/jira/browse/AMQ-5387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14167526#comment-14167526 ]
Curt Jutzi edited comment on AMQ-5387 at 10/10/14 9:55 PM:
-----------------------------------------------------------
Reseting of the readSize was incorrect. It was using the payLoadRead which could span buffers.. In the case it did.. it would result in a bad state in the codec object, and the following frame would be bad (mis aligned)
was (Author: cjutzi):
reseting of the readSize was incorrect. It was using the payLoadRead which could span buffers.. In the case it did.. it would result in a bad state in the code, and the following frame would be bad (mis aligned)
> NIO Fails (for MQTT at least) on back-to-back Transport frames
> --------------------------------------------------------------
>
> Key: AMQ-5387
> URL: https://issues.apache.org/jira/browse/AMQ-5387
> Project: ActiveMQ
> Issue Type: Bug
> Affects Versions: 5.11.0
> Environment: Paho as client
> Reporter: Curt Jutzi
> Fix For: NEEDS_REVIEW
>
> Attachments: MQTTCode.java.patch
>
> Original Estimate: 0h
> Remaining Estimate: 0h
>
> {noformat}
> **
> * Licensed to the Apache Software Foundation (ASF) under one or more
> * contributor license agreements. See the NOTICE file distributed with
> * this work for additional information regarding copyright ownership.
> * The ASF licenses this file to You under the Apache License, Version 2.0
> * (the "License"); you may not use this file except in compliance with
> * the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> package org.apache.activemq.transport.mqtt;
> import static org.junit.Assert.*;
> import java.io.BufferedReader;
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.InputStreamReader;
> import java.io.OutputStream;
> import java.util.ArrayList;
> import java.util.concurrent.atomic.AtomicInteger;
> import org.apache.commons.net.util.Base64;
> import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
> import org.eclipse.paho.client.mqttv3.MqttCallback;
> import org.eclipse.paho.client.mqttv3.MqttClient;
> import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
> import org.eclipse.paho.client.mqttv3.MqttException;
> import org.eclipse.paho.client.mqttv3.MqttMessage;
> import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
> import org.eclipse.paho.client.mqttv3.MqttSecurityException;
> import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
> import org.junit.Test;
> /**
> * Test the NIO transport with this Test group
> */
> public class PahoMQTTNIOTest extends PahoMQTTTest implements MqttCallback {
> AtomicInteger m_receiveCounter = new AtomicInteger();
> String BigMessage = "................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................;........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................";;
> static ArrayList<MqttClient> mqttClients = null;
> static final Integer staticSyncObj = new Integer(1);
> String messagePayload = null;
>
> public final int numberOfThreads = 500;
> static ArrayList<pubThreadBitMsg> arrThreads = new ArrayList<pubThreadBitMsg>();
>
> boolean f_messageReceived = false;
> boolean f_ackReceived = false;
> boolean f_lost = false;
> /**
> *
> *
> * @param client
> * @param location
> * @param accountId
> * @param userId
> * @param clientId
> * @param token
> * @param nameSpace
> * @param message
> * @param qos
> * @param f_retained
> * @param f_keepOpen
> * @return
> * @throws MqttException
> */
> private MqttClient pubNameSpace(MqttClient client,
> String location,
> String accountId,
> String userId,
> String clientId,
> String token,
> String nameSpace,
> String message,
> int qos,
> boolean f_retained,
> boolean f_keepOpen) throws MqttException
> {
> try
> {
> boolean f_wasConnected = true;
>
> if (client == null)
> {
> f_wasConnected = false;
> client = new MqttClient(location, clientId/*, persistence*/);
> }
>
> if (!f_wasConnected)
> {
> MqttConnectOptions options = new MqttConnectOptions();
> options.setKeepAliveInterval(60);
> options.setConnectionTimeout(120);
> options.setPassword(token.toCharArray());
> options.setUserName(accountId+":"+userId);
> client.connect(options);
> client.setCallback(this);
> }
>
> MqttMessage mqttMessage = new MqttMessage();
> mqttMessage.setPayload(message.getBytes());
> mqttMessage.setQos(qos);
> mqttMessage.setRetained(f_retained);
>
>
> client.publish(nameSpace, mqttMessage);
>
> if (!f_keepOpen)
> {
> client.disconnect();
> client.close();
> client = null;
> }
> return client;
> }
> catch (MqttPersistenceException e)
> {
> System.err.println("pubNameSpace : YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT WAY Excpetion - "+e.getMessage());
> if (client != null)
> {
> client.disconnect();
> client.close();
> client = null;
> }
> }
> catch (Exception e)
> {
> e.printStackTrace();
> System.err.println("Exception e = "+e.getMessage());
> if (client != null)
> {
> client.disconnect();
> client.close();
> client = null;
> }
> }
> return null;
> }
> /**
> *
> * @param location
> * @param accountId
> * @param userId
> * @param clientId
> * @param token
> * @param nameSpace
> * @param message
> * @param qos
> * @param f_retained
> * @param f_keepOpen
> * @return
> * @throws MqttException
> */
> private MqttClient pubNameSpace(String location,
> String accountId,
> String userId,
> String clientId,
> String token,
> String nameSpace,
> String message,
> int qos,
> boolean f_retained,
> boolean f_keepOpen) throws MqttException
> {
> try
> {
> MqttClient client = new MqttClient(location, clientId/*, persistence*/);
> client.setCallback(this);
> MqttConnectOptions options = new MqttConnectOptions();
> options.setKeepAliveInterval(60);
> options.setConnectionTimeout(120);
> options.setPassword(token.toCharArray());
> options.setUserName(accountId+":"+userId);
>
> MqttMessage mqttMessage = new MqttMessage();
> mqttMessage.setPayload(message.getBytes());
> mqttMessage.setQos(qos);
> mqttMessage.setRetained(f_retained);
>
> client.connect(options);
> client.publish(nameSpace, mqttMessage);
>
> if (!f_keepOpen)
> {
> client.disconnect();
> client.close();
> }
> return client;
> }
> catch (MqttPersistenceException e)
> {
> System.err.println("YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT WAY : "+e.getMessage());
> assertTrue(false);
> }
> catch (Exception e)
> {
> e.printStackTrace();
> System.err.println("Exceptoin e = "+e.getMessage());
> assertTrue(false);
> }
> return null;
> }
>
>
> /**
> *
> * @param input
> * @param output
> * @return
> * @throws IOException
> */
> public static long copyLarge(InputStream input, OutputStream output) throws IOException
> {
> byte[] buffer = new byte[4096];
> long count = 0L;
> int n = 0;
> while (-1 != (n = input.read(buffer))) {
> output.write(buffer, 0, n);
> count += n;
> }
> return count;
> }
>
> /**
> *
> * @param p
> * @throws IOException
> */
> public static void outputResults (Process p) throws IOException
> {
> BufferedReader stdInput = new BufferedReader(new
> InputStreamReader(p.getInputStream()));
> BufferedReader stdError = new BufferedReader(new
> InputStreamReader(p.getErrorStream()));
> String s;
> while ((s = stdInput.readLine()) != null)
> {
> System.out.println(s);
> }
> while ((s = stdError.readLine()) != null)
> {
> System.out.println(s);
> }
> }
>
> /**
> * HELPER
> * @param msec
> * @throws InterruptedException
> */
> void pause_til_done_or_time(int msec) throws InterruptedException
> {
> int pauseTime = 100;
> while (!f_messageReceived && msec > 0 && !f_lost)
> {
> Thread.sleep(pauseTime);
> msec -= pauseTime;
> }
>
> }
> static Integer numberOfMessages = 0;
> public void clearMessageCount()
> {
> numberOfMessages = 0;
> }
> public Integer getMessageCount()
> {
> return numberOfMessages;
> }
> /**
> *
> * @param msec
> */
> private void waitForItAck(int msec)
> {
> while (!f_ackReceived)
> {
>
> try { Thread.sleep(1000); } catch (Exception e){}
>
> msec= msec-1000;
>
> if (msec < 0)
> {
> break;
> }
> }
> }
>
> @Override
> public String getProtocolScheme() {
> return "mqtt+nio";
> }
> @Override
> public boolean isUseSSL() {
> return false;
> }
> public class PahoCallback implements MqttCallback {
> @Override
> public void connectionLost(Throwable cause) {
> // TODO Auto-generated method stub
>
> }
> @Override
> public void messageArrived(String topic, MqttMessage message)
> throws Exception {
> m_receiveCounter.incrementAndGet();
> }
> @Override
> public void deliveryComplete(IMqttDeliveryToken token) {
> // TODO Auto-generated method stub
>
> }
>
> }
>
> static MqttClient BalstTestClient = null;
> String loc = "tcp://localhost:1883";
> public class pubThreadBitMsg extends Thread {
>
> public pubThreadBitMsg()
> {
> synchronized (staticSyncObj)
> {
> try
> {
> System.out.println("---- pubTheadBitMsg - constructor");
> if (BalstTestClient == null)
> {
> BalstTestClient = pubNameSpace(loc, "cjutzi",
> "curt",
> "myclientid_cjutzi_pub",
> "curts_client_pub_token",
> "/accounts/cjutzi/users/curt/test",
> "Starting Client", 1, false, true);
> System.out.println("---- pubTheadBitMsg - init");
> }
> }
> catch (MqttException e)
> {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> }
> }
> public void run()
> {
> // synchronized (staticSyncObj)
> {
> try
> {
> System.out.println("---- pubTheadBitMsg - send");
> pubNameSpace(BalstTestClient, loc, "cjutzi",
> "curt",
> "myclientid_cjutzi_pub",
> "curts_client_pub_token",
> "/accounts/cjutzi/users/curt/test",
> BigMessage, 1, false, true);
> } catch (MqttException e)
> {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> }
> }
> }
>
> @Test
> public void test_AckOnOldListenerQos1Blast100B10KBlocks() throws MqttException, InterruptedException
> {
> MqttClient subClient = new MqttClient("tcp://localhost:1883",
> "niosubclient",
> new MemoryPersistence());
> MqttConnectOptions cOpts = new MqttConnectOptions();
> cOpts.setCleanSession(true);
> cOpts.setUserName("system:system");
> cOpts.setPassword("system_token".toCharArray());
>
>
> subClient.setCallback(new PahoCallback());
> subClient.connect(cOpts);
> subClient.subscribe("nio/test");
>
>
> for (int i = 0; i < numberOfThreads; i++)
> {
> arrThreads.add(new pubThreadBitMsg());
> }
> System.out.println("--started");
> for (int i = 0; i < numberOfThreads; i++)
> {
> arrThreads.get(i).start();
> }
> System.out.println("-- waiting");
> Thread.sleep(10000);
> assertTrue(numberOfThreads == m_receiveCounter.get());
> }
>
>
> /**
> *
> */
> private void resetFlag()
> {
> f_messageReceived = false;
> f_ackReceived = false;
> }
> /***************************************************************/
> /** CALL BACKS FOR MQTT */
> /***************************************************************/
>
>
>
> /**
> *
> */
> public void connectionLost(Throwable arg0)
> {
> System.out.println("MQTT - Connection Lost");
> // f_terminate = true;
> f_lost = true;
> }
> /**
> *
> */
> public void deliveryComplete(IMqttDeliveryToken arg0)
> {
> System.out.println("MQTT - delivery complete: Delivery Tokeh = "+arg0.isComplete());
> f_ackReceived = true;
> }
>
> /**
> *
> */
> public void messageArrived(String arg0, MqttMessage arg1) throws Exception
> {
> synchronized (numberOfMessages)
> {
> numberOfMessages++;
> System.out.println("MQTT - messageArrived "+arg0+"\nMessage: \t["+arg1+"] QoS: ["+arg1.getQos()+"] isDup ["+arg1.isDuplicate()+"] nameSpace = ");
> // System.out.print(arg1.isDuplicate()?"*":".");
> byte[] payloadBytes = arg1.getPayload();
> if (payloadBytes.length >0 )
> {
> messagePayload = new String(payloadBytes);
> }
> System.out.println("Message Recieved...");
> f_messageReceived = true;
> }
> }
>
> }
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)