You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "gongping.zhu (Jira)" <ji...@apache.org> on 2022/08/02 00:34:00 UTC

[jira] [Comment Edited] (ARTEMIS-3913) MQTTReasonCodes byte loss of precision,must int type

    [ https://issues.apache.org/jira/browse/ARTEMIS-3913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17573974#comment-17573974 ] 

gongping.zhu edited comment on ARTEMIS-3913 at 8/2/22 12:33 AM:
----------------------------------------------------------------

My ArtemisBrokerPlugin code:

 

```

import cn.hutool.core.util.ObjectUtil;
import com.yeker.iot.broker.plugin.impl.model.Account;
import com.yeker.iot.broker.plugin.impl.model.DeviceAuth;
import com.yeker.sdk.comm.util.IPUntil;
import org.apache.activemq.artemis.api.core.*;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.MQTTRuntimesException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapperResultSetExtractor;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ArtemisBrokerPlugin implements ActiveMQServerPlugin, Serializable {

private Logger log = LoggerFactory.getLogger(ArtemisBrokerPlugin.class);
/**
 * 遗言topic前缀
*/
private static final String MISSING_TOPIC_PREFIX = "msg.req.lwt";
/**
 * 服务端clientId前缀
*/
private static final String SERVICE_CLIENT_ID_PREFIX = "service-";
/**
 * client id 分隔符
*/
private static final String CLIENT_SPLITOR = "@";

/**
 * 账号数据库信息
*/
private String accountDriver;
private String accountUrl;
private String accountUsername;
private String accountPassword;
private JdbcTemplate accountJdbcTemplate;

/**
 * 授权认证
*/
private boolean accountAuthEnabled = false;
private String accountAuthQuerySQL;

/**
 * 设备数据库信息
*/
private String deviceDriver;
private String deviceUrl;
private String deviceUsername;
private String devicePassword;
private JdbcTemplate deviceJdbcTemplate;
/**
 * 设备授权
*/
private boolean deviceAuthEnabled = false;
private String deviceAuthCheckSQL;
private String deviceAuthLockerSQL;

private boolean deviceStatusSyncabled = false;
private String connectUpdateSQL;
private String disconnectUpdateSQL;
private String lwtUpdateSQL;

private String resetUpdateSQL;

private Map<String, Integer> authConnectTables = new ConcurrentHashMap<>();
private Map<String, DeviceAuth> authDeviceTables = new ConcurrentHashMap<>();

private boolean debug = false;

/**
 * Loads the plugin configuration handed over by the broker.
 *
 * <p>Copies the account/device database settings and SQL statements out of
 * {@code properties} and builds a JDBC template for each database whose
 * {@code accountAuthEnabled} / {@code deviceAuthEnabled} flag is {@code true}.
 *
 * @param properties key/value pairs configured for this plugin
 */
@Override
public void init(Map<String, String> properties) {
    this.accountDriver = properties.get("accountDriver");
    this.accountUrl = properties.get("accountUrl");
    this.accountUsername = properties.get("accountUsername");
    this.accountPassword = properties.get("accountPassword");

    this.accountAuthEnabled = Boolean.parseBoolean(properties.get("accountAuthEnabled"));
    if (this.accountAuthEnabled) {
        accountJdbcTemplate = build(accountDriver, accountUrl, accountUsername, accountPassword);
    }
    this.accountAuthQuerySQL = properties.get("accountAuthQuerySQL");

    this.deviceDriver = properties.get("deviceDriver");
    this.deviceUrl = properties.get("deviceUrl");
    this.deviceUsername = properties.get("deviceUsername");
    this.devicePassword = properties.get("devicePassword");

    this.deviceAuthEnabled = Boolean.parseBoolean(properties.get("deviceAuthEnabled"));
    if (this.deviceAuthEnabled) {
        deviceJdbcTemplate = build(deviceDriver, deviceUrl, deviceUsername, devicePassword);
    }
    this.deviceAuthCheckSQL = properties.get("deviceAuthCheckSQL");
    this.deviceAuthLockerSQL = properties.get("deviceAuthLockerSQL");

    this.deviceStatusSyncabled = Boolean.parseBoolean(properties.get("deviceStatusSyncabled"));
    this.connectUpdateSQL = properties.get("connectUpdateSQL");
    this.disconnectUpdateSQL = properties.get("disconnectUpdateSQL");
    this.lwtUpdateSQL = properties.get("lwtUpdateSQL");

    this.resetUpdateSQL = properties.get("resetUpdateSQL");

    // Never write the raw map to the log: it carries accountPassword and devicePassword.
    log.info("init :[{}] ", maskSecrets(properties));
    log.info("AccountAuthEnabled :[{}] ", accountAuthEnabled);
    log.info("DeviceAuthEnabled :[{}] ", deviceAuthEnabled);
    log.info("DeviceStatusSyncabled :[{}] ", deviceStatusSyncabled);
    log.info("{} 插件初始化", this.getClass().getSimpleName());
}

/**
 * Renders the properties in {@code Map.toString()} layout with the value of every
 * key containing "password" (case-insensitive) replaced by asterisks.
 */
private static String maskSecrets(Map<String, String> properties) {
    StringBuilder rendered = new StringBuilder("{");
    for (Map.Entry<String, String> entry : properties.entrySet()) {
        if (rendered.length() > 1) {
            rendered.append(", ");
        }
        String key = entry.getKey();
        boolean secret = key != null && key.toLowerCase(java.util.Locale.ROOT).contains("password");
        rendered.append(key).append('=').append(secret ? "******" : entry.getValue());
    }
    return rendered.append('}').toString();
}

/**
 * Traces a newly created connection when the {@code debug} flag is on;
 * otherwise does nothing.
 *
 * @param connection the connection that was just created
 * @throws ActiveMQException declared by the plugin interface; not thrown here
 */
@Override
public void afterCreateConnection(RemotingConnection connection) throws ActiveMQException {
    if (!debug) {
        return;
    }
    log.info("afterCreateConnection {},{},{}",
            connection.getClientID(), connection.getRemoteAddress(), connection.getID());
}

/**
*
 * @param name
 * @param username
 * @param minLargeMessageSize
 * @param connection
 * @param autoCommitSends
 * @param autoCommitAcks
 * @param preAcknowledge
 * @param xa
 * @param defaultAddress
 * @param callback
 * @param autoCreateQueues
 * @param context
 * @param prefixes
 * @throws ActiveMQException
*/
@Override
public void beforeCreateSession(String name, String username, int minLargeMessageSize,
RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge,
boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, OperationContext context,
Map<SimpleString, RoutingType> prefixes) throws ActiveMQException {
if(debug) {
log.info("beforeCreateSession {},{},{},{},{}", connection.getClientID(), connection.getRemoteAddress(), connection.getID(), name, username);
}
String connId = connection.getID().toString();
if(deviceAuthEnabled && !authConnectTables.containsKey(connId)) \{ // Debug.print(log); authConnectTables.put(connId,0); doConnectValidation(connection); }
}

/**
 * After a session has been created.
*
 * @param session The newly created session
 * @throws ActiveMQException
*/
@Override
public void afterCreateSession(ServerSession session) throws ActiveMQException {
RemotingConnection connection = session.getRemotingConnection();
if(debug) {
log.info("afterCreateSession {},{},{}", connection.getClientID(), connection.getRemoteAddress(), connection.getID());
}
if(accountAuthEnabled) \{ doAccountValidation(session.getUsername(),session.getPassword()); }
}

/**
 * A connection has been destroyed: runs the disconnect handling and forgets the
 * connection's entry in {@code authConnectTables}.
 *
 * @param connection the destroyed connection
 * @throws ActiveMQException declared by the plugin interface
 */
@Override
public void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException {
    if (debug) {
        log.info("afterDestroyConnection {},{},{}",
                connection.getClientID(), connection.getRemoteAddress(), connection.getID());
    }
    doDisconnect(connection);
    authConnectTables.remove(connection.getID().toString());
}

/**
 * After a message is sent: hands the message to the last-will handling and
 * removes the sending connection's entry from {@code authConnectTables}.
 *
 * <p>NOTE(review): the entry is removed after every send, so the next session
 * created on this connection is validated again - confirm this is intended.
 *
 * @param session the session that sent the message
 * @param tx unused
 * @param message the message that was sent
 * @param direct unused
 * @param noAutoCreateQueue unused
 * @param result unused
 * @throws ActiveMQException declared by the plugin interface
 */
@Override
public void afterSend(ServerSession session,
        Transaction tx,
        Message message,
        boolean direct,
        boolean noAutoCreateQueue,
        RoutingStatus result) throws ActiveMQException {
    final RemotingConnection sender = session.getRemotingConnection();
    if (debug) {
        log.info("afterSend {},{},{}", sender.getClientID(), sender.getRemoteAddress(), sender.getID());
    }
    doSendLwt(sender, message);
    authConnectTables.remove(sender.getID().toString());
}

private void doConnectValidation(RemotingConnection connection) throws ActiveMQException{
Boolean success = false;
String error = "";
boolean invalid = false;
int locked = 0;
int eft = 0;
String clientId = connection.getClientID();
String connId = connection.getID().toString();
String clientIp = connection.getRemoteAddress().toString();
clientIp = formatClientIP(clientIp);
try{
/**
 * 设备授权校验
*/
boolean invalidClientId = invalidClientId(clientId);
if(invalidClientId) \{//licId@mac /** * 无效的ClientId */ invalid = true; throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",clientId)); }
/**

 * 连接用户校验
 * 有效的ClientId格式
*/
if(!isService(clientId)){
String[] clientIdElements = clientId.split(CLIENT_SPLITOR);
String licId = clientIdElements[0];
String devId = clientIdElements[1];

/**
 * 开启设备授权
*/
if(deviceAuthEnabled){
/**
 * 新设备
*/
if(!authDeviceTables.containsKey(clientId)){

/**
 * 验证设备
*/
DeviceAuth auth = validate(licId,devId);
/**
 * 新设备
*/
if(ObjectUtil.isNotEmpty(auth)){

/**
 * 设备第一次
*/
if(ObjectUtil.isEmpty(auth.getDevId())) \{ String lockerSql = deviceAuthLockerSQL; /** * 防止一个licId被多台设备使用 */ locked = deviceJdbcTemplate.update(lockerSql, new Object[]\{devId, licId}
);
if(locked==0)
{ throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId)); }
auth.setDevId(devId);
authDeviceTables.put(clientId,auth);
}
}
}
}

/**
* 更新设备状态【连接】
*/
if(deviceStatusSyncabled)\{ String sql = connectUpdateSQL; String brokerIp = IPUntil.getLocalIp(); eft = deviceJdbcTemplate.update(sql, new Object[]\{connId,clientIp, brokerIp,licId});
}
}
success = true;
}
catch(ActiveMQException ex)\{ error = ex.getMessage(); throw ex; }
finally{
String prefix = "连接异常";
if(success){
prefix = "连接成功";
log.info("{} {} {} {} {} {}", prefix, formatClientId(clientId), clientIp,connId,locked,eft);
}
else{
if(invalid) {
log.debug("{} {} {} {} {} {} {}", prefix, formatClientId(clientId), clientIp,connId,locked,eft, error);
}
else{
log.info("{} {} {} {} {} {} {}", prefix, formatClientId(clientId), clientIp,connId, locked,eft,error);
}
}
}
}

private void doDisconnect(RemotingConnection connection){
/**
* 更新设备状态【断开】
*/
String clientId = connection.getClientID();
String connId = connection.getID().toString();
String clientIp = connection.getRemoteAddress().toString();
clientIp = formatClientIP(clientIp);
int eft = 0;
if(invalidClientId(clientId))\{ return; }
try {
if (clientId.indexOf(CLIENT_SPLITOR) != 1) {
if (deviceStatusSyncabled) \{ String[] clientIdElements = clientId.split(CLIENT_SPLITOR); String licId = clientIdElements[0]; String sql = disconnectUpdateSQL; eft = deviceJdbcTemplate.update(sql, new Object[]\{licId,connId});
}
}
}
finally {
log.info("断开成功 {} {} {} {}", formatClientId(clientId),clientIp,connId,eft);
}
}

private void doSendLwt(RemotingConnection connection,Message message){
int eft = 0;
Boolean success = false;
String clientId = connection.getClientID();
String connId = connection.getID().toString();
String clientIp = connection.getRemoteAddress().toString();
clientIp = formatClientIP(clientIp);
String address = message.getAddress();
if(invalidClientId(clientId))\{ return; }
if (MISSING_TOPIC_PREFIX.equals(address)) {
if (deviceStatusSyncabled && !isService(clientId)) {
try \{ String licId = null; String sql = lwtUpdateSQL; String[] clientIdElements = clientId.split(CLIENT_SPLITOR); licId = clientIdElements[0]; eft = deviceJdbcTemplate.update(sql, new Object[]\{licId, connId});
success = true;
}
finally{
if(success){
log.info("断开成功 {} {} {} {} {}",formatClientId(clientId),clientIp,connId,"LWT" ,eft);
}
else{
log.debug("断开异常 {} {} {} {} {} {}",formatClientId(clientId) ,clientIp,connId,"LWT",eft,message);
}
}
}
}
}

private JdbcTemplate build(String driver, String url, String username, String password)\{ DataSourceBuilder builder = DataSourceBuilder.create(); builder.driverClassName(driver); builder.url(url); builder.username(username); builder.password(password); return new JdbcTemplate(builder.build()); }

private String formatClientId(String clientId){
if(ObjectUtil.isNotEmpty(clientId)){
int index = clientId.toLowerCase().indexOf("service-");
if(index!=-1){
int length = clientId.length();
if(length>(index+44))\{ return clientId.substring(index,index+44); }
else\{ return clientId.substring(index); }
}
return clientId;
}
return clientId;
}

private String formatClientIP(String clientIp){
int index = -1;
if(ObjectUtil.isEmpty(clientIp))\{ return clientIp; }
clientIp = clientIp.replace("/","");
return clientIp;
}

private void doAccountValidation(String username,String password)\{ String sql= accountAuthQuerySQL; List<Account> accounts = accountJdbcTemplate.query(sql,new Object[]\{username,password},new RowMapperResultSetExtractor<Account>(new BeanPropertyRowMapper<Account>(Account.class)));
if(ObjectUtil.isEmpty(accounts))\{ throw new SecurityException("非法租户"); }
}

/**
* @param licId yekerId
* @param devId cpusn or mac_address
*/
private DeviceAuth validate(String licId,String devId) throws ActiveMQException \{ /** * YekerId@CPU_SN;保证一个YekerId只被一台设备使用 */ String sql = deviceAuthCheckSQL; List<DeviceAuth> deviceAuths = deviceJdbcTemplate.query(sql, new Object[]\{licId}, new RowMapperResultSetExtractor<DeviceAuth>(new BeanPropertyRowMapper<DeviceAuth>(DeviceAuth.class)));
if (deviceAuths == null || ObjectUtil.isEmpty(deviceAuths)) \{ throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",licId)); }
DeviceAuth auth = deviceAuths.get(0);
if (ObjectUtil.isNotEmpty(auth.getDevId())
&& !auth.getDevId().equalsIgnoreCase(devId)) \{ throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId)); }
return auth;
}
/**

 * [service node或者broker node]
 * @param clientId
 * @return
*/
private boolean isService(String clientId) \{ return ObjectUtil.isNotEmpty(clientId) && (clientId.toLowerCase().indexOf(SERVICE_CLIENT_ID_PREFIX)!=-1 //服务节点 || clientId.toLowerCase().indexOf("brokernode")!=-1);// broker节点 }

/**
 * clientId是否含有@分割符号
 * @param clientId
 * @return
*/
private static boolean invalidClientId(String clientId) \{ return ObjectUtil.isEmpty(clientId) || (clientId.toLowerCase().indexOf(SERVICE_CLIENT_ID_PREFIX) == -1 //非服务节点 && clientId.toLowerCase().indexOf("brokernode") == -1 //非broker节点 && clientId.indexOf(CLIENT_SPLITOR) == -1);//非正常的client }
/**

 * 最后遗嘱LWT(Last Will & Testament)
 * @param originalTopic
 * @return
*/
private boolean isLwt(String originalTopic) \{ return originalTopic != null && originalTopic.startsWith(MISSING_TOPIC_PREFIX); }

/**
 * Called when the plugin is registered with the broker: logs the event and
 * calls {@code reset()}.
 *
 * @param server the broker the plugin was registered with (unused)
 */
@Override
public void registered(ActiveMQServer server) {
    final String pluginName = getClass().getSimpleName();
    log.info("{} 插件注册", pluginName);
    reset();
}
/**
 * Called when the plugin is unregistered from the broker: logs the event and
 * calls {@code reset()}.
 *
 * @param server the broker the plugin was removed from (unused)
 */
@Override
public void unregistered(ActiveMQServer server) {
    // One placeholder for one argument; the old format had a second "{}" that
    // was printed literally.
    log.info("{} 插件注销", this.getClass().getSimpleName());
    reset();
}

/**
 * 同步因为服务器维护导致相关设备状态不一致
*/
private void reset() \{ authConnectTables.clear(); authDeviceTables.clear(); String brokerIp = IPUntil.getLocalIp(); String sql = resetUpdateSQL; deviceJdbcTemplate.update(sql, new Object[]\{brokerIp}
);
}
}

```

 

!image-2022-08-02-08-23-52-965.png!

!image-2022-08-02-08-24-39-288.png!

 

public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {

```

/**
 * Dispatches one incoming MQTT control packet to the handler for its message
 * type and releases the message afterwards.
 *
 * Packet types without a handler here fall through to disconnect(true).
 * The MQTTRuntimesException branch is the reporter's local patch.
 *
 * @param message the decoded MQTT packet; released in the finally block
 */
public void act(MqttMessage message) {
try {
switch (message.fixedHeader().messageType()) {
case AUTH:
handleAuth(message);
break;
case CONNECT:
handleConnect((MqttConnectMessage) message);
break;
case PUBLISH:
handlePublish((MqttPublishMessage) message);
break;
case PUBACK:
handlePuback((MqttPubAckMessage) message);
break;
case PUBREC:
handlePubrec(message);
break;
case PUBREL:
handlePubrel(message);
break;
case PUBCOMP:
handlePubcomp(message);
break;
case SUBSCRIBE:
handleSubscribe((MqttSubscribeMessage) message);
break;
case UNSUBSCRIBE:
handleUnsubscribe((MqttUnsubscribeMessage) message);
break;
case DISCONNECT:
disconnect(false, message);
break;
// All remaining types share the default action below.
case UNSUBACK:
case SUBACK:
case PINGREQ: // These are actually handled by the Netty thread directly so this packet should never make it here
case PINGRESP:
case CONNACK: // The server does not instantiate connections therefore any CONNACK received over a connection is an invalid control message.
default:
disconnect(true);
}
} catch(MQTTRuntimesException e) { // Custom code: pass the plugin's reason code on to the MQTT client
// NOTE(review): this branch wraps the whole switch, so a CONNACK is sent for
// any packet type that raises MQTTRuntimesException, not only CONNECT - confirm.
logger.info("@@@@@@@@@@@@@@@:"+e.getCode());
session.getProtocolHandler().sendConnack(e.getCode());
disconnect(true);
}
catch (Exception e) {
logger.info("@@@@@@@@@@@@@@@:"+e.getMessage());
MQTTLogger.LOGGER.errorProcessingControlPacket(message.toString(), e);
// Only MQTT 5 sessions get a DISCONNECT packet with a reason code.
if (session.getVersion() == MQTTVersion.MQTT_5) {
sendDisconnect(MQTTReasonCodes.IMPLEMENTATION_SPECIFIC_ERROR);
}
disconnect(true);
} finally {
ReferenceCountUtil.release(message);
}
}

```

}

When I capture the traffic with Wireshark, I see the following:

!image-2022-08-02-08-31-01-074.png!

 

 

 


was (Author: JIRAUSER293605):
My ArtemisBrokerPlugin code:

 

```

package com.yeker.iot.broker.plugin.impl;

import cn.hutool.core.util.ObjectUtil;
import com.yeker.iot.broker.plugin.impl.model.Account;
import com.yeker.iot.broker.plugin.impl.model.DeviceAuth;
import com.yeker.sdk.comm.util.IPUntil;
import org.apache.activemq.artemis.api.core.*;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.MQTTRuntimesException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapperResultSetExtractor;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ArtemisBrokerPlugin implements ActiveMQServerPlugin, Serializable {

private Logger log = LoggerFactory.getLogger(ArtemisBrokerPlugin.class);
/**
* 遗言topic前缀
*/
private static final String MISSING_TOPIC_PREFIX = "msg.req.lwt";
/**
* 服务端clientId前缀
*/
private static final String SERVICE_CLIENT_ID_PREFIX = "service-";
/**
* client id 分隔符
*/
private static final String CLIENT_SPLITOR = "@";

/**
* 账号数据库信息
*/
private String accountDriver;
private String accountUrl;
private String accountUsername;
private String accountPassword;
private JdbcTemplate accountJdbcTemplate;

/**
* 授权认证
*/
private boolean accountAuthEnabled = false;
private String accountAuthQuerySQL;

/**
* 设备数据库信息
*/
private String deviceDriver;
private String deviceUrl;
private String deviceUsername;
private String devicePassword;
private JdbcTemplate deviceJdbcTemplate;
/**
* 设备授权
*/
private boolean deviceAuthEnabled = false;
private String deviceAuthCheckSQL;
private String deviceAuthLockerSQL;


private boolean deviceStatusSyncabled = false;
private String connectUpdateSQL;
private String disconnectUpdateSQL;
private String lwtUpdateSQL;

private String resetUpdateSQL;

private Map<String, Integer> authConnectTables = new ConcurrentHashMap<>();
private Map<String, DeviceAuth> authDeviceTables = new ConcurrentHashMap<>();

private boolean debug = false;

/**
 * Loads the plugin configuration handed over by the broker.
 *
 * <p>Copies the account/device database settings and SQL statements out of
 * {@code properties} and builds a JDBC template for each database whose
 * {@code accountAuthEnabled} / {@code deviceAuthEnabled} flag is {@code true}.
 *
 * @param properties key/value pairs configured for this plugin
 */
@Override
public void init(Map<String, String> properties) {
    this.accountDriver = properties.get("accountDriver");
    this.accountUrl = properties.get("accountUrl");
    this.accountUsername = properties.get("accountUsername");
    this.accountPassword = properties.get("accountPassword");

    this.accountAuthEnabled = Boolean.parseBoolean(properties.get("accountAuthEnabled"));
    if (this.accountAuthEnabled) {
        accountJdbcTemplate = build(accountDriver, accountUrl, accountUsername, accountPassword);
    }
    this.accountAuthQuerySQL = properties.get("accountAuthQuerySQL");

    this.deviceDriver = properties.get("deviceDriver");
    this.deviceUrl = properties.get("deviceUrl");
    this.deviceUsername = properties.get("deviceUsername");
    this.devicePassword = properties.get("devicePassword");

    this.deviceAuthEnabled = Boolean.parseBoolean(properties.get("deviceAuthEnabled"));
    if (this.deviceAuthEnabled) {
        deviceJdbcTemplate = build(deviceDriver, deviceUrl, deviceUsername, devicePassword);
    }
    this.deviceAuthCheckSQL = properties.get("deviceAuthCheckSQL");
    this.deviceAuthLockerSQL = properties.get("deviceAuthLockerSQL");

    this.deviceStatusSyncabled = Boolean.parseBoolean(properties.get("deviceStatusSyncabled"));
    this.connectUpdateSQL = properties.get("connectUpdateSQL");
    this.disconnectUpdateSQL = properties.get("disconnectUpdateSQL");
    this.lwtUpdateSQL = properties.get("lwtUpdateSQL");

    this.resetUpdateSQL = properties.get("resetUpdateSQL");

    // Never write the raw map to the log: it carries accountPassword and devicePassword.
    log.info("init :[{}] ", maskSecrets(properties));
    log.info("AccountAuthEnabled :[{}] ", accountAuthEnabled);
    log.info("DeviceAuthEnabled :[{}] ", deviceAuthEnabled);
    log.info("DeviceStatusSyncabled :[{}] ", deviceStatusSyncabled);
    log.info("{} 插件初始化", this.getClass().getSimpleName());
}

/**
 * Renders the properties in {@code Map.toString()} layout with the value of every
 * key containing "password" (case-insensitive) replaced by asterisks.
 */
private static String maskSecrets(Map<String, String> properties) {
    StringBuilder rendered = new StringBuilder("{");
    for (Map.Entry<String, String> entry : properties.entrySet()) {
        if (rendered.length() > 1) {
            rendered.append(", ");
        }
        String key = entry.getKey();
        boolean secret = key != null && key.toLowerCase(java.util.Locale.ROOT).contains("password");
        rendered.append(key).append('=').append(secret ? "******" : entry.getValue());
    }
    return rendered.append('}').toString();
}

/**
 * Traces a newly created connection when the {@code debug} flag is on;
 * otherwise does nothing.
 *
 * @param connection the connection that was just created
 * @throws ActiveMQException declared by the plugin interface; not thrown here
 */
@Override
public void afterCreateConnection(RemotingConnection connection) throws ActiveMQException {
    if (!debug) {
        return;
    }
    log.info("afterCreateConnection {},{},{}",
            connection.getClientID(), connection.getRemoteAddress(), connection.getID());
}

/**
 * Runs device validation once per connection before a session is created.
 *
 * <p>When {@code deviceAuthEnabled} is set, the connection id is recorded in
 * {@code authConnectTables} and {@code doConnectValidation} is invoked; later
 * sessions on an already recorded connection skip the validation. All
 * parameters other than {@code connection}, {@code name} and {@code username}
 * are unused.
 *
 * @param name       session name (only logged in debug mode)
 * @param username   session user (only logged in debug mode)
 * @param connection connection the session is being created on
 * @throws ActiveMQException if the client fails validation
 */
@Override
public void beforeCreateSession(String name, String username, int minLargeMessageSize,
        RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge,
        boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, OperationContext context,
        Map<SimpleString, RoutingType> prefixes) throws ActiveMQException {
    if (debug) {
        log.info("beforeCreateSession {},{},{},{},{}", connection.getClientID(), connection.getRemoteAddress(), connection.getID(), name, username);
    }
    String connId = connection.getID().toString();
    if (!deviceAuthEnabled) {
        return;
    }
    // putIfAbsent turns "check, then mark" into one atomic step on the concurrent map.
    if (authConnectTables.putIfAbsent(connId, 0) != null) {
        return;
    }
    try {
        doConnectValidation(connection);
    } catch (ActiveMQException | RuntimeException e) {
        // Drop the marker so a rejected connection cannot open a later session unvalidated.
        authConnectTables.remove(connId);
        throw e;
    }
}

/**
 * After a session has been created: checks the session credentials against the
 * account database when {@code accountAuthEnabled} is set.
 *
 * @param session the newly created session
 * @throws ActiveMQException declared by the plugin interface
 */
@Override
public void afterCreateSession(ServerSession session) throws ActiveMQException {
    final RemotingConnection owner = session.getRemotingConnection();
    if (debug) {
        log.info("afterCreateSession {},{},{}", owner.getClientID(), owner.getRemoteAddress(), owner.getID());
    }
    if (!accountAuthEnabled) {
        return;
    }
    doAccountValidation(session.getUsername(), session.getPassword());
}

/**
 * A connection has been destroyed: runs the disconnect handling and forgets the
 * connection's entry in {@code authConnectTables}.
 *
 * @param connection the destroyed connection
 * @throws ActiveMQException declared by the plugin interface
 */
@Override
public void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException {
    if (debug) {
        log.info("afterDestroyConnection {},{},{}",
                connection.getClientID(), connection.getRemoteAddress(), connection.getID());
    }
    doDisconnect(connection);
    authConnectTables.remove(connection.getID().toString());
}

/**
 * After a message is sent: hands the message to the last-will handling and
 * removes the sending connection's entry from {@code authConnectTables}.
 *
 * <p>NOTE(review): the entry is removed after every send, so the next session
 * created on this connection is validated again - confirm this is intended.
 *
 * @param session the session that sent the message
 * @param tx unused
 * @param message the message that was sent
 * @param direct unused
 * @param noAutoCreateQueue unused
 * @param result unused
 * @throws ActiveMQException declared by the plugin interface
 */
@Override
public void afterSend(ServerSession session,
        Transaction tx,
        Message message,
        boolean direct,
        boolean noAutoCreateQueue,
        RoutingStatus result) throws ActiveMQException {
    final RemotingConnection sender = session.getRemotingConnection();
    if (debug) {
        log.info("afterSend {},{},{}", sender.getClientID(), sender.getRemoteAddress(), sender.getID());
    }
    doSendLwt(sender, message);
    authConnectTables.remove(sender.getID().toString());
}

private void doConnectValidation(RemotingConnection connection) throws ActiveMQException{
Boolean success = false;
String error = "";
boolean invalid = false;
int locked = 0;
int eft = 0;
String clientId = connection.getClientID();
String connId = connection.getID().toString();
String clientIp = connection.getRemoteAddress().toString();
clientIp = formatClientIP(clientIp);
try{
/**
* 设备授权校验
*/
boolean invalidClientId = invalidClientId(clientId);
if(invalidClientId){//licId@mac
/**
* 无效的ClientId
*/
invalid = true;
throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",clientId));
}
/**
* 连接用户校验
* 有效的ClientId格式
*/
if(!isService(clientId)){
String[] clientIdElements = clientId.split(CLIENT_SPLITOR);
String licId = clientIdElements[0];
String devId = clientIdElements[1];

/**
* 开启设备授权
*/
if(deviceAuthEnabled){
/**
* 新设备
*/
if(!authDeviceTables.containsKey(clientId)){

/**
* 验证设备
*/
DeviceAuth auth = validate(licId,devId);
/**
* 新设备
*/
if(ObjectUtil.isNotEmpty(auth)){

/**
* 设备第一次
*/
if(ObjectUtil.isEmpty(auth.getDevId())){
String lockerSql = deviceAuthLockerSQL;
/**
* 防止一个licId被多台设备使用
*/
locked = deviceJdbcTemplate.update(lockerSql, new Object[]\{devId, licId});
if(locked==0){
throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId));
}
auth.setDevId(devId);
authDeviceTables.put(clientId,auth);
}
}
}
}

/**
* 更新设备状态【连接】
*/
if(deviceStatusSyncabled){
String sql = connectUpdateSQL;
String brokerIp = IPUntil.getLocalIp();
eft = deviceJdbcTemplate.update(sql, new Object[]\{connId,clientIp, brokerIp,licId});
}
}
success = true;
}
catch(ActiveMQException ex){
error = ex.getMessage();
throw ex;
}
finally{
String prefix = "连接异常";
if(success){
prefix = "连接成功";
log.info("{} {} {} {} {} {}", prefix, formatClientId(clientId), clientIp,connId,locked,eft);
}
else{
if(invalid) {
log.debug("{} {} {} {} {} {} {}", prefix, formatClientId(clientId), clientIp,connId,locked,eft, error);
}
else{
log.info("{} {} {} {} {} {} {}", prefix, formatClientId(clientId), clientIp,connId, locked,eft,error);
}
}
}
}

private void doDisconnect(RemotingConnection connection){
/**
* 更新设备状态【断开】
*/
String clientId = connection.getClientID();
String connId = connection.getID().toString();
String clientIp = connection.getRemoteAddress().toString();
clientIp = formatClientIP(clientIp);
int eft = 0;
if(invalidClientId(clientId)){
return;
}
try {
if (clientId.indexOf(CLIENT_SPLITOR) != 1) {
if (deviceStatusSyncabled) {
String[] clientIdElements = clientId.split(CLIENT_SPLITOR);
String licId = clientIdElements[0];
String sql = disconnectUpdateSQL;
eft = deviceJdbcTemplate.update(sql, new Object[]\{licId,connId});
}
}
}
finally {
log.info("断开成功 {} {} {} {}", formatClientId(clientId),clientIp,connId,eft);
}
}

private void doSendLwt(RemotingConnection connection,Message message){
int eft = 0;
Boolean success = false;
String clientId = connection.getClientID();
String connId = connection.getID().toString();
String clientIp = connection.getRemoteAddress().toString();
clientIp = formatClientIP(clientIp);
String address = message.getAddress();
if(invalidClientId(clientId)){
return;
}
if (MISSING_TOPIC_PREFIX.equals(address)) {
if (deviceStatusSyncabled && !isService(clientId)) {
try {
String licId = null;
String sql = lwtUpdateSQL;
String[] clientIdElements = clientId.split(CLIENT_SPLITOR);
licId = clientIdElements[0];
eft = deviceJdbcTemplate.update(sql, new Object[]\{licId, connId});
success = true;
}
finally{
if(success){
log.info("断开成功 {} {} {} {} {}",formatClientId(clientId),clientIp,connId,"LWT" ,eft);
}
else{
log.debug("断开异常 {} {} {} {} {} {}",formatClientId(clientId) ,clientIp,connId,"LWT",eft,message);
}
}
}
}
}

/**
 * Creates a {@code JdbcTemplate} over a data source built from the given
 * connection settings.
 *
 * @param driver   JDBC driver class name
 * @param url      JDBC URL
 * @param username database user
 * @param password database password
 * @return a template bound to the new data source
 */
private JdbcTemplate build(String driver, String url, String username, String password) {
    return new JdbcTemplate(
            DataSourceBuilder.create()
                    .driverClassName(driver)
                    .url(url)
                    .username(username)
                    .password(password)
                    .build());
}

/**
 * Shortens a service client id for logging.
 *
 * <p>If the id contains "service-" (case-insensitive), returns the part starting
 * there, capped at 44 characters; any other id, including null or empty, is
 * returned unchanged.
 *
 * @param clientId client id to format, may be null
 * @return the formatted id
 */
private String formatClientId(String clientId) {
    if (ObjectUtil.isEmpty(clientId)) {
        return clientId;
    }
    int start = clientId.toLowerCase().indexOf("service-");
    if (start == -1) {
        return clientId;
    }
    // presumably 44 = "service-" plus a 36-character UUID - confirm
    int end = Math.min(clientId.length(), start + 44);
    return clientId.substring(start, end);
}

/**
 * Strips every "/" from a remote address string.
 *
 * @param clientIp remote address text, may be null or empty
 * @return the address without slashes, or the input itself when null or empty
 */
private String formatClientIP(String clientIp) {
    if (ObjectUtil.isEmpty(clientIp)) {
        return clientIp;
    }
    return clientIp.replace("/", "");
}

private void doAccountValidation(String username,String password){
String sql= accountAuthQuerySQL;
List<Account> accounts = accountJdbcTemplate.query(sql,new Object[]\{username,password},new RowMapperResultSetExtractor<Account>(new BeanPropertyRowMapper<Account>(Account.class)));
if(ObjectUtil.isEmpty(accounts)){
throw new SecurityException("非法租户");
}
}

/**
* @param licId yekerId
* @param devId cpusn or mac_address
*/
private DeviceAuth validate(String licId,String devId) throws ActiveMQException {
/**
* YekerId@CPU_SN;保证一个YekerId只被一台设备使用
*/
String sql = deviceAuthCheckSQL;
List<DeviceAuth> deviceAuths = deviceJdbcTemplate.query(sql, new Object[]\{licId}, new RowMapperResultSetExtractor<DeviceAuth>(new BeanPropertyRowMapper<DeviceAuth>(DeviceAuth.class)));
if (deviceAuths == null || ObjectUtil.isEmpty(deviceAuths)) {
throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",licId));
}
DeviceAuth auth = deviceAuths.get(0);
if (ObjectUtil.isNotEmpty(auth.getDevId())
&& !auth.getDevId().equalsIgnoreCase(devId)) {
throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId));
}
return auth;
}
/**
 * Tells whether the client id belongs to a service node or a broker node.
 *
 * <p>The match is case-insensitive and accepts {@code SERVICE_CLIENT_ID_PREFIX}
 * or "brokernode" anywhere in the id, not only at the start.
 *
 * @param clientId client id to test, may be null
 * @return {@code true} for service or broker node ids
 */
private boolean isService(String clientId) {
    if (ObjectUtil.isEmpty(clientId)) {
        return false;
    }
    String lowered = clientId.toLowerCase();
    return lowered.contains(SERVICE_CLIENT_ID_PREFIX) // service node
            || lowered.contains("brokernode");        // broker node
}

/**
 * Tells whether a client id is unusable: empty, or neither a service node id,
 * a broker node id, nor an id containing the {@code @} separator.
 *
 * @param clientId client id to test, may be null
 * @return {@code true} when the id must be rejected
 */
private static boolean invalidClientId(String clientId) {
    if (ObjectUtil.isEmpty(clientId)) {
        return true;
    }
    String lowered = clientId.toLowerCase();
    boolean serviceNode = lowered.contains(SERVICE_CLIENT_ID_PREFIX);
    boolean brokerNode = lowered.contains("brokernode");
    boolean hasSeparator = clientId.contains(CLIENT_SPLITOR);
    return !(serviceNode || brokerNode || hasSeparator);
}
/**
 * Tells whether a topic is a last-will (LWT, Last Will &amp; Testament) topic,
 * i.e. starts with {@code MISSING_TOPIC_PREFIX}.
 *
 * @param originalTopic topic name, may be null
 * @return {@code true} when the topic is non-null and carries the prefix
 */
private boolean isLwt(String originalTopic) {
    if (originalTopic == null) {
        return false;
    }
    return originalTopic.startsWith(MISSING_TOPIC_PREFIX);
}

/**
 * Called when the plugin is registered with the broker: logs the event and
 * calls {@code reset()}.
 *
 * @param server the broker the plugin was registered with (unused)
 */
@Override
public void registered(ActiveMQServer server) {
    final String pluginName = getClass().getSimpleName();
    log.info("{} 插件注册", pluginName);
    reset();
}
/**
 * Called when the plugin is unregistered from the broker: logs the event and
 * calls {@code reset()}.
 *
 * @param server the broker the plugin was removed from (unused)
 */
@Override
public void unregistered(ActiveMQServer server) {
    // One placeholder for one argument; the old format had a second "{}" that
    // was printed literally.
    log.info("{} 插件注销", this.getClass().getSimpleName());
    reset();
}

/**
* 同步因为服务器维护导致相关设备状态不一致
*/
private void reset(){
authConnectTables.clear();
authDeviceTables.clear();
String brokerIp = IPUntil.getLocalIp();
String sql = resetUpdateSQL;
deviceJdbcTemplate.update(sql, new Object[]\{brokerIp});
}
}

```

 

!image-2022-08-02-08-23-52-965.png!

!image-2022-08-02-08-24-39-288.png!

 

public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {

 

...

/**
 * Dispatches one incoming MQTT control packet to the handler for its message
 * type and releases the message afterwards.
 *
 * Packet types without a handler here fall through to disconnect(true).
 * The MQTTRuntimesException branch is the reporter's local patch.
 *
 * @param message the decoded MQTT packet; released in the finally block
 */
public void act(MqttMessage message) {
      try {
         switch (message.fixedHeader().messageType()) {
            case AUTH:
               handleAuth(message);
               break;
            case CONNECT:
               handleConnect((MqttConnectMessage) message);
               break;
            case PUBLISH:
               handlePublish((MqttPublishMessage) message);
               break;
            case PUBACK:
               handlePuback((MqttPubAckMessage) message);
               break;
            case PUBREC:
               handlePubrec(message);
               break;
            case PUBREL:
               handlePubrel(message);
               break;
            case PUBCOMP:
               handlePubcomp(message);
               break;
            case SUBSCRIBE:
               handleSubscribe((MqttSubscribeMessage) message);
               break;
            case UNSUBSCRIBE:
               handleUnsubscribe((MqttUnsubscribeMessage) message);
               break;
            case DISCONNECT:
               disconnect(false, message);
               break;
            // All remaining types share the default action below.
            case UNSUBACK:
            case SUBACK:
            case PINGREQ: // These are actually handled by the Netty thread directly so this packet should never make it here
            case PINGRESP:
            case CONNACK: // The server does not instantiate connections therefore any CONNACK received over a connection is an invalid control message.
            default:
               disconnect(true);
         }
      } catch(MQTTRuntimesException e) {// Custom code: pass the plugin's reason code on to the MQTT client
         // NOTE(review): this branch wraps the whole switch, so a CONNACK is sent for
         // any packet type that raises MQTTRuntimesException, not only CONNECT - confirm.
         logger.info("@@@@@@@@@@@@@@@:"+e.getCode());
         session.getProtocolHandler().sendConnack(e.getCode());
         disconnect(true);
      }
      catch (Exception e) {
         logger.info("@@@@@@@@@@@@@@@:"+e.getMessage());
         MQTTLogger.LOGGER.errorProcessingControlPacket(message.toString(), e);
         // Only MQTT 5 sessions get a DISCONNECT packet with a reason code.
         if (session.getVersion() == MQTTVersion.MQTT_5) {
            sendDisconnect(MQTTReasonCodes.IMPLEMENTATION_SPECIFIC_ERROR);
         }
         disconnect(true);
      } finally {
         ReferenceCountUtil.release(message);
      }
   }

...

}

When I capture the traffic with Wireshark, I see the following:

!image-2022-08-02-08-31-01-074.png!

 

 

 

> MQTTReasonCodes byte loss of precision,must int type
> ----------------------------------------------------
>
>                 Key: ARTEMIS-3913
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3913
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>            Reporter: gongping.zhu
>            Priority: Major
>         Attachments: image-2022-08-02-08-23-52-965.png, image-2022-08-02-08-24-39-288.png, image-2022-08-02-08-31-01-074.png
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)