You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Curt Jutzi (JIRA)" <ji...@apache.org> on 2014/10/11 00:05:34 UTC

[jira] [Updated] (AMQ-5387) `

     [ https://issues.apache.org/jira/browse/AMQ-5387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Curt Jutzi updated AMQ-5387:
----------------------------
    Description: 
{noformat}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.mqtt;

import static org.junit.Assert.*;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.net.util.Base64;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Test;



/**
 * Test the NIO transport with this Test group
 */
public class PahoMQTTNIOTest extends PahoMQTTTest  implements MqttCallback {

    AtomicInteger m_receiveCounter = new AtomicInteger();
    String BigMessage = ".......................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................
.........................;........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................";;
    // Declared but neither assigned nor read anywhere in the visible part of this class.
    static ArrayList<MqttClient> mqttClients = null;
    // Lock guarding the lazy creation of the shared publisher client in pubThreadBitMsg.
    // new Integer(1) always allocates a distinct object, so this lock is not shared with
    // autoboxed/cached Integer values elsewhere in the JVM.
    static final Integer staticSyncObj = new Integer(1); 
    // Last non-empty payload seen by messageArrived; volatile because it is written by an
    // MQTT callback and may be read from another thread.
    volatile String messagePayload = null;
    
    // Number of publisher threads created and started by the blast test.
    public final int numberOfThreads = 500;
    static ArrayList<pubThreadBitMsg> arrThreads = new ArrayList<pubThreadBitMsg>();
    
    // Status flags set by the MqttCallback methods of this class and polled by the wait
    // helpers (pause_til_done_or_time, waitForItAck). They are volatile so that a polling
    // thread is guaranteed to observe a value written by the callback thread.
    volatile boolean f_messageReceived = false;
    volatile boolean f_ackReceived = false;
    volatile boolean f_lost = false;

    /**
     * Publishes one message, connecting a new client first when none is supplied.
     * <p>
     * When {@code client} is {@code null} a new {@link MqttClient} is created for
     * {@code location}/{@code clientId}, this test instance is registered as its callback
     * and it is connected with user name {@code accountId + ":" + userId} and password
     * {@code token}. A non-null {@code client} is used as-is and is assumed to be connected.
     * <p>
     * On any failure the error is printed, the client (including one passed in by the
     * caller) is disconnected if it is connected, closed, and {@code null} is returned.
     *
     * @param client     client to publish with, or {@code null} to create and connect one
     * @param location   broker URI used when a new client is created
     * @param accountId  first half of the user name
     * @param userId     second half of the user name
     * @param clientId   MQTT client id used when a new client is created
     * @param token      password used when connecting
     * @param nameSpace  topic to publish to
     * @param message    payload text
     * @param qos        quality of service for the message
     * @param f_retained whether the message is flagged as retained
     * @param f_keepOpen {@code false} to disconnect and close the client after publishing
     * @return the open client when {@code f_keepOpen} is true; {@code null} when the client
     *         was closed or the publish failed
     * @throws MqttException if tearing the client down after a failure itself fails
     */
    private MqttClient pubNameSpace(MqttClient client,  
                                    String    location, 
                                    String    accountId, 
                                    String    userId, 
                                    String    clientId,
                                    String    token, 
                                    String    nameSpace, 
                                    String    message, 
                                    int       qos, 
                                    boolean   f_retained,  
                                    boolean   f_keepOpen) throws MqttException
    {
        try
        {
            if (client == null)
            {
                client = new MqttClient(location, clientId/*, persistence*/);
                // Register the callback before connecting (as the other overload does) so
                // that no event delivered right after connect can be missed.
                client.setCallback(this);

                MqttConnectOptions options = new MqttConnectOptions();
                options.setKeepAliveInterval(60);
                options.setConnectionTimeout(120);
                options.setPassword(token.toCharArray());
                options.setUserName(accountId+":"+userId);
                client.connect(options);
            }

            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setPayload(message.getBytes());
            mqttMessage.setQos(qos);
            mqttMessage.setRetained(f_retained);

            client.publish(nameSpace, mqttMessage);

            if (!f_keepOpen)
            {
                client.disconnect();
                client.close();
                client = null;
            }
            return client;
        }
        catch (MqttPersistenceException e)
        {
            System.err.println("pubNameSpace : YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT WAY Excpetion  - "+e.getMessage());
        }
        catch (Exception e)
        {
            e.printStackTrace();
            System.err.println("Exception e = "+e.getMessage());
        }

        // Failure path. Only disconnect a client that is actually connected: calling
        // disconnect() on a client whose connect() failed throws, which previously skipped
        // close() and replaced the original error with the cleanup error.
        if (client != null)
        {
            try
            {
                if (client.isConnected())
                {
                    client.disconnect();
                }
            }
            finally
            {
                client.close();
            }
        }
        return null;
    }
    /**
     * Creates a new client, connects it and publishes one message.
     * <p>
     * This test instance is registered as the client's callback before connecting. The
     * connection uses user name {@code accountId + ":" + userId} and password {@code token}.
     * Any failure is printed, the client is released, and the calling test is failed with a
     * message describing the cause.
     *
     * @param location   broker URI
     * @param accountId  first half of the user name
     * @param userId     second half of the user name
     * @param clientId   MQTT client id
     * @param token      password used when connecting
     * @param nameSpace  topic to publish to
     * @param message    payload text
     * @param qos        quality of service for the message
     * @param f_retained whether the message is flagged as retained
     * @param f_keepOpen {@code false} to disconnect and close the client after publishing
     * @return the client that was used; it has already been closed when {@code f_keepOpen}
     *         is false
     * @throws MqttException declared for callers; failures inside are reported as
     *                       assertion failures instead
     */
    private MqttClient pubNameSpace(String    location, 
                                    String    accountId, 
                                    String    userId, 
                                    String    clientId,
                                    String    token, 
                                    String    nameSpace, 
                                    String    message, 
                                    int       qos, 
                                    boolean   f_retained,  
                                    boolean   f_keepOpen) throws MqttException
    {
        MqttClient client = null;
        String failure;

        try
        {        
            client = new MqttClient(location, clientId/*, persistence*/);
            client.setCallback(this);
            MqttConnectOptions options = new MqttConnectOptions();
            options.setKeepAliveInterval(60);
            options.setConnectionTimeout(120);
            options.setPassword(token.toCharArray());
            options.setUserName(accountId+":"+userId);
    
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setPayload(message.getBytes());
            mqttMessage.setQos(qos);
            mqttMessage.setRetained(f_retained);
    
            client.connect(options);
            client.publish(nameSpace, mqttMessage);
            
            if (!f_keepOpen)
            {
                client.disconnect();
                client.close();
            }
            return client; 
        }
        catch (MqttPersistenceException e)
        {
            System.err.println("YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT WAY : "+e.getMessage());
            failure = "MQTT persistence failure while publishing to " + nameSpace + ": " + e;
        }
        catch (Exception e)
        {
            e.printStackTrace();
            System.err.println("Exceptoin e = "+e.getMessage()); 
            failure = "Unexpected failure while publishing to " + nameSpace + ": " + e;
        }

        // Release the half-initialised client before failing the test; previously it was
        // simply abandoned, leaving its connection and threads behind.
        if (client != null)
        {
            try
            {
                if (client.isConnected())
                {
                    client.disconnect();
                }
                client.close();
            }
            catch (MqttException cleanupError)
            {
                System.err.println("pubNameSpace : cleanup failed - " + cleanupError.getMessage());
            }
        }

        // Fail with a descriptive message instead of a bare assertTrue(false).
        assertTrue(failure, false);
        return null; // not reached: the assertion above always throws
    }
   
    
    /**
     * Copies every byte from {@code input} to {@code output} through a 4096-byte buffer.
     * Neither stream is closed or flushed by this method.
     *
     * @param input  stream to read until end-of-stream
     * @param output stream that receives the bytes
     * @return the total number of bytes copied
     * @throws IOException if reading or writing fails
     */
    public static long copyLarge(InputStream input, OutputStream output) throws IOException 
    {
        final byte[] chunk = new byte[4096];
        long total = 0L;
        for (int read = input.read(chunk); read != -1; read = input.read(chunk))
        {
            output.write(chunk, 0, read);
            total += read;
        }
        return total;
    }
    
    /**
     * Echoes a process's standard output, followed by its standard error, to
     * {@code System.out}, one line at a time.
     * <p>
     * Standard error is drained on a helper thread while standard output is being read.
     * Reading the two streams strictly one after the other can deadlock: a child that
     * fills its stderr pipe blocks, never closes stdout, and the reader waits forever.
     * The stderr lines are buffered and printed only after stdout is exhausted, so the
     * output order is unchanged.
     *
     * @param p the process whose output is echoed
     * @throws IOException if reading either stream fails, or if the calling thread is
     *                     interrupted while waiting for the stderr reader
     */
    public static void outputResults (Process p) throws IOException
    {
        final BufferedReader stdError = new BufferedReader(new
                        InputStreamReader(p.getErrorStream()));
        final ArrayList<String> errorLines = new ArrayList<String>();
        final IOException[] errorFailure = new IOException[1];

        Thread errorDrainer = new Thread("outputResults-stderr")
        {
            @Override
            public void run()
            {
                try
                {
                    String line;
                    while ((line = stdError.readLine()) != null)
                    {
                        errorLines.add(line);
                    }
                }
                catch (IOException e)
                {
                    // Handed back to the calling thread after join().
                    errorFailure[0] = e;
                }
            }
        };
        errorDrainer.setDaemon(true);
        errorDrainer.start();

        BufferedReader stdInput = new BufferedReader(new
                                         InputStreamReader(p.getInputStream()));
        String s; 
        while ((s = stdInput.readLine()) != null) 
        {
            System.out.println(s);
        }

        try
        {
            // join() also makes the helper thread's writes visible to this thread.
            errorDrainer.join();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while reading process error output", e);
        }

        for (String line : errorLines)
        {
            System.out.println(line);
        }
        if (errorFailure[0] != null)
        {
            throw errorFailure[0];
        }
    }
    
    /**
     * Polls every 100 ms until a message has been received, the connection has been
     * reported lost, or the time budget is used up.
     *
     * @param msec maximum time to wait, in milliseconds; no sleep happens when it is not positive
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    void pause_til_done_or_time(int msec) throws InterruptedException
    {
        final int step = 100;
        for (int remaining = msec; remaining > 0; remaining -= step)
        {
            if (f_messageReceived || f_lost)
            {
                return;
            }
            Thread.sleep(step);
        }
    }

    /** Running total of messages counted by messageArrived; static, so shared by all instances. */
    static Integer numberOfMessages = Integer.valueOf(0);

    /** Resets the shared message total to zero. */
    public void clearMessageCount()
    {
        numberOfMessages = Integer.valueOf(0);
    }

    /**
     * @return the current shared message total
     */
    public Integer getMessageCount()
    {
        return numberOfMessages;
    }
    /**
     * Waits until a delivery acknowledgement has been flagged (f_ackReceived) or the time
     * budget is used up, polling once per second.
     * <p>
     * The wait never exceeds {@code msec}; the earlier loop slept one extra second past
     * the budget. An interrupt ends the wait immediately and the thread's interrupt status
     * is restored rather than being swallowed.
     *
     * @param msec maximum time to wait, in milliseconds; returns at once when not positive
     */
    private void waitForItAck(int msec)
    {
        final int pollInterval = 1000;
        int remaining = msec;

        while (!f_ackReceived && remaining > 0)
        {
            try
            {
                // Sleep only for what is left of the budget on the final round.
                Thread.sleep(Math.min(pollInterval, remaining));
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                return;
            }
            remaining -= pollInterval;
        }
    }
    
    /**
     * Overrides the inherited protocol scheme with the NIO variant of the MQTT transport.
     *
     * @return the literal scheme name {@code "mqtt+nio"}
     */
    @Override
    public String getProtocolScheme()
    {
        return "mqtt+nio";
    }

    /**
     * Overrides the inherited SSL switch; this test group reports SSL as disabled.
     *
     * @return always {@code false}
     */
    @Override
    public boolean isUseSSL()
    {
        return false;
    }

    /**
     * Callback used by the subscriber client of the blast test. It only counts arriving
     * messages in the enclosing test's m_receiveCounter; connection loss and delivery
     * completion are ignored.
     */
    public class PahoCallback implements MqttCallback {

        /**
         * Adds one to the received-message counter. Topic and payload are not inspected.
         */
        @Override
        public void messageArrived(String topic, MqttMessage message)
                throws Exception {
            m_receiveCounter.incrementAndGet();
        }

        /** No action is taken when the subscriber's connection is lost. */
        @Override
        public void connectionLost(Throwable cause) {
        }

        /** No action is taken on delivery completion. */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
        }

    }
 
    // Publisher client shared by every pubThreadBitMsg; assigned by the first
    // pubThreadBitMsg constructor that finds it still null.
    static MqttClient BalstTestClient;
    // Broker URI the publisher threads connect to.
    String loc = "tcp://localhost:1883";

    /**
     * Publisher thread for the blast test. Constructing an instance makes sure the shared
     * publisher client (BalstTestClient) exists; running it publishes BigMessage once
     * through that client.
     */
    public class pubThreadBitMsg extends Thread {

        /**
         * Creates the shared publisher client on first use. Creation is serialized on
         * staticSyncObj; later constructor calls find the client already set and only log.
         * A failed creation is printed and leaves the shared client unset.
         */
        public pubThreadBitMsg()
        {
            synchronized (staticSyncObj)
            {
                System.out.println("---- pubTheadBitMsg - constructor"); 
                if (BalstTestClient != null)
                {
                    return;
                }
                try
                {
                    BalstTestClient = pubNameSpace(loc,
                                                   "cjutzi",
                                                   "someone",
                                                   "myclientid_cjutzi_pub",
                                                   "hello",
                                                   "/accounts/cjutzi/users/curt/test",
                                                   "Starting Client",
                                                   1,
                                                   false,
                                                   true);
                    System.out.println("---- pubTheadBitMsg - init"); 
                }
                catch (MqttException e)
                {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Publishes BigMessage at QoS 1 and keeps the client open. The call is not
         * synchronized on staticSyncObj, so all threads publish concurrently.
         */
        @Override
        public void run()
        {
            System.out.println("---- pubTheadBitMsg - send"); 
            try
            {
                pubNameSpace(BalstTestClient,
                             loc,
                             "cjutzi",
                             "someone",
                             "myclientid_cjutzi_pub",
                             "hello",
                             "/accounts/cjutzi/users/curt/test",
                             BigMessage,
                             1,
                             false,
                             true);
            }
            catch (MqttException e)
            {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * Blast test: a subscriber listens on "nio/test" while numberOfThreads publisher
     * threads each publish BigMessage at QoS 1. After a fixed 10 second wait, the number
     * of messages counted by the subscriber callback must equal numberOfThreads.
     * <p>
     * The subscriber client is always disconnected and closed, and the assertion reports
     * the expected and actual counts.
     *
     * @throws MqttException        if the subscriber cannot be created, connected or subscribed
     * @throws InterruptedException if the wait is interrupted
     */
    @Test
    public void test_AckOnOldListenerQos1Blast100B10KBlocks() throws MqttException, InterruptedException
    {
        MqttClient subClient = new MqttClient("tcp://localhost:1883",
                "niosubclient",
                new MemoryPersistence());
        try
        {
            MqttConnectOptions cOpts = new MqttConnectOptions();
            cOpts.setCleanSession(true);
            cOpts.setUserName("system:system");
            cOpts.setPassword("system_token".toCharArray());

            subClient.setCallback(new PahoCallback());
            subClient.connect(cOpts);
            subClient.subscribe("nio/test");

            // arrThreads is static: drop threads left over from an earlier run in the same
            // JVM, since starting a Thread a second time throws IllegalThreadStateException.
            arrThreads.clear();
            for (int i = 0; i < numberOfThreads; i++) 
            {
                arrThreads.add(new pubThreadBitMsg());
            }
            System.out.println("--started"); 
            for (pubThreadBitMsg publisher : arrThreads) 
            {
                publisher.start(); 
            }
            System.out.println("-- waiting"); 
            Thread.sleep(10000);

            int received = m_receiveCounter.get();
            assertTrue("expected " + numberOfThreads + " messages but received " + received,
                       numberOfThreads == received);
        }
        finally
        {
            // Release the subscriber whether or not the test passed. Cleanup problems are
            // only logged so they cannot hide an assertion failure.
            try
            {
                if (subClient.isConnected())
                {
                    subClient.disconnect();
                }
                subClient.close();
            }
            catch (MqttException cleanupError)
            {
                System.err.println("subscriber cleanup failed - " + cleanupError.getMessage());
            }
        }
    }
    
    
    /**
     * Clears the message-received and ack-received flags. The connection-lost flag
     * (f_lost) is not touched.
     */
    private void resetFlag()
    {
        f_ackReceived = false;
        f_messageReceived = false;
    }
    /***************************************************************/
    /**              CALL BACKS FOR MQTT                           */
    /***************************************************************/
   
    

    
    /**
     * MqttCallback hook: records that the connection was lost so that
     * pause_til_done_or_time stops waiting, and prints the cause (when one is supplied)
     * instead of discarding it.
     *
     * @param arg0 the reason the connection was lost; may be null
     */
      public void connectionLost(Throwable arg0)
      {
          System.out.println("MQTT - Connection Lost");
          if (arg0 != null)
          {
              // Without the cause a lost connection is impossible to diagnose from the log.
              arg0.printStackTrace();
          }
          f_lost = true;
      }

      /**
       * MqttCallback hook: logs the completion state reported by the delivery token and
       * flags that an acknowledgement has been received.
       *
       * @param arg0 token of the delivery being reported
       */
      public void deliveryComplete(IMqttDeliveryToken arg0)
      {
          final boolean complete = arg0.isComplete();
          System.out.println("MQTT - delivery complete: Delivery Tokeh = "+complete);
          f_ackReceived = true;
      }
      
      /**
       * Lock for the message bookkeeping in messageArrived. A dedicated final object is
       * required: the counter itself is a boxed Integer that is replaced by a new object on
       * every increment, so locking on it gives no mutual exclusion.
       */
      private static final Object MESSAGE_COUNT_LOCK = new Object();

      /**
       * MqttCallback hook: counts the message, logs it, remembers a non-empty payload in
       * messagePayload and flags that a message has been received.
       *
       * @param arg0 topic the message arrived on
       * @param arg1 the received message
       * @throws Exception declared by the callback contract; not thrown explicitly here
       */
      public void messageArrived(String arg0, MqttMessage arg1) throws Exception
      {
          synchronized (MESSAGE_COUNT_LOCK)
          {
              numberOfMessages++;   
              System.out.println("MQTT - messageArrived "+arg0+"\nMessage: \t["+arg1+"] QoS: ["+arg1.getQos()+"] isDup ["+arg1.isDuplicate()+"] nameSpace = ");
              byte[] payloadBytes = arg1.getPayload(); 
              if (payloadBytes.length >0 )
              {
                  // An empty payload leaves the previously stored payload in place.
                  messagePayload = new String(payloadBytes); 
              }
              System.out.println("Message Recieved..."); 
              f_messageReceived = true;
          } 
      }
      
}


{noformat}

  was:
{noformat}
**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.mqtt;

import static org.junit.Assert.*;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.net.util.Base64;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Test;



/**
 * Test the NIO transport with this Test group
 */
public class PahoMQTTNIOTest extends PahoMQTTTest  implements MqttCallback {

    AtomicInteger m_receiveCounter = new AtomicInteger();
    String BigMessage = ".......................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................
.........................;........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................";;
    static ArrayList<MqttClient> mqttClients = null;
    static final Integer staticSyncObj = new Integer(1); 
    String messagePayload = null;
    
    public final int numberOfThreads = 500;
    static ArrayList<pubThreadBitMsg> arrThreads = new ArrayList<pubThreadBitMsg>();
    
    boolean f_messageReceived = false;
    boolean f_ackReceived = false;
    boolean f_lost = false;

    /**
     * 
     * 
     * @param client
     * @param location
     * @param accountId
     * @param userId
     * @param clientId
     * @param token
     * @param nameSpace
     * @param message
     * @param qos
     * @param f_retained
     * @param f_keepOpen
     * @return
     * @throws MqttException
     */
    private MqttClient pubNameSpace(MqttClient client,  
                                    String    location, 
                                    String    accountId, 
                                    String    userId, 
                                    String    clientId,
                                    String    token, 
                                    String    nameSpace, 
                                    String    message, 
                                    int       qos, 
                                    boolean   f_retained,  
                                    boolean   f_keepOpen) throws MqttException
    {

        try
        {        
            boolean f_wasConnected = true; 
            
            if (client == null)
            {
                f_wasConnected = false; 
                client = new MqttClient(location, clientId/*, persistence*/);
            }
            
            if (!f_wasConnected)
            {
                MqttConnectOptions options = new MqttConnectOptions();
                options.setKeepAliveInterval(60);
                options.setConnectionTimeout(120);
                options.setPassword(token.toCharArray());
                options.setUserName(accountId+":"+userId);
                client.connect(options);
                client.setCallback(this);
            }
    
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setPayload(message.getBytes());
            mqttMessage.setQos(qos);
            mqttMessage.setRetained(f_retained);
    
            
            client.publish(nameSpace, mqttMessage);
            
            if (!f_keepOpen)
            {
                client.disconnect();
                client.close();
                client = null; 
            }
            return client; 
        }
        catch (MqttPersistenceException e)
        {
            System.err.println("pubNameSpace : YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT WAY Excpetion  - "+e.getMessage());
            if (client != null)
            {
                client.disconnect(); 
                client.close();
                client = null;
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
            System.err.println("Exception e = "+e.getMessage());
            if (client != null)
            {
                client.disconnect(); 
                client.close();
                client = null;
            }
        }
        return null;
    }
    /**
     * 
     * @param location
     * @param accountId
     * @param userId
     * @param clientId
     * @param token
     * @param nameSpace
     * @param message
     * @param qos
     * @param f_retained
     * @param f_keepOpen
     * @return
     * @throws MqttException
     */
    private MqttClient pubNameSpace(String    location, 
                                    String    accountId, 
                                    String    userId, 
                                    String    clientId,
                                    String    token, 
                                    String    nameSpace, 
                                    String    message, 
                                    int       qos, 
                                    boolean   f_retained,  
                                    boolean   f_keepOpen) throws MqttException
    {

        try
        {        
            MqttClient client = new MqttClient(location, clientId/*, persistence*/);
            client.setCallback(this);
            MqttConnectOptions options = new MqttConnectOptions();
            options.setKeepAliveInterval(60);
            options.setConnectionTimeout(120);
            options.setPassword(token.toCharArray());
            options.setUserName(accountId+":"+userId);
    
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setPayload(message.getBytes());
            mqttMessage.setQos(qos);
            mqttMessage.setRetained(f_retained);
    
            client.connect(options);
            client.publish(nameSpace, mqttMessage);
            
            if (!f_keepOpen)
            {
                client.disconnect();
                client.close();
            }
            return client; 
        }
        catch (MqttPersistenceException e)
        {
            System.err.println("YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT WAY : "+e.getMessage());
            assertTrue(false);
        }
        catch (Exception e)
        {
            e.printStackTrace();
            System.err.println("Exceptoin e = "+e.getMessage()); 
            assertTrue(false);
        }
        return null;
    }
   
    
    /**
     * 
     * @param input
     * @param output
     * @return
     * @throws IOException
     */
    public static long copyLarge(InputStream input, OutputStream output) throws IOException 
    {
      byte[] buffer = new byte[4096];
      long count = 0L;
      int n = 0;
      while (-1 != (n = input.read(buffer))) {
       output.write(buffer, 0, n);
       count += n;
      }
      return count;
    }
    
    /**
     * 
     * @param p
     * @throws IOException
     */
    public static void outputResults (Process p) throws IOException
    {
        BufferedReader stdInput = new BufferedReader(new
                                         InputStreamReader(p.getInputStream()));

        BufferedReader stdError = new BufferedReader(new
                        InputStreamReader(p.getErrorStream()));

        String s; 
        while ((s = stdInput.readLine()) != null) 
        {
            System.out.println(s);
        }
        while ((s = stdError.readLine()) != null) 
        {
            System.out.println(s);
        }
    }
    
    /**
     * HELPER
     * @param msec
     * @throws InterruptedException
     */
    void pause_til_done_or_time(int msec) throws InterruptedException
    {
        int pauseTime = 100; 
        while (!f_messageReceived && msec > 0 && !f_lost)
        {
            Thread.sleep(pauseTime);
            msec -= pauseTime;
        }
        
    }

    static Integer numberOfMessages = 0; 
    public void clearMessageCount()
    {
        numberOfMessages = 0; 
    }
    public Integer getMessageCount()
    {
        return numberOfMessages; 
    }
    /**
     * 
     * @param msec
     */
    private void waitForItAck(int msec)
    {
        while (!f_ackReceived)
        {
            
            try { Thread.sleep(1000); } catch (Exception e){}
            
            msec= msec-1000;
                            
            if (msec < 0)
            {
                break;
            }
        }
    }
    
    @Override
    public String getProtocolScheme() {
        return "mqtt+nio";
    }

    @Override
    public boolean isUseSSL() {
        return false;
    }

    public class PahoCallback implements MqttCallback {

        @Override
        public void connectionLost(Throwable cause) {
            // TODO Auto-generated method stub
            
        }

        @Override
        public void messageArrived(String topic, MqttMessage message)
                throws Exception {
            m_receiveCounter.incrementAndGet();
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            // TODO Auto-generated method stub
            
        }
        
    }
 
    static   MqttClient BalstTestClient = null;
    String   loc = "tcp://localhost:1883";

    /**
     * Publisher thread for the blast test. Constructing an instance makes sure
     * the shared {@code BalstTestClient} has been created; running it publishes
     * {@code BigMessage} through that shared client.
     */
    public class pubThreadBitMsg extends Thread {

        /**
         * Creates the shared publisher client on first use. The check-and-create
         * step runs while holding {@code staticSyncObj}. An {@link MqttException}
         * raised during creation is only reported through its stack trace.
         */
        public pubThreadBitMsg() {
            synchronized (staticSyncObj) {
                System.out.println("---- pubTheadBitMsg - constructor");
                if (BalstTestClient != null) {
                    // an earlier instance already created the shared client
                    return;
                }
                try {
                    BalstTestClient = pubNameSpace(loc,
                            "cjutzi",
                            "curt",
                            "myclientid_cjutzi_pub",
                            "curts_client_pub_token",
                            "/accounts/cjutzi/users/curt/test",
                            "Starting Client",
                            1,
                            false,
                            true);
                    System.out.println("---- pubTheadBitMsg - init");
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Publishes {@code BigMessage} through the shared client. No lock is taken
         * here, so every started thread calls {@code pubNameSpace} on the same
         * client without mutual exclusion. An {@link MqttException} is only
         * reported through its stack trace.
         */
        @Override
        public void run() {
            System.out.println("---- pubTheadBitMsg - send");
            try {
                pubNameSpace(BalstTestClient,
                        loc,
                        "cjutzi",
                        "curt",
                        "myclientid_cjutzi_pub",
                        "curts_client_pub_token",
                        "/accounts/cjutzi/users/curt/test",
                        BigMessage,
                        1,
                        false,
                        true);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * Blast test: connects a subscriber, creates {@code numberOfThreads}
     * publisher threads, starts them all, waits a fixed ten seconds and then
     * expects the subscriber callback to have counted exactly
     * {@code numberOfThreads} messages.
     *
     * <p>The wait is deliberately a fixed sleep rather than an early-exit poll,
     * so that messages delivered after the expected count is reached (for
     * example duplicates) still make the equality check fail.
     *
     * <p>NOTE(review): the subscriber listens on {@code "nio/test"} while the
     * publisher threads publish to {@code "/accounts/cjutzi/users/curt/test"};
     * confirm how the broker under test routes between the two.
     *
     * @throws MqttException        if the subscriber cannot be created, connected or subscribed
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Test
    public void test_AckOnOldListenerQos1Blast100B10KBlocks() throws MqttException, InterruptedException
    {
        MqttClient subClient = new MqttClient("tcp://localhost:1883",
                "niosubclient",
                new MemoryPersistence());
        try
        {
            MqttConnectOptions cOpts = new MqttConnectOptions();
            cOpts.setCleanSession(true);
            cOpts.setUserName("system:system");
            cOpts.setPassword("system_token".toCharArray());

            subClient.setCallback(new PahoCallback());
            subClient.connect(cOpts);
            subClient.subscribe("nio/test");

            // Construct every publisher first (the first constructor creates the
            // shared client), then start them together.
            for (int i = 0; i < numberOfThreads; i++)
            {
                arrThreads.add(new pubThreadBitMsg());
            }
            System.out.println("--started");
            for (int i = 0; i < numberOfThreads; i++)
            {
                arrThreads.get(i).start();
            }
            System.out.println("-- waiting");
            Thread.sleep(10000);

            // assertEquals reports expected vs. actual counts on failure
            assertEquals("messages received by subscriber",
                         numberOfThreads, m_receiveCounter.get());
        }
        finally
        {
            // Release the subscriber connection even when the assertion fails.
            try
            {
                if (subClient.isConnected())
                {
                    subClient.disconnect();
                }
                subClient.close();
            }
            catch (MqttException e)
            {
                // a cleanup failure must not mask the real test outcome
                e.printStackTrace();
            }
        }
    }
    
    
    /**
     * Clears the message-received and ack-received flags so that a following
     * wait starts from a clean state.
     */
    private void resetFlag()
    {
        f_messageReceived = f_ackReceived = false;
    }
    /***************************************************************/
    /**              CALL BACKS FOR MQTT                           */
    /***************************************************************/
   
    

    
    /**
     * Connection-lost callback: logs the event and raises the {@code f_lost}
     * flag.
     *
     * @param arg0 the cause reported by the client; not inspected here
     */
    public void connectionLost(Throwable arg0)
    {
        System.out.println("MQTT - Connection Lost");
        this.f_lost = true;
    }

      /**
       * Delivery-complete callback: logs the token's completion state and
       * raises the {@code f_ackReceived} flag.
       *
       * @param arg0 delivery token whose {@code isComplete()} value is logged
       */
      public void deliveryComplete(IMqttDeliveryToken arg0)
      {
          final boolean complete = arg0.isComplete();
          System.out.println("MQTT - delivery complete: Delivery Tokeh = " + complete);
          this.f_ackReceived = true;
      }
      
      /**
       * Stable monitor guarding the update of {@code numberOfMessages}.
       * The counter itself cannot serve as the monitor: it is a boxed value that
       * {@code numberOfMessages++} replaces with a different object on every
       * call, so threads would end up locking different objects.
       */
      private static final Object MESSAGE_COUNT_LOCK = new Object();

      /**
       * Message-arrived callback: increments the message count, logs the
       * message, remembers a non-empty payload in {@code messagePayload} and
       * raises the {@code f_messageReceived} flag.
       *
       * @param arg0 topic the message arrived on (logged only)
       * @param arg1 the delivered message; its QoS, duplicate flag and payload are read
       * @throws Exception declared by the callback contract; not thrown explicitly here
       */
      public void messageArrived(String arg0, MqttMessage arg1) throws Exception
      {
          synchronized (MESSAGE_COUNT_LOCK)
          {
              numberOfMessages++;
              System.out.println("MQTT - messageArrived "+arg0+"\nMessage: \t["+arg1+"] QoS: ["+arg1.getQos()+"] isDup ["+arg1.isDuplicate()+"] nameSpace = ");
              byte[] payloadBytes = arg1.getPayload();
              // an empty payload leaves the previously stored payload untouched
              if (payloadBytes.length > 0)
              {
                  messagePayload = new String(payloadBytes);
              }
              System.out.println("Message Recieved...");
              f_messageReceived = true;
          }
      }
      
}


{noformat}

        Summary: `  (was: NIO Fails (for MQTT at least) on back-to-back Transport frames)

> `
> -
>
>                 Key: AMQ-5387
>                 URL: https://issues.apache.org/jira/browse/AMQ-5387
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: MQTT
>    Affects Versions: 5.11.0
>         Environment: Paho as client 
>            Reporter: Curt Jutzi
>             Fix For: NEEDS_REVIEW
>
>         Attachments: MQTTCode.java.patch
>
>   Original Estimate: 0h
>  Remaining Estimate: 0h
>
> {noformat}
> **
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *      http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> package org.apache.activemq.transport.mqtt;
> import static org.junit.Assert.*;
> import java.io.BufferedReader;
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.InputStreamReader;
> import java.io.OutputStream;
> import java.util.ArrayList;
> import java.util.concurrent.atomic.AtomicInteger;
> import org.apache.commons.net.util.Base64;
> import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
> import org.eclipse.paho.client.mqttv3.MqttCallback;
> import org.eclipse.paho.client.mqttv3.MqttClient;
> import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
> import org.eclipse.paho.client.mqttv3.MqttException;
> import org.eclipse.paho.client.mqttv3.MqttMessage;
> import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
> import org.eclipse.paho.client.mqttv3.MqttSecurityException;
> import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
> import org.junit.Test;
> /**
>  * Test the NIO transport with this Test group
>  */
> public class PahoMQTTNIOTest extends PahoMQTTTest  implements MqttCallback {
>     AtomicInteger m_receiveCounter = new AtomicInteger();
>     String BigMessage = ".....................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................
...........................;........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................";;
>     static ArrayList<MqttClient> mqttClients = null;
>     static final Integer staticSyncObj = new Integer(1); 
>     String messagePayload = null;
>     
>     public final int numberOfThreads = 500;
>     static ArrayList<pubThreadBitMsg> arrThreads = new ArrayList<pubThreadBitMsg>();
>     
>     boolean f_messageReceived = false;
>     boolean f_ackReceived = false;
>     boolean f_lost = false;
>     /**
>      * 
>      * 
>      * @param client
>      * @param location
>      * @param accountId
>      * @param userId
>      * @param clientId
>      * @param token
>      * @param nameSpace
>      * @param message
>      * @param qos
>      * @param f_retained
>      * @param f_keepOpen
>      * @return
>      * @throws MqttException
>      */
>     private MqttClient pubNameSpace(MqttClient client,  
>                                     String    location, 
>                                     String    accountId, 
>                                     String    userId, 
>                                     String    clientId,
>                                     String    token, 
>                                     String    nameSpace, 
>                                     String    message, 
>                                     int       qos, 
>                                     boolean   f_retained,  
>                                     boolean   f_keepOpen) throws MqttException
>     {
>         try
>         {        
>             boolean f_wasConnected = true; 
>             
>             if (client == null)
>             {
>                 f_wasConnected = false; 
>                 client = new MqttClient(location, clientId/*, persistence*/);
>             }
>             
>             if (!f_wasConnected)
>             {
>                 MqttConnectOptions options = new MqttConnectOptions();
>                 options.setKeepAliveInterval(60);
>                 options.setConnectionTimeout(120);
>                 options.setPassword(token.toCharArray());
>                 options.setUserName(accountId+":"+userId);
>                 client.connect(options);
>                 client.setCallback(this);
>             }
>     
>             MqttMessage mqttMessage = new MqttMessage();
>             mqttMessage.setPayload(message.getBytes());
>             mqttMessage.setQos(qos);
>             mqttMessage.setRetained(f_retained);
>     
>             
>             client.publish(nameSpace, mqttMessage);
>             
>             if (!f_keepOpen)
>             {
>                 client.disconnect();
>                 client.close();
>                 client = null; 
>             }
>             return client; 
>         }
>         catch (MqttPersistenceException e)
>         {
>             System.err.println("pubNameSpace : YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT WAY Excpetion  - "+e.getMessage());
>             if (client != null)
>             {
>                 client.disconnect(); 
>                 client.close();
>                 client = null;
>             }
>         }
>         catch (Exception e)
>         {
>             e.printStackTrace();
>             System.err.println("Exception e = "+e.getMessage());
>             if (client != null)
>             {
>                 client.disconnect(); 
>                 client.close();
>                 client = null;
>             }
>         }
>         return null;
>     }
>     /**
>      * 
>      * @param location
>      * @param accountId
>      * @param userId
>      * @param clientId
>      * @param token
>      * @param nameSpace
>      * @param message
>      * @param qos
>      * @param f_retained
>      * @param f_keepOpen
>      * @return
>      * @throws MqttException
>      */
>     private MqttClient pubNameSpace(String    location, 
>                                     String    accountId, 
>                                     String    userId, 
>                                     String    clientId,
>                                     String    token, 
>                                     String    nameSpace, 
>                                     String    message, 
>                                     int       qos, 
>                                     boolean   f_retained,  
>                                     boolean   f_keepOpen) throws MqttException
>     {
>         try
>         {        
>             MqttClient client = new MqttClient(location, clientId/*, persistence*/);
>             client.setCallback(this);
>             MqttConnectOptions options = new MqttConnectOptions();
>             options.setKeepAliveInterval(60);
>             options.setConnectionTimeout(120);
>             options.setPassword(token.toCharArray());
>             options.setUserName(accountId+":"+userId);
>     
>             MqttMessage mqttMessage = new MqttMessage();
>             mqttMessage.setPayload(message.getBytes());
>             mqttMessage.setQos(qos);
>             mqttMessage.setRetained(f_retained);
>     
>             client.connect(options);
>             client.publish(nameSpace, mqttMessage);
>             
>             if (!f_keepOpen)
>             {
>                 client.disconnect();
>                 client.close();
>             }
>             return client; 
>         }
>         catch (MqttPersistenceException e)
>         {
>             System.err.println("YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT WAY : "+e.getMessage());
>             assertTrue(false);
>         }
>         catch (Exception e)
>         {
>             e.printStackTrace();
>             System.err.println("Exceptoin e = "+e.getMessage()); 
>             assertTrue(false);
>         }
>         return null;
>     }
>    
>     
>     /**
>      * 
>      * @param input
>      * @param output
>      * @return
>      * @throws IOException
>      */
>     public static long copyLarge(InputStream input, OutputStream output) throws IOException 
>     {
>       byte[] buffer = new byte[4096];
>       long count = 0L;
>       int n = 0;
>       while (-1 != (n = input.read(buffer))) {
>        output.write(buffer, 0, n);
>        count += n;
>       }
>       return count;
>     }
>     
>     /**
>      * 
>      * @param p
>      * @throws IOException
>      */
>     public static void outputResults (Process p) throws IOException
>     {
>         BufferedReader stdInput = new BufferedReader(new
>                                          InputStreamReader(p.getInputStream()));
>         BufferedReader stdError = new BufferedReader(new
>                         InputStreamReader(p.getErrorStream()));
>         String s; 
>         while ((s = stdInput.readLine()) != null) 
>         {
>             System.out.println(s);
>         }
>         while ((s = stdError.readLine()) != null) 
>         {
>             System.out.println(s);
>         }
>     }
>     
>     /**
>      * HELPER
>      * @param msec
>      * @throws InterruptedException
>      */
>     void pause_til_done_or_time(int msec) throws InterruptedException
>     {
>         int pauseTime = 100; 
>         while (!f_messageReceived && msec > 0 && !f_lost)
>         {
>             Thread.sleep(pauseTime);
>             msec -= pauseTime;
>         }
>         
>     }
>     static Integer numberOfMessages = 0; 
>     public void clearMessageCount()
>     {
>         numberOfMessages = 0; 
>     }
>     public Integer getMessageCount()
>     {
>         return numberOfMessages; 
>     }
>     /**
>      * 
>      * @param msec
>      */
>     private void waitForItAck(int msec)
>     {
>         while (!f_ackReceived)
>         {
>             
>             try { Thread.sleep(1000); } catch (Exception e){}
>             
>             msec= msec-1000;
>                             
>             if (msec < 0)
>             {
>                 break;
>             }
>         }
>     }
>     
>     @Override
>     public String getProtocolScheme() {
>         return "mqtt+nio";
>     }
>     @Override
>     public boolean isUseSSL() {
>         return false;
>     }
>     public class PahoCallback implements MqttCallback {
>         @Override
>         public void connectionLost(Throwable cause) {
>             // TODO Auto-generated method stub
>             
>         }
>         @Override
>         public void messageArrived(String topic, MqttMessage message)
>                 throws Exception {
>             m_receiveCounter.incrementAndGet();
>         }
>         @Override
>         public void deliveryComplete(IMqttDeliveryToken token) {
>             // TODO Auto-generated method stub
>             
>         }
>         
>     }
>  
>     static   MqttClient BalstTestClient = null;
>     String   loc = "tcp://localhost:1883";
>     public class pubThreadBitMsg extends Thread {
>         
>         public pubThreadBitMsg()
>         {
>             synchronized (staticSyncObj)
>             {
>                 try
>                 {
>                     System.out.println("---- pubTheadBitMsg - constructor"); 
>                     if (BalstTestClient == null)
>                     {
>                         BalstTestClient = pubNameSpace(loc, "cjutzi", 
>                                         "someone", 
>                                          "myclientid_cjutzi_pub", 
>                                          "hello",
>                                          "/accounts/cjutzi/users/curt/test", 
>                                          "Starting Client", 1, false, true);
>                         System.out.println("---- pubTheadBitMsg - init"); 
>                     }
>                 } 
>                 catch (MqttException e)
>                 {
>                     // TODO Auto-generated catch block
>                     e.printStackTrace();
>                 }
>             }
>         }
>         public void run()
>         {
> //            synchronized (staticSyncObj) 
>             {
>             try
>             {
>                 System.out.println("---- pubTheadBitMsg - send"); 
>                 pubNameSpace(BalstTestClient, loc, "cjutzi", 
>                               "someone", 
>                                "myclientid_cjutzi_pub", 
>                                "hello",
>                                "/accounts/cjutzi/users/curt/test", 
>                                BigMessage, 1, false, true);
>                 } catch (MqttException e)
>                 {
>                     // TODO Auto-generated catch block
>                     e.printStackTrace();
>                 }
>             }
>         }
>     }
>     
>     @Test
>     public void test_AckOnOldListenerQos1Blast100B10KBlocks() throws MqttException, InterruptedException
>     {
>         MqttClient subClient = new MqttClient("tcp://localhost:1883",
>                 "niosubclient",
>                 new MemoryPersistence());
>         MqttConnectOptions cOpts = new MqttConnectOptions();
>         cOpts.setCleanSession(true);
>         cOpts.setUserName("system:system");
>         cOpts.setPassword("system_token".toCharArray());
>         
>        
>         subClient.setCallback(new PahoCallback());
>         subClient.connect(cOpts);
>         subClient.subscribe("nio/test");
>        
>         
>         for (int i = 0; i < numberOfThreads; i++) 
>         {
>             arrThreads.add(new pubThreadBitMsg());
>         }
>         System.out.println("--started"); 
>         for (int i = 0; i < numberOfThreads; i++) 
>         {
>             arrThreads.get(i).start(); 
>         }
>         System.out.println("-- waiting"); 
>         Thread.sleep(10000);
>         assertTrue(numberOfThreads == m_receiveCounter.get());
>     }
>     
>     
>     /**
>      * 
>      */
>     private void resetFlag()
>     {
>         f_messageReceived = false; 
>         f_ackReceived = false; 
>     }
>     /***************************************************************/
>     /**              CALL BACKS FOR MQTT                           */
>     /***************************************************************/
>    
>     
>     
>     /**
>      * 
>      */
>       public void connectionLost(Throwable arg0)
>       {
>           System.out.println("MQTT - Connection Lost");
> //          f_terminate = true;
>           f_lost = true;
>       }
>       /**
>        * 
>        */
>       public void deliveryComplete(IMqttDeliveryToken arg0)
>       {
>           System.out.println("MQTT - delivery complete: Delivery Tokeh = "+arg0.isComplete());
>           f_ackReceived = true;
>       }
>       
>       /**
>        * 
>        */
>       public void messageArrived(String arg0, MqttMessage arg1) throws Exception
>       {
>           synchronized (numberOfMessages)
>           {
>               numberOfMessages++;   
>               System.out.println("MQTT - messageArrived "+arg0+"\nMessage: \t["+arg1+"] QoS: ["+arg1.getQos()+"] isDup ["+arg1.isDuplicate()+"] nameSpace = ");
> //            System.out.print(arg1.isDuplicate()?"*":".");
>               byte[] payloadBytes = arg1.getPayload(); 
>               if (payloadBytes.length >0 )
>               {
>                   messagePayload = new String(payloadBytes); 
>               }
>               System.out.println("Message Recieved..."); 
>               f_messageReceived = true;
>           } 
>       }
>       
> }
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)