You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "gongping.zhu (Jira)" <ji...@apache.org> on 2022/08/02 00:34:00 UTC
[jira] [Comment Edited] (ARTEMIS-3913) MQTTReasonCodes byte loss of precision,must int type
[ https://issues.apache.org/jira/browse/ARTEMIS-3913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17573974#comment-17573974 ]
gongping.zhu edited comment on ARTEMIS-3913 at 8/2/22 12:33 AM:
----------------------------------------------------------------
My ArtemisBrokerPlugin code:
```
import cn.hutool.core.util.ObjectUtil;
import com.yeker.iot.broker.plugin.impl.model.Account;
import com.yeker.iot.broker.plugin.impl.model.DeviceAuth;
import com.yeker.sdk.comm.util.IPUntil;
import org.apache.activemq.artemis.api.core.*;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.MQTTRuntimesException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapperResultSetExtractor;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ArtemisBrokerPlugin implements ActiveMQServerPlugin, Serializable {
private Logger log = LoggerFactory.getLogger(ArtemisBrokerPlugin.class);
/**
* 遗言topic前缀
*/
private static final String MISSING_TOPIC_PREFIX = "msg.req.lwt";
/**
* 服务端clientId前缀
*/
private static final String SERVICE_CLIENT_ID_PREFIX = "service-";
/**
* client id 分隔符
*/
private static final String CLIENT_SPLITOR = "@";
/**
* 账号数据库信息
*/
private String accountDriver;
private String accountUrl;
private String accountUsername;
private String accountPassword;
private JdbcTemplate accountJdbcTemplate;
/**
* 授权认证
*/
private boolean accountAuthEnabled = false;
private String accountAuthQuerySQL;
/**
* 设备数据库信息
*/
private String deviceDriver;
private String deviceUrl;
private String deviceUsername;
private String devicePassword;
private JdbcTemplate deviceJdbcTemplate;
/**
* 设备授权
*/
private boolean deviceAuthEnabled = false;
private String deviceAuthCheckSQL;
private String deviceAuthLockerSQL;
private boolean deviceStatusSyncabled = false;
private String connectUpdateSQL;
private String disconnectUpdateSQL;
private String lwtUpdateSQL;
private String resetUpdateSQL;
private Map<String, Integer> authConnectTables = new ConcurrentHashMap<>();
private Map<String, DeviceAuth> authDeviceTables = new ConcurrentHashMap<>();
private boolean debug = false;
/**
 * Reads the plugin configuration and creates the JDBC templates that are needed.
 *
 * <p>Only the property names are logged, never the values, because the map carries
 * {@code accountPassword} and {@code devicePassword}. The device {@link JdbcTemplate}
 * is created when either device authorisation or device status synchronisation is
 * enabled, since both features run their SQL through it.
 *
 * @param properties the plugin properties; missing boolean flags are treated as {@code false}
 */
@Override
public void init(Map<String, String> properties) {
    // Account database and account authentication.
    this.accountDriver = properties.get("accountDriver");
    this.accountUrl = properties.get("accountUrl");
    this.accountUsername = properties.get("accountUsername");
    this.accountPassword = properties.get("accountPassword");
    this.accountAuthEnabled = Boolean.parseBoolean(properties.get("accountAuthEnabled"));
    this.accountAuthQuerySQL = properties.get("accountAuthQuerySQL");
    if (this.accountAuthEnabled) {
        accountJdbcTemplate = build(accountDriver, accountUrl, accountUsername, accountPassword);
    }

    // Device database, device authorisation and status synchronisation.
    this.deviceDriver = properties.get("deviceDriver");
    this.deviceUrl = properties.get("deviceUrl");
    this.deviceUsername = properties.get("deviceUsername");
    this.devicePassword = properties.get("devicePassword");
    this.deviceAuthEnabled = Boolean.parseBoolean(properties.get("deviceAuthEnabled"));
    this.deviceStatusSyncabled = Boolean.parseBoolean(properties.get("deviceStatusSyncabled"));
    this.deviceAuthCheckSQL = properties.get("deviceAuthCheckSQL");
    this.deviceAuthLockerSQL = properties.get("deviceAuthLockerSQL");
    this.connectUpdateSQL = properties.get("connectUpdateSQL");
    this.disconnectUpdateSQL = properties.get("disconnectUpdateSQL");
    this.lwtUpdateSQL = properties.get("lwtUpdateSQL");
    this.resetUpdateSQL = properties.get("resetUpdateSQL");
    // The status-sync SQL also goes through deviceJdbcTemplate, so build it for either feature.
    if (this.deviceAuthEnabled || this.deviceStatusSyncabled) {
        deviceJdbcTemplate = build(deviceDriver, deviceUrl, deviceUsername, devicePassword);
    }

    // Log the keys only: the values include database passwords.
    log.info("init :[{}] ", properties.keySet());
    log.info("AccountAuthEnabled :[{}] ", accountAuthEnabled);
    log.info("DeviceAuthEnabled :[{}] ", deviceAuthEnabled);
    log.info("DeviceStatusSyncabled :[{}] ", deviceStatusSyncabled);
    log.info("{} 插件初始化", this.getClass().getSimpleName());
}
@Override
public void afterCreateConnection(RemotingConnection connection) throws ActiveMQException {
if(debug){
log.info("afterCreateConnection {},{},{}",connection.getClientID(),connection.getRemoteAddress(),connection.getID());
}
}
/**
*
* @param name
* @param username
* @param minLargeMessageSize
* @param connection
* @param autoCommitSends
* @param autoCommitAcks
* @param preAcknowledge
* @param xa
* @param defaultAddress
* @param callback
* @param autoCreateQueues
* @param context
* @param prefixes
* @throws ActiveMQException
*/
@Override
public void beforeCreateSession(String name, String username, int minLargeMessageSize,
RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge,
boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, OperationContext context,
Map<SimpleString, RoutingType> prefixes) throws ActiveMQException {
if(debug) {
log.info("beforeCreateSession {},{},{},{},{}", connection.getClientID(), connection.getRemoteAddress(), connection.getID(), name, username);
}
String connId = connection.getID().toString();
if(deviceAuthEnabled && !authConnectTables.containsKey(connId)) \{ // Debug.print(log); authConnectTables.put(connId,0); doConnectValidation(connection); }
}
/**
* After a session has been created.
*
* @param session The newly created session
* @throws ActiveMQException
*/
@Override
public void afterCreateSession(ServerSession session) throws ActiveMQException {
RemotingConnection connection = session.getRemotingConnection();
if(debug) {
log.info("afterCreateSession {},{},{}", connection.getClientID(), connection.getRemoteAddress(), connection.getID());
}
if(accountAuthEnabled) \{ doAccountValidation(session.getUsername(),session.getPassword()); }
}
/**
* A connection has been destroyed.
*
* @param connection
* @throws ActiveMQException
*/
@Override
public void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException {
if(debug) {
log.info("afterDestroyConnection {},{},{}", connection.getClientID(), connection.getRemoteAddress(), connection.getID());
}
doDisconnect(connection);
String connId = connection.getID().toString();
authConnectTables.remove(connId);
}
/**
* After a message is sent
*
* @param session the session that sends the message
* @param tx
* @param message
* @param direct
* @param noAutoCreateQueue
* @param result
* @throws ActiveMQException
*/
@Override
public void afterSend(ServerSession session,
Transaction tx,
Message message,
boolean direct,
boolean noAutoCreateQueue,
RoutingStatus result) throws ActiveMQException {
RemotingConnection connection = session.getRemotingConnection();
if(debug) {
log.info("afterSend {},{},{}", connection.getClientID(), connection.getRemoteAddress(), connection.getID());
}
doSendLwt(connection,message);
String connId = connection.getID().toString();
authConnectTables.remove(connId);
}
private void doConnectValidation(RemotingConnection connection) throws ActiveMQException{
Boolean success = false;
String error = "";
boolean invalid = false;
int locked = 0;
int eft = 0;
String clientId = connection.getClientID();
String connId = connection.getID().toString();
String clientIp = connection.getRemoteAddress().toString();
clientIp = formatClientIP(clientIp);
try{
/**
* 设备授权校验
*/
boolean invalidClientId = invalidClientId(clientId);
if(invalidClientId) \{//licId@mac /** * 无效的ClientId */ invalid = true; throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",clientId)); }
/**
* 连接用户校验
* 有效的ClientId格式
*/
if(!isService(clientId)){
String[] clientIdElements = clientId.split(CLIENT_SPLITOR);
String licId = clientIdElements[0];
String devId = clientIdElements[1];
/**
* 开启设备授权
*/
if(deviceAuthEnabled){
/**
* 新设备
*/
if(!authDeviceTables.containsKey(clientId)){
/**
* 验证设备
*/
DeviceAuth auth = validate(licId,devId);
/**
* 新设备
*/
if(ObjectUtil.isNotEmpty(auth)){
/**
* 设备第一次
*/
if(ObjectUtil.isEmpty(auth.getDevId())) \{ String lockerSql = deviceAuthLockerSQL; /** * 防止一个licId被多台设备使用 */ locked = deviceJdbcTemplate.update(lockerSql, new Object[]\{devId, licId}
);
if(locked==0)
{ throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId)); }
auth.setDevId(devId);
authDeviceTables.put(clientId,auth);
}
}
}
}
/**
* 更新设备状态【连接】
*/
if(deviceStatusSyncabled)\{ String sql = connectUpdateSQL; String brokerIp = IPUntil.getLocalIp(); eft = deviceJdbcTemplate.update(sql, new Object[]\{connId,clientIp, brokerIp,licId});
}
}
success = true;
}
catch(ActiveMQException ex)\{ error = ex.getMessage(); throw ex; }
finally{
String prefix = "连接异常";
if(success){
prefix = "连接成功";
log.info("{} {} {} {} {} {}", prefix, formatClientId(clientId), clientIp,connId,locked,eft);
}
else{
if(invalid) {
log.debug("{} {} {} {} {} {} {}", prefix, formatClientId(clientId), clientIp,connId,locked,eft, error);
}
else{
log.info("{} {} {} {} {} {} {}", prefix, formatClientId(clientId), clientIp,connId, locked,eft,error);
}
}
}
}
private void doDisconnect(RemotingConnection connection){
/**
* 更新设备状态【断开】
*/
String clientId = connection.getClientID();
String connId = connection.getID().toString();
String clientIp = connection.getRemoteAddress().toString();
clientIp = formatClientIP(clientIp);
int eft = 0;
if(invalidClientId(clientId))\{ return; }
try {
if (clientId.indexOf(CLIENT_SPLITOR) != 1) {
if (deviceStatusSyncabled) \{ String[] clientIdElements = clientId.split(CLIENT_SPLITOR); String licId = clientIdElements[0]; String sql = disconnectUpdateSQL; eft = deviceJdbcTemplate.update(sql, new Object[]\{licId,connId});
}
}
}
finally {
log.info("断开成功 {} {} {} {}", formatClientId(clientId),clientIp,connId,eft);
}
}
private void doSendLwt(RemotingConnection connection,Message message){
int eft = 0;
Boolean success = false;
String clientId = connection.getClientID();
String connId = connection.getID().toString();
String clientIp = connection.getRemoteAddress().toString();
clientIp = formatClientIP(clientIp);
String address = message.getAddress();
if(invalidClientId(clientId))\{ return; }
if (MISSING_TOPIC_PREFIX.equals(address)) {
if (deviceStatusSyncabled && !isService(clientId)) {
try \{ String licId = null; String sql = lwtUpdateSQL; String[] clientIdElements = clientId.split(CLIENT_SPLITOR); licId = clientIdElements[0]; eft = deviceJdbcTemplate.update(sql, new Object[]\{licId, connId});
success = true;
}
finally{
if(success){
log.info("断开成功 {} {} {} {} {}",formatClientId(clientId),clientIp,connId,"LWT" ,eft);
}
else{
log.debug("断开异常 {} {} {} {} {} {}",formatClientId(clientId) ,clientIp,connId,"LWT",eft,message);
}
}
}
}
}
private JdbcTemplate build(String driver, String url, String username, String password)\{ DataSourceBuilder builder = DataSourceBuilder.create(); builder.driverClassName(driver); builder.url(url); builder.username(username); builder.password(password); return new JdbcTemplate(builder.build()); }
private String formatClientId(String clientId){
if(ObjectUtil.isNotEmpty(clientId)){
int index = clientId.toLowerCase().indexOf("service-");
if(index!=-1){
int length = clientId.length();
if(length>(index+44))\{ return clientId.substring(index,index+44); }
else\{ return clientId.substring(index); }
}
return clientId;
}
return clientId;
}
private String formatClientIP(String clientIp){
int index = -1;
if(ObjectUtil.isEmpty(clientIp))\{ return clientIp; }
clientIp = clientIp.replace("/","");
return clientIp;
}
private void doAccountValidation(String username,String password)\{ String sql= accountAuthQuerySQL; List<Account> accounts = accountJdbcTemplate.query(sql,new Object[]\{username,password},new RowMapperResultSetExtractor<Account>(new BeanPropertyRowMapper<Account>(Account.class)));
if(ObjectUtil.isEmpty(accounts))\{ throw new SecurityException("非法租户"); }
}
/**
* @param licId yekerId
* @param devId cpusn or mac_address
*/
private DeviceAuth validate(String licId,String devId) throws ActiveMQException \{ /** * YekerId@CPU_SN;保证一个YekerId只被一台设备使用 */ String sql = deviceAuthCheckSQL; List<DeviceAuth> deviceAuths = deviceJdbcTemplate.query(sql, new Object[]\{licId}, new RowMapperResultSetExtractor<DeviceAuth>(new BeanPropertyRowMapper<DeviceAuth>(DeviceAuth.class)));
if (deviceAuths == null || ObjectUtil.isEmpty(deviceAuths)) \{ throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",licId)); }
DeviceAuth auth = deviceAuths.get(0);
if (ObjectUtil.isNotEmpty(auth.getDevId())
&& !auth.getDevId().equalsIgnoreCase(devId)) \{ throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId)); }
return auth;
}
/**
* [service node或者broker node]
* @param clientId
* @return
*/
private boolean isService(String clientId) \{ return ObjectUtil.isNotEmpty(clientId) && (clientId.toLowerCase().indexOf(SERVICE_CLIENT_ID_PREFIX)!=-1 //服务节点 || clientId.toLowerCase().indexOf("brokernode")!=-1);// broker节点 }
/**
* clientId是否含有@分割符号
* @param clientId
* @return
*/
private static boolean invalidClientId(String clientId) \{ return ObjectUtil.isEmpty(clientId) || (clientId.toLowerCase().indexOf(SERVICE_CLIENT_ID_PREFIX) == -1 //非服务节点 && clientId.toLowerCase().indexOf("brokernode") == -1 //非broker节点 && clientId.indexOf(CLIENT_SPLITOR) == -1);//非正常的client }
/**
* 最后遗嘱LWT(Last Will & Testament)
* @param originalTopic
* @return
*/
private boolean isLwt(String originalTopic) \{ return originalTopic != null && originalTopic.startsWith(MISSING_TOPIC_PREFIX); }
/**
*
* @param server
*/
@Override
public void registered(ActiveMQServer server){
log.info("{} 插件注册",this.getClass().getSimpleName());
reset();
}
/**
 * Called when the plugin is unregistered; logs the event and calls {@code reset()}.
 *
 * @param server the broker the plugin is removed from; not used here
 */
@Override
public void unregistered(ActiveMQServer server) {
    // Fixed: the format string had a second "{}" with no matching argument.
    log.info("{} 插件注销", this.getClass().getSimpleName());
    reset();
}
/**
* 同步因为服务器维护导致相关设备状态不一致
*/
private void reset() \{ authConnectTables.clear(); authDeviceTables.clear(); String brokerIp = IPUntil.getLocalIp(); String sql = resetUpdateSQL; deviceJdbcTemplate.update(sql, new Object[]\{brokerIp}
);
}
}
```
!image-2022-08-02-08-23-52-965.png!
!image-2022-08-02-08-24-39-288.png!
public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
```
/**
 * Dispatches one inbound MQTT control packet to its handler, based on the packet type
 * in the fixed header.
 *
 * UNSUBACK, SUBACK, PINGREQ, PINGRESP, CONNACK and any unlisted type fall through to
 * disconnect(true). The message is released in the finally block on every path.
 *
 * @param message the inbound MQTT packet
 */
public void act(MqttMessage message) {
try {
switch (message.fixedHeader().messageType()) {
case AUTH:
handleAuth(message);
break;
case CONNECT:
handleConnect((MqttConnectMessage) message);
break;
case PUBLISH:
handlePublish((MqttPublishMessage) message);
break;
case PUBACK:
handlePuback((MqttPubAckMessage) message);
break;
case PUBREC:
handlePubrec(message);
break;
case PUBREL:
handlePubrel(message);
break;
case PUBCOMP:
handlePubcomp(message);
break;
case SUBSCRIBE:
handleSubscribe((MqttSubscribeMessage) message);
break;
case UNSUBSCRIBE:
handleUnsubscribe((MqttUnsubscribeMessage) message);
break;
case DISCONNECT:
disconnect(false, message);
break;
case UNSUBACK:
case SUBACK:
case PINGREQ: // These are actually handled by the Netty thread directly so this packet should never make it here
case PINGRESP:
case CONNACK: // The server does not instantiate connections therefore any CONNACK received over a connection is an invalid control message.
default:
disconnect(true);
}
} catch(MQTTRuntimesException e) { // Custom code: pass the exception's reason code on to the MQTT client
// NOTE(review): a CONNACK is sent whichever packet type raised the exception - confirm this is intended.
logger.info("@@@@@@@@@@@@@@@:"+e.getCode());
session.getProtocolHandler().sendConnack(e.getCode());
disconnect(true);
}
catch (Exception e) {
logger.info("@@@@@@@@@@@@@@@:"+e.getMessage());
MQTTLogger.LOGGER.errorProcessingControlPacket(message.toString(), e);
// Only MQTT 5 sessions are sent a DISCONNECT packet with a reason code here.
if (session.getVersion() == MQTTVersion.MQTT_5) {
sendDisconnect(MQTTReasonCodes.IMPLEMENTATION_SPECIFIC_ERROR);
}
disconnect(true);
} finally {
ReferenceCountUtil.release(message);
}
}
```
}
When I capture the traffic with Wireshark, this is what I see:
!image-2022-08-02-08-31-01-074.png!
was (Author: JIRAUSER293605):
My ArtemisBrokerPlugin code:
```
package com.yeker.iot.broker.plugin.impl;
import cn.hutool.core.util.ObjectUtil;
import com.yeker.iot.broker.plugin.impl.model.Account;
import com.yeker.iot.broker.plugin.impl.model.DeviceAuth;
import com.yeker.sdk.comm.util.IPUntil;
import org.apache.activemq.artemis.api.core.*;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.MQTTRuntimesException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapperResultSetExtractor;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ArtemisBrokerPlugin implements ActiveMQServerPlugin, Serializable {
private Logger log = LoggerFactory.getLogger(ArtemisBrokerPlugin.class);
/**
* 遗言topic前缀
*/
private static final String MISSING_TOPIC_PREFIX = "msg.req.lwt";
/**
* 服务端clientId前缀
*/
private static final String SERVICE_CLIENT_ID_PREFIX = "service-";
/**
* client id 分隔符
*/
private static final String CLIENT_SPLITOR = "@";
/**
* 账号数据库信息
*/
private String accountDriver;
private String accountUrl;
private String accountUsername;
private String accountPassword;
private JdbcTemplate accountJdbcTemplate;
/**
* 授权认证
*/
private boolean accountAuthEnabled = false;
private String accountAuthQuerySQL;
/**
* 设备数据库信息
*/
private String deviceDriver;
private String deviceUrl;
private String deviceUsername;
private String devicePassword;
private JdbcTemplate deviceJdbcTemplate;
/**
* 设备授权
*/
private boolean deviceAuthEnabled = false;
private String deviceAuthCheckSQL;
private String deviceAuthLockerSQL;
private boolean deviceStatusSyncabled = false;
private String connectUpdateSQL;
private String disconnectUpdateSQL;
private String lwtUpdateSQL;
private String resetUpdateSQL;
private Map<String, Integer> authConnectTables = new ConcurrentHashMap<>();
private Map<String, DeviceAuth> authDeviceTables = new ConcurrentHashMap<>();
private boolean debug = false;
/**
 * Reads the plugin configuration and creates the JDBC templates that are needed.
 *
 * <p>Only the property names are logged, never the values, because the map carries
 * {@code accountPassword} and {@code devicePassword}. The device {@link JdbcTemplate}
 * is created when either device authorisation or device status synchronisation is
 * enabled, since both features run their SQL through it.
 *
 * @param properties the plugin properties; missing boolean flags are treated as {@code false}
 */
@Override
public void init(Map<String, String> properties) {
    // Account database and account authentication.
    this.accountDriver = properties.get("accountDriver");
    this.accountUrl = properties.get("accountUrl");
    this.accountUsername = properties.get("accountUsername");
    this.accountPassword = properties.get("accountPassword");
    this.accountAuthEnabled = Boolean.parseBoolean(properties.get("accountAuthEnabled"));
    this.accountAuthQuerySQL = properties.get("accountAuthQuerySQL");
    if (this.accountAuthEnabled) {
        accountJdbcTemplate = build(accountDriver, accountUrl, accountUsername, accountPassword);
    }

    // Device database, device authorisation and status synchronisation.
    this.deviceDriver = properties.get("deviceDriver");
    this.deviceUrl = properties.get("deviceUrl");
    this.deviceUsername = properties.get("deviceUsername");
    this.devicePassword = properties.get("devicePassword");
    this.deviceAuthEnabled = Boolean.parseBoolean(properties.get("deviceAuthEnabled"));
    this.deviceStatusSyncabled = Boolean.parseBoolean(properties.get("deviceStatusSyncabled"));
    this.deviceAuthCheckSQL = properties.get("deviceAuthCheckSQL");
    this.deviceAuthLockerSQL = properties.get("deviceAuthLockerSQL");
    this.connectUpdateSQL = properties.get("connectUpdateSQL");
    this.disconnectUpdateSQL = properties.get("disconnectUpdateSQL");
    this.lwtUpdateSQL = properties.get("lwtUpdateSQL");
    this.resetUpdateSQL = properties.get("resetUpdateSQL");
    // The status-sync SQL also goes through deviceJdbcTemplate, so build it for either feature.
    if (this.deviceAuthEnabled || this.deviceStatusSyncabled) {
        deviceJdbcTemplate = build(deviceDriver, deviceUrl, deviceUsername, devicePassword);
    }

    // Log the keys only: the values include database passwords.
    log.info("init :[{}] ", properties.keySet());
    log.info("AccountAuthEnabled :[{}] ", accountAuthEnabled);
    log.info("DeviceAuthEnabled :[{}] ", deviceAuthEnabled);
    log.info("DeviceStatusSyncabled :[{}] ", deviceStatusSyncabled);
    log.info("{} 插件初始化", this.getClass().getSimpleName());
}
@Override
public void afterCreateConnection(RemotingConnection connection) throws ActiveMQException {
if(debug){
log.info("afterCreateConnection {},{},{}",connection.getClientID(),connection.getRemoteAddress(),connection.getID());
}
}
/**
 * Runs the device connect validation once per connection, before a session is created.
 *
 * <p>The connection id is claimed with {@code putIfAbsent}, so the "already validated?"
 * check and the insert are a single atomic step on the concurrent map.
 *
 * @param name session name; only logged in debug mode
 * @param username user name; only logged in debug mode
 * @param minLargeMessageSize not used by this plugin
 * @param connection the connection the session is being created on
 * @param autoCommitSends not used by this plugin
 * @param autoCommitAcks not used by this plugin
 * @param preAcknowledge not used by this plugin
 * @param xa not used by this plugin
 * @param defaultAddress not used by this plugin
 * @param callback not used by this plugin
 * @param autoCreateQueues not used by this plugin
 * @param context not used by this plugin
 * @param prefixes not used by this plugin
 * @throws ActiveMQException if {@code doConnectValidation} rejects the connection
 */
@Override
public void beforeCreateSession(String name, String username, int minLargeMessageSize,
        RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge,
        boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, OperationContext context,
        Map<SimpleString, RoutingType> prefixes) throws ActiveMQException {
    if (debug) {
        log.info("beforeCreateSession {},{},{},{},{}", connection.getClientID(), connection.getRemoteAddress(), connection.getID(), name, username);
    }
    String connId = connection.getID().toString();
    // putIfAbsent returns null only for the caller that actually inserted the id.
    if (deviceAuthEnabled && authConnectTables.putIfAbsent(connId, 0) == null) {
        doConnectValidation(connection);
    }
}
/**
* After a session has been created.
*
* @param session The newly created session
* @throws ActiveMQException
*/
@Override
public void afterCreateSession(ServerSession session) throws ActiveMQException {
RemotingConnection connection = session.getRemotingConnection();
if(debug) {
log.info("afterCreateSession {},{},{}", connection.getClientID(), connection.getRemoteAddress(), connection.getID());
}
if(accountAuthEnabled){
doAccountValidation(session.getUsername(),session.getPassword());
}
}
/**
* A connection has been destroyed.
*
* @param connection
* @throws ActiveMQException
*/
@Override
public void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException {
if(debug) {
log.info("afterDestroyConnection {},{},{}", connection.getClientID(), connection.getRemoteAddress(), connection.getID());
}
doDisconnect(connection);
String connId = connection.getID().toString();
authConnectTables.remove(connId);
}
/**
* After a message is sent
*
* @param session the session that sends the message
* @param tx
* @param message
* @param direct
* @param noAutoCreateQueue
* @param result
* @throws ActiveMQException
*/
@Override
public void afterSend(ServerSession session,
Transaction tx,
Message message,
boolean direct,
boolean noAutoCreateQueue,
RoutingStatus result) throws ActiveMQException {
RemotingConnection connection = session.getRemotingConnection();
if(debug) {
log.info("afterSend {},{},{}", connection.getClientID(), connection.getRemoteAddress(), connection.getID());
}
doSendLwt(connection,message);
String connId = connection.getID().toString();
authConnectTables.remove(connId);
}
private void doConnectValidation(RemotingConnection connection) throws ActiveMQException{
Boolean success = false;
String error = "";
boolean invalid = false;
int locked = 0;
int eft = 0;
String clientId = connection.getClientID();
String connId = connection.getID().toString();
String clientIp = connection.getRemoteAddress().toString();
clientIp = formatClientIP(clientIp);
try{
/**
* 设备授权校验
*/
boolean invalidClientId = invalidClientId(clientId);
if(invalidClientId){//licId@mac
/**
* 无效的ClientId
*/
invalid = true;
throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",clientId));
}
/**
* 连接用户校验
* 有效的ClientId格式
*/
if(!isService(clientId)){
String[] clientIdElements = clientId.split(CLIENT_SPLITOR);
String licId = clientIdElements[0];
String devId = clientIdElements[1];
/**
* 开启设备授权
*/
if(deviceAuthEnabled){
/**
* 新设备
*/
if(!authDeviceTables.containsKey(clientId)){
/**
* 验证设备
*/
DeviceAuth auth = validate(licId,devId);
/**
* 新设备
*/
if(ObjectUtil.isNotEmpty(auth)){
/**
* 设备第一次
*/
if(ObjectUtil.isEmpty(auth.getDevId())){
String lockerSql = deviceAuthLockerSQL;
/**
* 防止一个licId被多台设备使用
*/
locked = deviceJdbcTemplate.update(lockerSql, new Object[]\{devId, licId});
if(locked==0){
throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId));
}
auth.setDevId(devId);
authDeviceTables.put(clientId,auth);
}
}
}
}
/**
* 更新设备状态【连接】
*/
if(deviceStatusSyncabled){
String sql = connectUpdateSQL;
String brokerIp = IPUntil.getLocalIp();
eft = deviceJdbcTemplate.update(sql, new Object[]\{connId,clientIp, brokerIp,licId});
}
}
success = true;
}
catch(ActiveMQException ex){
error = ex.getMessage();
throw ex;
}
finally{
String prefix = "连接异常";
if(success){
prefix = "连接成功";
log.info("{} {} {} {} {} {}", prefix, formatClientId(clientId), clientIp,connId,locked,eft);
}
else{
if(invalid) {
log.debug("{} {} {} {} {} {} {}", prefix, formatClientId(clientId), clientIp,connId,locked,eft, error);
}
else{
log.info("{} {} {} {} {} {} {}", prefix, formatClientId(clientId), clientIp,connId, locked,eft,error);
}
}
}
}
private void doDisconnect(RemotingConnection connection){
/**
* 更新设备状态【断开】
*/
String clientId = connection.getClientID();
String connId = connection.getID().toString();
String clientIp = connection.getRemoteAddress().toString();
clientIp = formatClientIP(clientIp);
int eft = 0;
if(invalidClientId(clientId)){
return;
}
try {
if (clientId.indexOf(CLIENT_SPLITOR) != 1) {
if (deviceStatusSyncabled) {
String[] clientIdElements = clientId.split(CLIENT_SPLITOR);
String licId = clientIdElements[0];
String sql = disconnectUpdateSQL;
eft = deviceJdbcTemplate.update(sql, new Object[]\{licId,connId});
}
}
}
finally {
log.info("断开成功 {} {} {} {}", formatClientId(clientId),clientIp,connId,eft);
}
}
private void doSendLwt(RemotingConnection connection,Message message){
int eft = 0;
Boolean success = false;
String clientId = connection.getClientID();
String connId = connection.getID().toString();
String clientIp = connection.getRemoteAddress().toString();
clientIp = formatClientIP(clientIp);
String address = message.getAddress();
if(invalidClientId(clientId)){
return;
}
if (MISSING_TOPIC_PREFIX.equals(address)) {
if (deviceStatusSyncabled && !isService(clientId)) {
try {
String licId = null;
String sql = lwtUpdateSQL;
String[] clientIdElements = clientId.split(CLIENT_SPLITOR);
licId = clientIdElements[0];
eft = deviceJdbcTemplate.update(sql, new Object[]\{licId, connId});
success = true;
}
finally{
if(success){
log.info("断开成功 {} {} {} {} {}",formatClientId(clientId),clientIp,connId,"LWT" ,eft);
}
else{
log.debug("断开异常 {} {} {} {} {} {}",formatClientId(clientId) ,clientIp,connId,"LWT",eft,message);
}
}
}
}
}
private JdbcTemplate build(String driver, String url, String username, String password){
DataSourceBuilder builder = DataSourceBuilder.create();
builder.driverClassName(driver);
builder.url(url);
builder.username(username);
builder.password(password);
return new JdbcTemplate(builder.build());
}
private String formatClientId(String clientId){
if(ObjectUtil.isNotEmpty(clientId)){
int index = clientId.toLowerCase().indexOf("service-");
if(index!=-1){
int length = clientId.length();
if(length>(index+44)){
return clientId.substring(index,index+44);
}
else{
return clientId.substring(index);
}
}
return clientId;
}
return clientId;
}
/**
 * Removes every "/" character from a remote-address string.
 *
 * @param clientIp the address text; may be null or empty
 * @return {@code clientIp} unchanged when {@code ObjectUtil.isEmpty} reports it empty,
 *         otherwise the text with all "/" characters removed
 */
private String formatClientIP(String clientIp) {
    if (ObjectUtil.isEmpty(clientIp)) {
        return clientIp;
    }
    return clientIp.replace("/", "");
}
private void doAccountValidation(String username,String password){
String sql= accountAuthQuerySQL;
List<Account> accounts = accountJdbcTemplate.query(sql,new Object[]\{username,password},new RowMapperResultSetExtractor<Account>(new BeanPropertyRowMapper<Account>(Account.class)));
if(ObjectUtil.isEmpty(accounts)){
throw new SecurityException("非法租户");
}
}
/**
* @param licId yekerId
* @param devId cpusn or mac_address
*/
private DeviceAuth validate(String licId,String devId) throws ActiveMQException {
/**
* YekerId@CPU_SN;保证一个YekerId只被一台设备使用
*/
String sql = deviceAuthCheckSQL;
List<DeviceAuth> deviceAuths = deviceJdbcTemplate.query(sql, new Object[]\{licId}, new RowMapperResultSetExtractor<DeviceAuth>(new BeanPropertyRowMapper<DeviceAuth>(DeviceAuth.class)));
if (deviceAuths == null || ObjectUtil.isEmpty(deviceAuths)) {
throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",licId));
}
DeviceAuth auth = deviceAuths.get(0);
if (ObjectUtil.isNotEmpty(auth.getDevId())
&& !auth.getDevId().equalsIgnoreCase(devId)) {
throw new MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId));
}
return auth;
}
/**
 * Tells whether a client id belongs to a service node or a broker node.
 *
 * <p>The id is lower-cased with {@code Locale.ROOT} so the match does not depend on
 * the JVM default locale.
 *
 * @param clientId the client id; may be null or empty
 * @return {@code true} when the id is non-empty and contains
 *         {@code SERVICE_CLIENT_ID_PREFIX} or "brokernode", ignoring case
 */
private boolean isService(String clientId) {
    if (ObjectUtil.isEmpty(clientId)) {
        return false;
    }
    String lowered = clientId.toLowerCase(java.util.Locale.ROOT);
    return lowered.contains(SERVICE_CLIENT_ID_PREFIX) // service node
            || lowered.contains("brokernode");        // broker node
}
/**
 * Tells whether a client id is unusable for this plugin.
 *
 * <p>An id is invalid when it is empty, or when it is neither a service id, nor a
 * broker-node id, nor contains the {@code CLIENT_SPLITOR} separator. The
 * case-insensitive checks lower-case with {@code Locale.ROOT} so they do not depend
 * on the JVM default locale.
 *
 * @param clientId the client id; may be null or empty
 * @return {@code true} when the id must be rejected
 */
private static boolean invalidClientId(String clientId) {
    if (ObjectUtil.isEmpty(clientId)) {
        return true;
    }
    String lowered = clientId.toLowerCase(java.util.Locale.ROOT);
    return !lowered.contains(SERVICE_CLIENT_ID_PREFIX) // not a service node
            && !lowered.contains("brokernode")         // not a broker node
            && !clientId.contains(CLIENT_SPLITOR);     // not a regular device client
}
/**
* 最后遗嘱LWT(Last Will & Testament)
* @param originalTopic
* @return
*/
private boolean isLwt(String originalTopic) {
return originalTopic != null && originalTopic.startsWith(MISSING_TOPIC_PREFIX);
}
/**
*
* @param server
*/
@Override
public void registered(ActiveMQServer server){
log.info("{} 插件注册",this.getClass().getSimpleName());
reset();
}
/**
 * Called when the plugin is unregistered; logs the event and calls {@code reset()}.
 *
 * @param server the broker the plugin is removed from; not used here
 */
@Override
public void unregistered(ActiveMQServer server) {
    // Fixed: the format string had a second "{}" with no matching argument.
    log.info("{} 插件注销", this.getClass().getSimpleName());
    reset();
}
/**
* 同步因为服务器维护导致相关设备状态不一致
*/
private void reset(){
authConnectTables.clear();
authDeviceTables.clear();
String brokerIp = IPUntil.getLocalIp();
String sql = resetUpdateSQL;
deviceJdbcTemplate.update(sql, new Object[]\{brokerIp});
}
}
```
!image-2022-08-02-08-23-52-965.png!
!image-2022-08-02-08-24-39-288.png!
public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
...
/**
 * Dispatches one inbound MQTT control packet to its handler, based on the packet type
 * in the fixed header.
 *
 * UNSUBACK, SUBACK, PINGREQ, PINGRESP, CONNACK and any unlisted type fall through to
 * disconnect(true). The message is released in the finally block on every path.
 *
 * @param message the inbound MQTT packet
 */
public void act(MqttMessage message) {
try {
switch (message.fixedHeader().messageType()) {
case AUTH:
handleAuth(message);
break;
case CONNECT:
handleConnect((MqttConnectMessage) message);
break;
case PUBLISH:
handlePublish((MqttPublishMessage) message);
break;
case PUBACK:
handlePuback((MqttPubAckMessage) message);
break;
case PUBREC:
handlePubrec(message);
break;
case PUBREL:
handlePubrel(message);
break;
case PUBCOMP:
handlePubcomp(message);
break;
case SUBSCRIBE:
handleSubscribe((MqttSubscribeMessage) message);
break;
case UNSUBSCRIBE:
handleUnsubscribe((MqttUnsubscribeMessage) message);
break;
case DISCONNECT:
disconnect(false, message);
break;
case UNSUBACK:
case SUBACK:
case PINGREQ: // These are actually handled by the Netty thread directly so this packet should never make it here
case PINGRESP:
case CONNACK: // The server does not instantiate connections therefore any CONNACK received over a connection is an invalid control message.
default:
disconnect(true);
}
} catch(MQTTRuntimesException e) {// Custom code: pass the exception's reason code on to the MQTT client
// NOTE(review): a CONNACK is sent whichever packet type raised the exception - confirm this is intended.
logger.info("@@@@@@@@@@@@@@@:"+e.getCode());
session.getProtocolHandler().sendConnack(e.getCode());
disconnect(true);
}
catch (Exception e) {
logger.info("@@@@@@@@@@@@@@@:"+e.getMessage());
MQTTLogger.LOGGER.errorProcessingControlPacket(message.toString(), e);
// Only MQTT 5 sessions are sent a DISCONNECT packet with a reason code here.
if (session.getVersion() == MQTTVersion.MQTT_5) {
sendDisconnect(MQTTReasonCodes.IMPLEMENTATION_SPECIFIC_ERROR);
}
disconnect(true);
} finally {
ReferenceCountUtil.release(message);
}
}
...
}
When I capture the traffic with Wireshark, this is what I see:
!image-2022-08-02-08-31-01-074.png!
> MQTTReasonCodes byte loss of precision,must int type
> ----------------------------------------------------
>
> Key: ARTEMIS-3913
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3913
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Reporter: gongping.zhu
> Priority: Major
> Attachments: image-2022-08-02-08-23-52-965.png, image-2022-08-02-08-24-39-288.png, image-2022-08-02-08-31-01-074.png
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)