You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Curt Jutzi (JIRA)" <ji...@apache.org> on 2014/10/11 00:05:34 UTC
[jira] [Updated] (AMQ-5387) `
[ https://issues.apache.org/jira/browse/AMQ-5387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Curt Jutzi updated AMQ-5387:
----------------------------
Description:
{noformat}
**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import static org.junit.Assert.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.net.util.Base64;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Test;
/**
* Test the NIO transport with this Test group
*/
public class PahoMQTTNIOTest extends PahoMQTTTest implements MqttCallback {
AtomicInteger m_receiveCounter = new AtomicInteger();
String BigMessage = "................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................;........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................";;
static ArrayList<MqttClient> mqttClients = null;
static final Integer staticSyncObj = new Integer(1);
String messagePayload = null;
public final int numberOfThreads = 500;
static ArrayList<pubThreadBitMsg> arrThreads = new ArrayList<pubThreadBitMsg>();
boolean f_messageReceived = false;
boolean f_ackReceived = false;
boolean f_lost = false;
/**
*
*
* @param client
* @param location
* @param accountId
* @param userId
* @param clientId
* @param token
* @param nameSpace
* @param message
* @param qos
* @param f_retained
* @param f_keepOpen
* @return
* @throws MqttException
*/
private MqttClient pubNameSpace(MqttClient client,
String location,
String accountId,
String userId,
String clientId,
String token,
String nameSpace,
String message,
int qos,
boolean f_retained,
boolean f_keepOpen) throws MqttException
{
try
{
boolean f_wasConnected = true;
if (client == null)
{
f_wasConnected = false;
client = new MqttClient(location, clientId/*, persistence*/);
}
if (!f_wasConnected)
{
MqttConnectOptions options = new MqttConnectOptions();
options.setKeepAliveInterval(60);
options.setConnectionTimeout(120);
options.setPassword(token.toCharArray());
options.setUserName(accountId+":"+userId);
client.connect(options);
client.setCallback(this);
}
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(message.getBytes());
mqttMessage.setQos(qos);
mqttMessage.setRetained(f_retained);
client.publish(nameSpace, mqttMessage);
if (!f_keepOpen)
{
client.disconnect();
client.close();
client = null;
}
return client;
}
catch (MqttPersistenceException e)
{
System.err.println("pubNameSpace : YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT WAY Excpetion - "+e.getMessage());
if (client != null)
{
client.disconnect();
client.close();
client = null;
}
}
catch (Exception e)
{
e.printStackTrace();
System.err.println("Exception e = "+e.getMessage());
if (client != null)
{
client.disconnect();
client.close();
client = null;
}
}
return null;
}
/**
*
* @param location
* @param accountId
* @param userId
* @param clientId
* @param token
* @param nameSpace
* @param message
* @param qos
* @param f_retained
* @param f_keepOpen
* @return
* @throws MqttException
*/
private MqttClient pubNameSpace(String location,
String accountId,
String userId,
String clientId,
String token,
String nameSpace,
String message,
int qos,
boolean f_retained,
boolean f_keepOpen) throws MqttException
{
try
{
MqttClient client = new MqttClient(location, clientId/*, persistence*/);
client.setCallback(this);
MqttConnectOptions options = new MqttConnectOptions();
options.setKeepAliveInterval(60);
options.setConnectionTimeout(120);
options.setPassword(token.toCharArray());
options.setUserName(accountId+":"+userId);
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(message.getBytes());
mqttMessage.setQos(qos);
mqttMessage.setRetained(f_retained);
client.connect(options);
client.publish(nameSpace, mqttMessage);
if (!f_keepOpen)
{
client.disconnect();
client.close();
}
return client;
}
catch (MqttPersistenceException e)
{
System.err.println("YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT WAY : "+e.getMessage());
assertTrue(false);
}
catch (Exception e)
{
e.printStackTrace();
System.err.println("Exceptoin e = "+e.getMessage());
assertTrue(false);
}
return null;
}
/**
*
* @param input
* @param output
* @return
* @throws IOException
*/
public static long copyLarge(InputStream input, OutputStream output) throws IOException
{
byte[] buffer = new byte[4096];
long count = 0L;
int n = 0;
while (-1 != (n = input.read(buffer))) {
output.write(buffer, 0, n);
count += n;
}
return count;
}
/**
*
* @param p
* @throws IOException
*/
public static void outputResults (Process p) throws IOException
{
BufferedReader stdInput = new BufferedReader(new
InputStreamReader(p.getInputStream()));
BufferedReader stdError = new BufferedReader(new
InputStreamReader(p.getErrorStream()));
String s;
while ((s = stdInput.readLine()) != null)
{
System.out.println(s);
}
while ((s = stdError.readLine()) != null)
{
System.out.println(s);
}
}
/**
* HELPER
* @param msec
* @throws InterruptedException
*/
void pause_til_done_or_time(int msec) throws InterruptedException
{
int pauseTime = 100;
while (!f_messageReceived && msec > 0 && !f_lost)
{
Thread.sleep(pauseTime);
msec -= pauseTime;
}
}
static Integer numberOfMessages = 0;
public void clearMessageCount()
{
numberOfMessages = 0;
}
public Integer getMessageCount()
{
return numberOfMessages;
}
/**
*
* @param msec
*/
private void waitForItAck(int msec)
{
while (!f_ackReceived)
{
try { Thread.sleep(1000); } catch (Exception e){}
msec= msec-1000;
if (msec < 0)
{
break;
}
}
}
@Override
public String getProtocolScheme() {
return "mqtt+nio";
}
@Override
public boolean isUseSSL() {
return false;
}
public class PahoCallback implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
// TODO Auto-generated method stub
}
@Override
public void messageArrived(String topic, MqttMessage message)
throws Exception {
m_receiveCounter.incrementAndGet();
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// TODO Auto-generated method stub
}
}
static MqttClient BalstTestClient = null;
String loc = "tcp://localhost:1883";
public class pubThreadBitMsg extends Thread {
public pubThreadBitMsg()
{
synchronized (staticSyncObj)
{
try
{
System.out.println("---- pubTheadBitMsg - constructor");
if (BalstTestClient == null)
{
BalstTestClient = pubNameSpace(loc, "cjutzi",
"someone",
"myclientid_cjutzi_pub",
"hello",
"/accounts/cjutzi/users/curt/test",
"Starting Client", 1, false, true);
System.out.println("---- pubTheadBitMsg - init");
}
}
catch (MqttException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public void run()
{
// synchronized (staticSyncObj)
{
try
{
System.out.println("---- pubTheadBitMsg - send");
pubNameSpace(BalstTestClient, loc, "cjutzi",
"someone",
"myclientid_cjutzi_pub",
"hello",
"/accounts/cjutzi/users/curt/test",
BigMessage, 1, false, true);
} catch (MqttException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
@Test
public void test_AckOnOldListenerQos1Blast100B10KBlocks() throws MqttException, InterruptedException
{
MqttClient subClient = new MqttClient("tcp://localhost:1883",
"niosubclient",
new MemoryPersistence());
MqttConnectOptions cOpts = new MqttConnectOptions();
cOpts.setCleanSession(true);
cOpts.setUserName("system:system");
cOpts.setPassword("system_token".toCharArray());
subClient.setCallback(new PahoCallback());
subClient.connect(cOpts);
subClient.subscribe("nio/test");
for (int i = 0; i < numberOfThreads; i++)
{
arrThreads.add(new pubThreadBitMsg());
}
System.out.println("--started");
for (int i = 0; i < numberOfThreads; i++)
{
arrThreads.get(i).start();
}
System.out.println("-- waiting");
Thread.sleep(10000);
assertTrue(numberOfThreads == m_receiveCounter.get());
}
/**
*
*/
private void resetFlag()
{
f_messageReceived = false;
f_ackReceived = false;
}
/***************************************************************/
/** CALL BACKS FOR MQTT */
/***************************************************************/
/**
*
*/
public void connectionLost(Throwable arg0)
{
System.out.println("MQTT - Connection Lost");
// f_terminate = true;
f_lost = true;
}
/**
*
*/
public void deliveryComplete(IMqttDeliveryToken arg0)
{
System.out.println("MQTT - delivery complete: Delivery Tokeh = "+arg0.isComplete());
f_ackReceived = true;
}
/**
*
*/
public void messageArrived(String arg0, MqttMessage arg1) throws Exception
{
synchronized (numberOfMessages)
{
numberOfMessages++;
System.out.println("MQTT - messageArrived "+arg0+"\nMessage: \t["+arg1+"] QoS: ["+arg1.getQos()+"] isDup ["+arg1.isDuplicate()+"] nameSpace = ");
// System.out.print(arg1.isDuplicate()?"*":".");
byte[] payloadBytes = arg1.getPayload();
if (payloadBytes.length >0 )
{
messagePayload = new String(payloadBytes);
}
System.out.println("Message Recieved...");
f_messageReceived = true;
}
}
}
{noformat}
was:
{noformat}
**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import static org.junit.Assert.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.net.util.Base64;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Test;
/**
* Test the NIO transport with this Test group
*/
public class PahoMQTTNIOTest extends PahoMQTTTest implements MqttCallback {
AtomicInteger m_receiveCounter = new AtomicInteger();
String BigMessage = "................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................;........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................";;
static ArrayList<MqttClient> mqttClients = null;
static final Integer staticSyncObj = new Integer(1);
String messagePayload = null;
public final int numberOfThreads = 500;
static ArrayList<pubThreadBitMsg> arrThreads = new ArrayList<pubThreadBitMsg>();
boolean f_messageReceived = false;
boolean f_ackReceived = false;
boolean f_lost = false;
/**
*
*
* @param client
* @param location
* @param accountId
* @param userId
* @param clientId
* @param token
* @param nameSpace
* @param message
* @param qos
* @param f_retained
* @param f_keepOpen
* @return
* @throws MqttException
*/
private MqttClient pubNameSpace(MqttClient client,
String location,
String accountId,
String userId,
String clientId,
String token,
String nameSpace,
String message,
int qos,
boolean f_retained,
boolean f_keepOpen) throws MqttException
{
try
{
boolean f_wasConnected = true;
if (client == null)
{
f_wasConnected = false;
client = new MqttClient(location, clientId/*, persistence*/);
}
if (!f_wasConnected)
{
MqttConnectOptions options = new MqttConnectOptions();
options.setKeepAliveInterval(60);
options.setConnectionTimeout(120);
options.setPassword(token.toCharArray());
options.setUserName(accountId+":"+userId);
client.connect(options);
client.setCallback(this);
}
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(message.getBytes());
mqttMessage.setQos(qos);
mqttMessage.setRetained(f_retained);
client.publish(nameSpace, mqttMessage);
if (!f_keepOpen)
{
client.disconnect();
client.close();
client = null;
}
return client;
}
catch (MqttPersistenceException e)
{
System.err.println("pubNameSpace : YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT WAY Excpetion - "+e.getMessage());
if (client != null)
{
client.disconnect();
client.close();
client = null;
}
}
catch (Exception e)
{
e.printStackTrace();
System.err.println("Exception e = "+e.getMessage());
if (client != null)
{
client.disconnect();
client.close();
client = null;
}
}
return null;
}
/**
*
* @param location
* @param accountId
* @param userId
* @param clientId
* @param token
* @param nameSpace
* @param message
* @param qos
* @param f_retained
* @param f_keepOpen
* @return
* @throws MqttException
*/
private MqttClient pubNameSpace(String location,
String accountId,
String userId,
String clientId,
String token,
String nameSpace,
String message,
int qos,
boolean f_retained,
boolean f_keepOpen) throws MqttException
{
try
{
MqttClient client = new MqttClient(location, clientId/*, persistence*/);
client.setCallback(this);
MqttConnectOptions options = new MqttConnectOptions();
options.setKeepAliveInterval(60);
options.setConnectionTimeout(120);
options.setPassword(token.toCharArray());
options.setUserName(accountId+":"+userId);
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(message.getBytes());
mqttMessage.setQos(qos);
mqttMessage.setRetained(f_retained);
client.connect(options);
client.publish(nameSpace, mqttMessage);
if (!f_keepOpen)
{
client.disconnect();
client.close();
}
return client;
}
catch (MqttPersistenceException e)
{
System.err.println("YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT WAY : "+e.getMessage());
assertTrue(false);
}
catch (Exception e)
{
e.printStackTrace();
System.err.println("Exceptoin e = "+e.getMessage());
assertTrue(false);
}
return null;
}
/**
*
* @param input
* @param output
* @return
* @throws IOException
*/
public static long copyLarge(InputStream input, OutputStream output) throws IOException
{
byte[] buffer = new byte[4096];
long count = 0L;
int n = 0;
while (-1 != (n = input.read(buffer))) {
output.write(buffer, 0, n);
count += n;
}
return count;
}
/**
*
* @param p
* @throws IOException
*/
public static void outputResults (Process p) throws IOException
{
BufferedReader stdInput = new BufferedReader(new
InputStreamReader(p.getInputStream()));
BufferedReader stdError = new BufferedReader(new
InputStreamReader(p.getErrorStream()));
String s;
while ((s = stdInput.readLine()) != null)
{
System.out.println(s);
}
while ((s = stdError.readLine()) != null)
{
System.out.println(s);
}
}
/**
* HELPER
* @param msec
* @throws InterruptedException
*/
void pause_til_done_or_time(int msec) throws InterruptedException
{
int pauseTime = 100;
while (!f_messageReceived && msec > 0 && !f_lost)
{
Thread.sleep(pauseTime);
msec -= pauseTime;
}
}
static Integer numberOfMessages = 0;
public void clearMessageCount()
{
numberOfMessages = 0;
}
public Integer getMessageCount()
{
return numberOfMessages;
}
/**
*
* @param msec
*/
private void waitForItAck(int msec)
{
while (!f_ackReceived)
{
try { Thread.sleep(1000); } catch (Exception e){}
msec= msec-1000;
if (msec < 0)
{
break;
}
}
}
@Override
public String getProtocolScheme() {
return "mqtt+nio";
}
@Override
public boolean isUseSSL() {
return false;
}
public class PahoCallback implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
// TODO Auto-generated method stub
}
@Override
public void messageArrived(String topic, MqttMessage message)
throws Exception {
m_receiveCounter.incrementAndGet();
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// TODO Auto-generated method stub
}
}
static MqttClient BalstTestClient = null;
String loc = "tcp://localhost:1883";
public class pubThreadBitMsg extends Thread {
public pubThreadBitMsg()
{
synchronized (staticSyncObj)
{
try
{
System.out.println("---- pubTheadBitMsg - constructor");
if (BalstTestClient == null)
{
BalstTestClient = pubNameSpace(loc, "cjutzi",
"curt",
"myclientid_cjutzi_pub",
"curts_client_pub_token",
"/accounts/cjutzi/users/curt/test",
"Starting Client", 1, false, true);
System.out.println("---- pubTheadBitMsg - init");
}
}
catch (MqttException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public void run()
{
// synchronized (staticSyncObj)
{
try
{
System.out.println("---- pubTheadBitMsg - send");
pubNameSpace(BalstTestClient, loc, "cjutzi",
"curt",
"myclientid_cjutzi_pub",
"curts_client_pub_token",
"/accounts/cjutzi/users/curt/test",
BigMessage, 1, false, true);
} catch (MqttException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
@Test
public void test_AckOnOldListenerQos1Blast100B10KBlocks() throws MqttException, InterruptedException
{
MqttClient subClient = new MqttClient("tcp://localhost:1883",
"niosubclient",
new MemoryPersistence());
MqttConnectOptions cOpts = new MqttConnectOptions();
cOpts.setCleanSession(true);
cOpts.setUserName("system:system");
cOpts.setPassword("system_token".toCharArray());
subClient.setCallback(new PahoCallback());
subClient.connect(cOpts);
subClient.subscribe("nio/test");
for (int i = 0; i < numberOfThreads; i++)
{
arrThreads.add(new pubThreadBitMsg());
}
System.out.println("--started");
for (int i = 0; i < numberOfThreads; i++)
{
arrThreads.get(i).start();
}
System.out.println("-- waiting");
Thread.sleep(10000);
assertTrue(numberOfThreads == m_receiveCounter.get());
}
/**
*
*/
private void resetFlag()
{
f_messageReceived = false;
f_ackReceived = false;
}
/***************************************************************/
/** CALL BACKS FOR MQTT */
/***************************************************************/
/**
*
*/
public void connectionLost(Throwable arg0)
{
System.out.println("MQTT - Connection Lost");
// f_terminate = true;
f_lost = true;
}
/**
*
*/
public void deliveryComplete(IMqttDeliveryToken arg0)
{
System.out.println("MQTT - delivery complete: Delivery Tokeh = "+arg0.isComplete());
f_ackReceived = true;
}
/**
*
*/
public void messageArrived(String arg0, MqttMessage arg1) throws Exception
{
synchronized (numberOfMessages)
{
numberOfMessages++;
System.out.println("MQTT - messageArrived "+arg0+"\nMessage: \t["+arg1+"] QoS: ["+arg1.getQos()+"] isDup ["+arg1.isDuplicate()+"] nameSpace = ");
// System.out.print(arg1.isDuplicate()?"*":".");
byte[] payloadBytes = arg1.getPayload();
if (payloadBytes.length >0 )
{
messagePayload = new String(payloadBytes);
}
System.out.println("Message Recieved...");
f_messageReceived = true;
}
}
}
{noformat}
Summary: ` (was: NIO Fails (for MQTT at least) on back-to-back Transport frames)
> `
> -
>
> Key: AMQ-5387
> URL: https://issues.apache.org/jira/browse/AMQ-5387
> Project: ActiveMQ
> Issue Type: Bug
> Components: MQTT
> Affects Versions: 5.11.0
> Environment: Paho as client
> Reporter: Curt Jutzi
> Fix For: NEEDS_REVIEW
>
> Attachments: MQTTCode.java.patch
>
> Original Estimate: 0h
> Remaining Estimate: 0h
>
> {noformat}
> **
> * Licensed to the Apache Software Foundation (ASF) under one or more
> * contributor license agreements. See the NOTICE file distributed with
> * this work for additional information regarding copyright ownership.
> * The ASF licenses this file to You under the Apache License, Version 2.0
> * (the "License"); you may not use this file except in compliance with
> * the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> package org.apache.activemq.transport.mqtt;
> import static org.junit.Assert.*;
> import java.io.BufferedReader;
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.InputStreamReader;
> import java.io.OutputStream;
> import java.util.ArrayList;
> import java.util.concurrent.atomic.AtomicInteger;
> import org.apache.commons.net.util.Base64;
> import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
> import org.eclipse.paho.client.mqttv3.MqttCallback;
> import org.eclipse.paho.client.mqttv3.MqttClient;
> import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
> import org.eclipse.paho.client.mqttv3.MqttException;
> import org.eclipse.paho.client.mqttv3.MqttMessage;
> import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
> import org.eclipse.paho.client.mqttv3.MqttSecurityException;
> import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
> import org.junit.Test;
> /**
> * Test the NIO transport with this Test group
> */
> public class PahoMQTTNIOTest extends PahoMQTTTest implements MqttCallback {
> AtomicInteger m_receiveCounter = new AtomicInteger();
> String BigMessage = "................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................;........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................";;
> static ArrayList<MqttClient> mqttClients = null;
> static final Integer staticSyncObj = new Integer(1);
> String messagePayload = null;
>
> public final int numberOfThreads = 500;
> static ArrayList<pubThreadBitMsg> arrThreads = new ArrayList<pubThreadBitMsg>();
>
> boolean f_messageReceived = false;
> boolean f_ackReceived = false;
> boolean f_lost = false;
> /**
> *
> *
> * @param client
> * @param location
> * @param accountId
> * @param userId
> * @param clientId
> * @param token
> * @param nameSpace
> * @param message
> * @param qos
> * @param f_retained
> * @param f_keepOpen
> * @return
> * @throws MqttException
> */
> private MqttClient pubNameSpace(MqttClient client,
> String location,
> String accountId,
> String userId,
> String clientId,
> String token,
> String nameSpace,
> String message,
> int qos,
> boolean f_retained,
> boolean f_keepOpen) throws MqttException
> {
> try
> {
> boolean f_wasConnected = true;
>
> if (client == null)
> {
> f_wasConnected = false;
> client = new MqttClient(location, clientId/*, persistence*/);
> }
>
> if (!f_wasConnected)
> {
> MqttConnectOptions options = new MqttConnectOptions();
> options.setKeepAliveInterval(60);
> options.setConnectionTimeout(120);
> options.setPassword(token.toCharArray());
> options.setUserName(accountId+":"+userId);
> client.connect(options);
> client.setCallback(this);
> }
>
> MqttMessage mqttMessage = new MqttMessage();
> mqttMessage.setPayload(message.getBytes());
> mqttMessage.setQos(qos);
> mqttMessage.setRetained(f_retained);
>
>
> client.publish(nameSpace, mqttMessage);
>
> if (!f_keepOpen)
> {
> client.disconnect();
> client.close();
> client = null;
> }
> return client;
> }
> catch (MqttPersistenceException e)
> {
> System.err.println("pubNameSpace : YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT WAY Excpetion - "+e.getMessage());
> if (client != null)
> {
> client.disconnect();
> client.close();
> client = null;
> }
> }
> catch (Exception e)
> {
> e.printStackTrace();
> System.err.println("Exception e = "+e.getMessage());
> if (client != null)
> {
> client.disconnect();
> client.close();
> client = null;
> }
> }
> return null;
> }
> /**
> *
> * @param location
> * @param accountId
> * @param userId
> * @param clientId
> * @param token
> * @param nameSpace
> * @param message
> * @param qos
> * @param f_retained
> * @param f_keepOpen
> * @return
> * @throws MqttException
> */
> private MqttClient pubNameSpace(String location,
> String accountId,
> String userId,
> String clientId,
> String token,
> String nameSpace,
> String message,
> int qos,
> boolean f_retained,
> boolean f_keepOpen) throws MqttException
> {
> try
> {
> MqttClient client = new MqttClient(location, clientId/*, persistence*/);
> client.setCallback(this);
> MqttConnectOptions options = new MqttConnectOptions();
> options.setKeepAliveInterval(60);
> options.setConnectionTimeout(120);
> options.setPassword(token.toCharArray());
> options.setUserName(accountId+":"+userId);
>
> MqttMessage mqttMessage = new MqttMessage();
> mqttMessage.setPayload(message.getBytes());
> mqttMessage.setQos(qos);
> mqttMessage.setRetained(f_retained);
>
> client.connect(options);
> client.publish(nameSpace, mqttMessage);
>
> if (!f_keepOpen)
> {
> client.disconnect();
> client.close();
> }
> return client;
> }
> catch (MqttPersistenceException e)
> {
> System.err.println("YOU CAN'T USE THE SAME CLIENTID MULTIPLE TIMES IN A PERSISTENT WAY : "+e.getMessage());
> assertTrue(false);
> }
> catch (Exception e)
> {
> e.printStackTrace();
> System.err.println("Exceptoin e = "+e.getMessage());
> assertTrue(false);
> }
> return null;
> }
>
>
> /**
> *
> * @param input
> * @param output
> * @return
> * @throws IOException
> */
> public static long copyLarge(InputStream input, OutputStream output) throws IOException
> {
> byte[] buffer = new byte[4096];
> long count = 0L;
> int n = 0;
> while (-1 != (n = input.read(buffer))) {
> output.write(buffer, 0, n);
> count += n;
> }
> return count;
> }
>
> /**
> *
> * @param p
> * @throws IOException
> */
> public static void outputResults (Process p) throws IOException
> {
> BufferedReader stdInput = new BufferedReader(new
> InputStreamReader(p.getInputStream()));
> BufferedReader stdError = new BufferedReader(new
> InputStreamReader(p.getErrorStream()));
> String s;
> while ((s = stdInput.readLine()) != null)
> {
> System.out.println(s);
> }
> while ((s = stdError.readLine()) != null)
> {
> System.out.println(s);
> }
> }
>
> /**
> * HELPER
> * @param msec
> * @throws InterruptedException
> */
> void pause_til_done_or_time(int msec) throws InterruptedException
> {
> int pauseTime = 100;
> while (!f_messageReceived && msec > 0 && !f_lost)
> {
> Thread.sleep(pauseTime);
> msec -= pauseTime;
> }
>
> }
> static Integer numberOfMessages = 0;
> public void clearMessageCount()
> {
> numberOfMessages = 0;
> }
> public Integer getMessageCount()
> {
> return numberOfMessages;
> }
> /**
> *
> * @param msec
> */
> private void waitForItAck(int msec)
> {
> while (!f_ackReceived)
> {
>
> try { Thread.sleep(1000); } catch (Exception e){}
>
> msec= msec-1000;
>
> if (msec < 0)
> {
> break;
> }
> }
> }
>
> @Override
> public String getProtocolScheme() {
> return "mqtt+nio";
> }
> @Override
> public boolean isUseSSL() {
> return false;
> }
> public class PahoCallback implements MqttCallback {
> @Override
> public void connectionLost(Throwable cause) {
> // TODO Auto-generated method stub
>
> }
> @Override
> public void messageArrived(String topic, MqttMessage message)
> throws Exception {
> m_receiveCounter.incrementAndGet();
> }
> @Override
> public void deliveryComplete(IMqttDeliveryToken token) {
> // TODO Auto-generated method stub
>
> }
>
> }
>
> static MqttClient BalstTestClient = null;
> String loc = "tcp://localhost:1883";
> public class pubThreadBitMsg extends Thread {
>
> public pubThreadBitMsg()
> {
> synchronized (staticSyncObj)
> {
> try
> {
> System.out.println("---- pubTheadBitMsg - constructor");
> if (BalstTestClient == null)
> {
> BalstTestClient = pubNameSpace(loc, "cjutzi",
> "someone",
> "myclientid_cjutzi_pub",
> "hello",
> "/accounts/cjutzi/users/curt/test",
> "Starting Client", 1, false, true);
> System.out.println("---- pubTheadBitMsg - init");
> }
> }
> catch (MqttException e)
> {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> }
> }
> public void run()
> {
> // synchronized (staticSyncObj)
> {
> try
> {
> System.out.println("---- pubTheadBitMsg - send");
> pubNameSpace(BalstTestClient, loc, "cjutzi",
> "someone",
> "myclientid_cjutzi_pub",
> "hello",
> "/accounts/cjutzi/users/curt/test",
> BigMessage, 1, false, true);
> } catch (MqttException e)
> {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> }
> }
> }
>
> @Test
> public void test_AckOnOldListenerQos1Blast100B10KBlocks() throws MqttException, InterruptedException
> {
> MqttClient subClient = new MqttClient("tcp://localhost:1883",
> "niosubclient",
> new MemoryPersistence());
> MqttConnectOptions cOpts = new MqttConnectOptions();
> cOpts.setCleanSession(true);
> cOpts.setUserName("system:system");
> cOpts.setPassword("system_token".toCharArray());
>
>
> subClient.setCallback(new PahoCallback());
> subClient.connect(cOpts);
> subClient.subscribe("nio/test");
>
>
> for (int i = 0; i < numberOfThreads; i++)
> {
> arrThreads.add(new pubThreadBitMsg());
> }
> System.out.println("--started");
> for (int i = 0; i < numberOfThreads; i++)
> {
> arrThreads.get(i).start();
> }
> System.out.println("-- waiting");
> Thread.sleep(10000);
> assertTrue(numberOfThreads == m_receiveCounter.get());
> }
>
>
> /**
> *
> */
> private void resetFlag()
> {
> f_messageReceived = false;
> f_ackReceived = false;
> }
> /***************************************************************/
> /** CALL BACKS FOR MQTT */
> /***************************************************************/
>
>
>
> /**
> *
> */
> public void connectionLost(Throwable arg0)
> {
> System.out.println("MQTT - Connection Lost");
> // f_terminate = true;
> f_lost = true;
> }
> /**
> *
> */
> public void deliveryComplete(IMqttDeliveryToken arg0)
> {
> System.out.println("MQTT - delivery complete: Delivery Tokeh = "+arg0.isComplete());
> f_ackReceived = true;
> }
>
> /**
> *
> */
> public void messageArrived(String arg0, MqttMessage arg1) throws Exception
> {
> synchronized (numberOfMessages)
> {
> numberOfMessages++;
> System.out.println("MQTT - messageArrived "+arg0+"\nMessage: \t["+arg1+"] QoS: ["+arg1.getQos()+"] isDup ["+arg1.isDuplicate()+"] nameSpace = ");
> // System.out.print(arg1.isDuplicate()?"*":".");
> byte[] payloadBytes = arg1.getPayload();
> if (payloadBytes.length >0 )
> {
> messagePayload = new String(payloadBytes);
> }
> System.out.println("Message Recieved...");
> f_messageReceived = true;
> }
> }
>
> }
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)