You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Timothy Bish (JIRA)" <ji...@apache.org> on 2014/10/11 00:18:33 UTC

[jira] [Commented] (AMQ-5387) MQTT Codec - buffer mis-alignment on NIO when Back-2-Back packets are received

    [ https://issues.apache.org/jira/browse/AMQ-5387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14167653#comment-14167653 ] 

Timothy Bish commented on AMQ-5387:
-----------------------------------

It's simpler if you just attach the test case as a file instead of a large comment block.  

> MQTT Codec - buffer mis-alignment on NIO when Back-2-Back packets are received
> ------------------------------------------------------------------------------
>
>                 Key: AMQ-5387
>                 URL: https://issues.apache.org/jira/browse/AMQ-5387
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: MQTT
>    Affects Versions: 5.11.0
>         Environment: Paho as client 
>            Reporter: Curt Jutzi
>             Fix For: NEEDS_REVIEW
>
>         Attachments: MQTTCode.java.patch
>
>   Original Estimate: 0h
>  Remaining Estimate: 0h
>
> {noformat}
> **
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *      http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> package org.apache.activemq.transport.mqtt;
> import static org.junit.Assert.*;
> import java.io.BufferedReader;
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.InputStreamReader;
> import java.io.OutputStream;
> import java.util.ArrayList;
> import java.util.concurrent.atomic.AtomicInteger;
> import org.apache.commons.net.util.Base64;
> import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
> import org.eclipse.paho.client.mqttv3.MqttCallback;
> import org.eclipse.paho.client.mqttv3.MqttClient;
> import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
> import org.eclipse.paho.client.mqttv3.MqttException;
> import org.eclipse.paho.client.mqttv3.MqttMessage;
> import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
> import org.eclipse.paho.client.mqttv3.MqttSecurityException;
> import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
> import org.junit.Test;
> /**
>  * Test the NIO transport with this Test group
>  */
> public class PahoMQTTNIOTest extends PahoMQTTTest  implements MqttCallback {
>     AtomicInteger m_receiveCounter = new AtomicInteger();
>     String BigMessage = "................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................;........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................";;
>     static ArrayList<MqttClient> mqttClients = null;
>     static final Integer staticSyncObj = new Integer(1); 
>     String messagePayload = null;
>     
>     public final int numberOfThreads = 500;
>     static ArrayList<pubThreadBitMsg> arrThreads = new ArrayList<pubThreadBitMsg>();
>     
>     boolean f_messageReceived = false;
>     boolean f_ackReceived = false;
>     boolean f_lost = false;
>     /**
>      * 
>      * 
>      * @param client
>      * @param location
>      * @param accountId
>      * @param userId
>      * @param clientId
>      * @param token
>      * @param nameSpace
>      * @param message
>      * @param qos
>      * @param f_retained
>      * @param f_keepOpen
>      * @return
>      * @throws MqttException
>      */
>     private MqttClient pubNameSpace(MqttClient client,  
>                                     String    location, 
>                                     String    accountId, 
>                                     String    userId, 
>                                     String    clientId,
>                                     String    token, 
>                                     String    nameSpace, 
>                                     String    message, 
>                                     int       qos, 
>                                     boolean   f_retained,  
>                                     boolean   f_keepOpen) throws MqttException
>     {
>         try
>         {        
>             boolean f_wasConnected = true; 
>             
>             if (client == null)
>             {
>                 f_wasConnected = false; 
>                 client = new MqttClient(location, clientId/*, persistence*/);
>             }
>             
>             if (!f_wasConnected)
>             {
>                 MqttConnectOptions options = new MqttConnectOptions();
>                 options.setKeepAliveInterval(60);
>                 options.setConnectionTimeout(120);
>                 options.setPassword(token.toCharArray());
>                 options.setUserName(accountId+":"+userId);
>                 client.connect(options);
>                 client.setCallback(this);
>             }
>     
>             MqttMessage mqttMessage = new MqttMessage();
>             mqttMessage.setPayload(message.getBytes());
>             mqttMessage.setQos(qos);
>             mqttMessage.setRetained(f_retained);
>     
>             
>             client.publish(nameSpace, mqttMessage);
>             
>             if (!f_keepOpen)
>             {
>                 client.disconnect();
>                 client.close();
>                 client = null; 
>             }
>             return client; 
>         }
>         catch (MqttPersistenceException e)
>         {
>             System.err.println("pubNameSpace : YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT WAY Excpetion  - "+e.getMessage());
>             if (client != null)
>             {
>                 client.disconnect(); 
>                 client.close();
>                 client = null;
>             }
>         }
>         catch (Exception e)
>         {
>             e.printStackTrace();
>             System.err.println("Exception e = "+e.getMessage());
>             if (client != null)
>             {
>                 client.disconnect(); 
>                 client.close();
>                 client = null;
>             }
>         }
>         return null;
>     }
>     /**
>      * 
>      * @param location
>      * @param accountId
>      * @param userId
>      * @param clientId
>      * @param token
>      * @param nameSpace
>      * @param message
>      * @param qos
>      * @param f_retained
>      * @param f_keepOpen
>      * @return
>      * @throws MqttException
>      */
>     private MqttClient pubNameSpace(String    location, 
>                                     String    accountId, 
>                                     String    userId, 
>                                     String    clientId,
>                                     String    token, 
>                                     String    nameSpace, 
>                                     String    message, 
>                                     int       qos, 
>                                     boolean   f_retained,  
>                                     boolean   f_keepOpen) throws MqttException
>     {
>         try
>         {        
>             MqttClient client = new MqttClient(location, clientId/*, persistence*/);
>             client.setCallback(this);
>             MqttConnectOptions options = new MqttConnectOptions();
>             options.setKeepAliveInterval(60);
>             options.setConnectionTimeout(120);
>             options.setPassword(token.toCharArray());
>             options.setUserName(accountId+":"+userId);
>     
>             MqttMessage mqttMessage = new MqttMessage();
>             mqttMessage.setPayload(message.getBytes());
>             mqttMessage.setQos(qos);
>             mqttMessage.setRetained(f_retained);
>     
>             client.connect(options);
>             client.publish(nameSpace, mqttMessage);
>             
>             if (!f_keepOpen)
>             {
>                 client.disconnect();
>                 client.close();
>             }
>             return client; 
>         }
>         catch (MqttPersistenceException e)
>         {
>             System.err.println("YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT WAY : "+e.getMessage());
>             assertTrue(false);
>         }
>         catch (Exception e)
>         {
>             e.printStackTrace();
>             System.err.println("Exceptoin e = "+e.getMessage()); 
>             assertTrue(false);
>         }
>         return null;
>     }
>    
>     
>     /**
>      * 
>      * @param input
>      * @param output
>      * @return
>      * @throws IOException
>      */
>     public static long copyLarge(InputStream input, OutputStream output) throws IOException 
>     {
>       byte[] buffer = new byte[4096];
>       long count = 0L;
>       int n = 0;
>       while (-1 != (n = input.read(buffer))) {
>        output.write(buffer, 0, n);
>        count += n;
>       }
>       return count;
>     }
>     
>     /**
>      * 
>      * @param p
>      * @throws IOException
>      */
>     public static void outputResults (Process p) throws IOException
>     {
>         BufferedReader stdInput = new BufferedReader(new
>                                          InputStreamReader(p.getInputStream()));
>         BufferedReader stdError = new BufferedReader(new
>                         InputStreamReader(p.getErrorStream()));
>         String s; 
>         while ((s = stdInput.readLine()) != null) 
>         {
>             System.out.println(s);
>         }
>         while ((s = stdError.readLine()) != null) 
>         {
>             System.out.println(s);
>         }
>     }
>     
>     /**
>      * HELPER
>      * @param msec
>      * @throws InterruptedException
>      */
>     void pause_til_done_or_time(int msec) throws InterruptedException
>     {
>         int pauseTime = 100; 
>         while (!f_messageReceived && msec > 0 && !f_lost)
>         {
>             Thread.sleep(pauseTime);
>             msec -= pauseTime;
>         }
>         
>     }
>     static Integer numberOfMessages = 0; 
>     public void clearMessageCount()
>     {
>         numberOfMessages = 0; 
>     }
>     public Integer getMessageCount()
>     {
>         return numberOfMessages; 
>     }
>     /**
>      * 
>      * @param msec
>      */
>     private void waitForItAck(int msec)
>     {
>         while (!f_ackReceived)
>         {
>             
>             try { Thread.sleep(1000); } catch (Exception e){}
>             
>             msec= msec-1000;
>                             
>             if (msec < 0)
>             {
>                 break;
>             }
>         }
>     }
>     
>     @Override
>     public String getProtocolScheme() {
>         return "mqtt+nio";
>     }
>     @Override
>     public boolean isUseSSL() {
>         return false;
>     }
>     public class PahoCallback implements MqttCallback {
>         @Override
>         public void connectionLost(Throwable cause) {
>             // TODO Auto-generated method stub
>             
>         }
>         @Override
>         public void messageArrived(String topic, MqttMessage message)
>                 throws Exception {
>             m_receiveCounter.incrementAndGet();
>         }
>         @Override
>         public void deliveryComplete(IMqttDeliveryToken token) {
>             // TODO Auto-generated method stub
>             
>         }
>         
>     }
>  
>     static   MqttClient BalstTestClient = null;
>     String   loc = "tcp://localhost:1883";
>     public class pubThreadBitMsg extends Thread {
>         
>         public pubThreadBitMsg()
>         {
>             synchronized (staticSyncObj)
>             {
>                 try
>                 {
>                     System.out.println("---- pubTheadBitMsg - constructor"); 
>                     if (BalstTestClient == null)
>                     {
>                         BalstTestClient = pubNameSpace(loc, "cjutzi", 
>                                         "someone", 
>                                          "myclientid_cjutzi_pub", 
>                                          "hello",
>                                          "/accounts/cjutzi/users/curt/test", 
>                                          "Starting Client", 1, false, true);
>                         System.out.println("---- pubTheadBitMsg - init"); 
>                     }
>                 } 
>                 catch (MqttException e)
>                 {
>                     // TODO Auto-generated catch block
>                     e.printStackTrace();
>                 }
>             }
>         }
>         public void run()
>         {
> //            synchronized (staticSyncObj) 
>             {
>             try
>             {
>                 System.out.println("---- pubTheadBitMsg - send"); 
>                 pubNameSpace(BalstTestClient, loc, "cjutzi", 
>                               "someone", 
>                                "myclientid_cjutzi_pub", 
>                                "hello",
>                                "/accounts/cjutzi/users/curt/test", 
>                                BigMessage, 1, false, true);
>                 } catch (MqttException e)
>                 {
>                     // TODO Auto-generated catch block
>                     e.printStackTrace();
>                 }
>             }
>         }
>     }
>     
>     @Test
>     public void test_AckOnOldListenerQos1Blast100B10KBlocks() throws MqttException, InterruptedException
>     {
>         MqttClient subClient = new MqttClient("tcp://localhost:1883",
>                 "niosubclient",
>                 new MemoryPersistence());
>         MqttConnectOptions cOpts = new MqttConnectOptions();
>         cOpts.setCleanSession(true);
>         cOpts.setUserName("curt");
>         cOpts.setPassword("hello".toCharArray());
>         
>        
>         subClient.setCallback(new PahoCallback());
>         subClient.connect(cOpts);
>         subClient.subscribe("nio/test");
>        
>         
>         for (int i = 0; i < numberOfThreads; i++) 
>         {
>             arrThreads.add(new pubThreadBitMsg());
>         }
>         System.out.println("--started"); 
>         for (int i = 0; i < numberOfThreads; i++) 
>         {
>             arrThreads.get(i).start(); 
>         }
>         System.out.println("-- waiting"); 
>         Thread.sleep(10000);
>         assertTrue(numberOfThreads == m_receiveCounter.get());
>     }
>     
>     
>     /**
>      * 
>      */
>     private void resetFlag()
>     {
>         f_messageReceived = false; 
>         f_ackReceived = false; 
>     }
>     /***************************************************************/
>     /**              CALL BACKS FOR MQTT                           */
>     /***************************************************************/
>    
>     
>     
>     /**
>      * 
>      */
>       public void connectionLost(Throwable arg0)
>       {
>           System.out.println("MQTT - Connection Lost");
> //          f_terminate = true;
>           f_lost = true;
>       }
>       /**
>        * 
>        */
>       public void deliveryComplete(IMqttDeliveryToken arg0)
>       {
>           System.out.println("MQTT - delivery complete: Delivery Tokeh = "+arg0.isComplete());
>           f_ackReceived = true;
>       }
>       
>       /**
>        * 
>        */
>       public void messageArrived(String arg0, MqttMessage arg1) throws Exception
>       {
>           synchronized (numberOfMessages)
>           {
>               numberOfMessages++;   
>               System.out.println("MQTT - messageArrived "+arg0+"\nMessage: \t["+arg1+"] QoS: ["+arg1.getQos()+"] isDup ["+arg1.isDuplicate()+"] nameSpace = ");
> //            System.out.print(arg1.isDuplicate()?"*":".");
>               byte[] payloadBytes = arg1.getPayload(); 
>               if (payloadBytes.length >0 )
>               {
>                   messagePayload = new String(payloadBytes); 
>               }
>               System.out.println("Message Recieved..."); 
>               f_messageReceived = true;
>           } 
>       }
>       
> }
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)