Posted to issues@activemq.apache.org by "AWETTT (JIRA)" <ji...@apache.org> on 2017/12/07 12:32:00 UTC

[jira] [Updated] (ARTEMIS-1542) AMQPMessage can throw / not check for null

     [ https://issues.apache.org/jira/browse/ARTEMIS-1542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

AWETTT updated ARTEMIS-1542:
----------------------------
    Description: 
I built an active/active cluster with two brokers.

A message was sent to a queue on broker1 using AMQP 1.0. A client connected to broker2 then tried to read it.

But broker2 throws an exception and the message is not delivered to the client:

09:34:53,146 ERROR [org.apache.activemq.artemis.core.server] AMQ224016: Caught exception: java.lang.NullPointerException
        at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:612) [artemis-amqp-protocol-2.4.0.jar:]
        at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:63) [artemis-amqp-protocol-2.4.0.jar:]
        at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1368) [artemis-server-2.4.0.jar:2.4.0]
        at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1311) [artemis-server-2.4.0.jar:2.4.0]
        at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1304) [artemis-server-2.4.0.jar:2.4.0]
        at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onSessionSend(ServerSessionPacketHandler.java:690) [artemis-server-2.4.0.jar:2.4.0]
        at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onMessagePacket(ServerSessionPacketHandler.java:290) [artemis-server-2.4.0.jar:2.4.0]
        at org.apache.activemq.artemis.utils.actors.Actor.doTask(Actor.java:33) [artemis-commons-2.4.0.jar:2.4.0]
        at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.4.0.jar:2.4.0]
        at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [artemis-commons-2.4.0.jar:2.4.0]
        at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [artemis-commons-2.4.0.jar:2.4.0]
        at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.4.0.jar:2.4.0]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_151]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_151]
        at java.lang.Thread.run(Thread.java:748) [rt.jar:1.8.0_151]


The queue is not created on the fly but configured statically; both brokers have this config:

  <queues>
     <queue name="awe.test.queue">
        <address>awe.test.queue</address>
        <durable>true</durable>
     </queue>
  </queues>


The cluster configuration is pretty much copied from the examples:

      <!-- Clustering configuration -->
      <broadcast-groups>
         <broadcast-group name="my-broadcast-group">
            <group-address>${udp-address:231.7.7.7}</group-address>
            <group-port>9876</group-port>
            <broadcast-period>100</broadcast-period>
            <connector-ref>netty-connector</connector-ref>
         </broadcast-group>
      </broadcast-groups>

      <discovery-groups>
         <discovery-group name="my-discovery-group">
            <group-address>${udp-address:231.7.7.7}</group-address>
            <group-port>9876</group-port>
            <refresh-timeout>10000</refresh-timeout>
         </discovery-group>
      </discovery-groups>

      <cluster-connections>
         <cluster-connection name="my-cluster">
            <connector-ref>netty-connector</connector-ref>
            <retry-interval>500</retry-interval>
            <use-duplicate-detection>true</use-duplicate-detection>
            <message-load-balancing>STRICT</message-load-balancing>
            <max-hops>1</max-hops>
            <discovery-group-ref discovery-group-name="my-discovery-group"/>
         </cluster-connection>
      </cluster-connections>

      <!-- a colocated server that will allow shared store full backups to be requested-->
      <ha-policy>
         <shared-store>
            <colocated>
               <backup-port-offset>100</backup-port-offset>
               <backup-request-retries>-1</backup-request-retries>
               <backup-request-retry-interval>2000</backup-request-retry-interval>
               <max-backups>1</max-backups>
               <request-backup>true</request-backup>
               <master>
                  <failover-on-shutdown>true</failover-on-shutdown>
               </master>
               <slave>
                  <failover-on-shutdown>true</failover-on-shutdown>
               </slave>
            </colocated>
         </shared-store>
      </ha-policy>


*qpid-send and qpid-receive were used as test clients. Sending is done like this:*

qpid-send -b localhost:9800 -a awe.test.queue '--connection-option={protocol:amqp1.0}' --content-string 'test message Do 7. Dez 09:31:51 CET 2017' --durable=yes

+I grabbed the message from the log file. It looks like this:+
0 -> @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x00\x00\x00\x00", message-format=0, settled=false, more=false] 
(79) "\x00Sp\xc0\x04\x02AP\x00\x00St\xc1\x14\x04\xa1\x02snR\x01\xa1\x02ts\x81\x14\xfd\xf8\xcb\xdaG\x94U\x00Sw\xa1(test message Do 7. Dez 09:31:51 CET 2017"
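
Reading the section descriptors of this payload may help explain the null address. The sketch below is a hypothetical helper written for this report only (it is not Artemis or Proton code), and it assumes my hex transcription of the logged bytes is right. It walks the top-level sections and handles only the one-byte-size encodings that occur here (list8, map8, str8).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for this report only; it is not part of Artemis or Proton.
final class AmqpSectionWalk {

   // Rebuilds the 79 logged payload bytes: transcribed section bytes plus the ASCII body text.
   static byte[] loggedPayload() {
      byte[] head = hex(
            "005370c00402415000"                                  // header section
          + "005374c11404a102736e5201a10274738114fdf8cbda479455"  // application-properties section
          + "005377a128");                                        // amqp-value, str8 of 40 bytes
      byte[] text = "test message Do 7. Dez 09:31:51 CET 2017".getBytes(StandardCharsets.US_ASCII);
      byte[] all = new byte[head.length + text.length];
      System.arraycopy(head, 0, all, 0, head.length);
      System.arraycopy(text, 0, all, head.length, text.length);
      return all;
   }

   // Returns the descriptor code of every top-level section in the payload.
   static List<Integer> sectionCodes(byte[] p) {
      List<Integer> codes = new ArrayList<>();
      int i = 0;
      while (i < p.length) {
         // Each section starts with 0x00 (described type) and 0x53 (smallulong descriptor).
         if ((p[i] & 0xff) != 0x00 || (p[i + 1] & 0xff) != 0x53) {
            throw new IllegalArgumentException("unexpected byte at offset " + i);
         }
         codes.add(p[i + 2] & 0xff);
         int constructor = p[i + 3] & 0xff;
         // Only list8 (0xc0), map8 (0xc1) and str8 (0xa1) are supported by this sketch.
         if (constructor != 0xc0 && constructor != 0xc1 && constructor != 0xa1) {
            throw new IllegalArgumentException("unsupported encoding at offset " + (i + 3));
         }
         int size = p[i + 4] & 0xff;
         i += 5 + size; // skip descriptor, constructor, size byte and the value itself
      }
      return codes;
   }

   // Decodes a lowercase hex string into bytes.
   static byte[] hex(String s) {
      byte[] out = new byte[s.length() / 2];
      for (int i = 0; i < out.length; i++) {
         out[i] = (byte) Integer.parseInt(s.substring(2 * i, 2 * i + 2), 16);
      }
      return out;
   }

   public static void main(String[] args) {
      for (int code : sectionCodes(loggedPayload())) {
         System.out.println("section 0x" + Integer.toHexString(code));
      }
   }
}
```

Under these assumptions the walk finds a header (0x70), application-properties (0x74) and an amqp-value body (0x77), but no properties section (0x73). With no properties section there is no "to" field, which would be consistent with getProperties() returning null on broker2.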

*Reading is done using:*

qpid-receive -b localhost:9802 -a awe.test.queue '--connection-option={protocol:amqp1.0}' -t --timeout 500



I can see that AMQPMessage.getAddress() can return null:

   @Override
   public String getAddress() {
      if (address == null) {
         Properties properties = getProtonMessage().getProperties();
         if (properties != null) {
            return properties.getTo();
         } else {
            return null; // <-- no properties section: the address is null
         }
      } else {
         return address;
      }
   }


However, ServerSessionImpl.send() does not check for null, although it should, because SimpleString.toSimpleString() can return null if the input is null:


      SimpleString address = message.getAddressSimpleString();

      if (defaultAddress == null && address != null) {
         defaultAddress = address;
      }

      if (address == null) {
         // We don't want to force a re-encode when the message gets sent to the consumer
         message.setAddress(defaultAddress); // <-- defaultAddress may still be null here
      }



SimpleString:

   public static SimpleString toSimpleString(final String string) {
      if (string == null) {
         return null; // <-- null in, null out
      }
      return new SimpleString(string);
   }
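
The null chain described above can be reproduced in isolation with a minimal sketch. All classes here are simplified stand-ins, not the real Artemis classes. The setter that dereferences its argument is an assumption made to match the stack trace, and the guarded variant is only one possible shape for a check, not the project's fix.

```java
// Stand-ins that mimic the null chain from this report; none of this is real Artemis code.
final class NullAddressSketch {

   // Simplified stand-in for SimpleString.
   static final class SimpleStr {
      private final String value;

      SimpleStr(String value) {
         this.value = value;
      }

      // Same contract as the quoted toSimpleString(): null in, null out.
      static SimpleStr of(String s) {
         return s == null ? null : new SimpleStr(s);
      }

      @Override
      public String toString() {
         return value;
      }
   }

   // Simplified stand-in for a message with neither an address nor a 'to' property.
   static final class Msg {
      private String address;

      String getAddress() {
         return address;
      }

      SimpleStr getAddressSimpleString() {
         return SimpleStr.of(getAddress());
      }

      // Assumption: the real setter dereferences its argument, as the trace suggests.
      void setAddress(SimpleStr newAddress) {
         this.address = newAddress.toString();
      }
   }

   // Mirrors the unguarded logic quoted from send().
   static void sendUnguarded(Msg message, SimpleStr defaultAddress) {
      SimpleStr address = message.getAddressSimpleString();
      if (defaultAddress == null && address != null) {
         defaultAddress = address;
      }
      if (address == null) {
         message.setAddress(defaultAddress); // throws NPE when both are null
      }
   }

   // One possible guard: fail with a descriptive error instead of a bare NPE.
   static void sendGuarded(Msg message, SimpleStr defaultAddress) {
      SimpleStr address = message.getAddressSimpleString();
      if (defaultAddress == null && address != null) {
         defaultAddress = address;
      }
      if (address == null) {
         if (defaultAddress == null) {
            throw new IllegalStateException("message has no address and the session has no default address");
         }
         message.setAddress(defaultAddress);
      }
   }

   public static void main(String[] args) {
      try {
         sendUnguarded(new Msg(), null);
      } catch (NullPointerException e) {
         System.out.println("unguarded send failed with a NullPointerException");
      }
      Msg message = new Msg();
      sendGuarded(message, SimpleStr.of("awe.test.queue"));
      System.out.println("guarded send set address " + message.getAddress());
   }
}
```

Whether the broker should reject such a message, or derive the address from the queue it was routed to, is a design decision for the maintainers; the sketch only shows where the missing check sits.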







> AMQPMessage can throw / not check for null
> ------------------------------------------
>
>                 Key: ARTEMIS-1542
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-1542
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: AMQP, Broker
>    Affects Versions: 2.4.0
>         Environment: Linux Ubuntu 16.04 LTS
> ActiveMQ Artemis 2.4.0
> Tested with qpid-python clients from qpid-cpp-1.36.0 package
>            Reporter: AWETTT
>


