You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2018/01/17 08:33:56 UTC

[2/6] activemq-artemis git commit: ARTEMIS-1586 Refactor to make more generic

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 75faa97..57865b7 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -154,7 +154,7 @@ public class AMQConsumer {
       }
       addressInfo.setInternal(internalAddress);
       if (isDurable) {
-         queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, clientID, subscriptionName));
+         queueName = org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, clientID, subscriptionName);
          QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
          if (result.isExists()) {
             // Already exists

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 64d1353..bca7eae 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -27,6 +27,7 @@ import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
@@ -82,6 +83,8 @@ public class AMQSession implements SessionCallback {
 
    private final OpenWireProtocolManager protocolManager;
 
+   private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
+
    public AMQSession(ConnectionInfo connInfo,
                      SessionInfo sessInfo,
                      ActiveMQServer server,
@@ -295,7 +298,7 @@ public class AMQSession implements SessionCallback {
    }
 
    @Override
-   public void disconnect(ServerConsumer consumerId, String queueName) {
+   public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
       // TODO Auto-generated method stub
 
    }
@@ -315,7 +318,7 @@ public class AMQSession implements SessionCallback {
          actualDestinations = new ActiveMQDestination[]{destination};
       }
 
-      org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend);
+      org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend, coreMessageObjectPools);
 
       originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), this.connection.getState().getInfo().getClientId());
 
@@ -338,12 +341,12 @@ public class AMQSession implements SessionCallback {
 
       for (int i = 0; i < actualDestinations.length; i++) {
          ActiveMQDestination dest = actualDestinations[i];
-         SimpleString address = new SimpleString(dest.getPhysicalName());
+         SimpleString address = SimpleString.toSimpleString(dest.getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool());
          org.apache.activemq.artemis.api.core.Message coreMsg = originalCoreMsg.copy();
          coreMsg.setAddress(address);
 
          if (actualDestinations[i].isQueue()) {
-            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
+            checkAutoCreateQueue(SimpleString.toSimpleString(actualDestinations[i].getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool()), actualDestinations[i].isTemporary());
             coreMsg.setRoutingType(RoutingType.ANYCAST);
          } else {
             coreMsg.setRoutingType(RoutingType.MULTICAST);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 18e0b10..0a12b47 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -239,7 +239,7 @@ public class StompSession implements SessionCallback {
    }
 
    @Override
-   public void disconnect(ServerConsumer consumerId, String queueName) {
+   public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
       StompSubscription stompSubscription = subscriptions.remove(consumerId.getID());
       if (stompSubscription != null) {
          StompFrame frame = connection.getFrameHandler().createStompFrame(Stomp.Responses.ERROR);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
index 9133cdf..f343ec9 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
@@ -111,7 +111,7 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
       // Create the message consumer
       SimpleString selectorString = selector == null || selector.trim().equals("") ? null : new SimpleString(selector);
       if (activation.isTopic() && spec.isSubscriptionDurable()) {
-         SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, spec.getClientID(), spec.getSubscriptionName()));
+         SimpleString queueName = ActiveMQDestination.createQueueNameForSubscription(true, spec.getClientID(), spec.getSubscriptionName());
 
          QueueQuery subResponse = session.queueQuery(queueName);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
index 2276fdb..d38f45f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.artemis.core.protocol;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
@@ -54,7 +53,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
-import org.apache.activemq.artemis.utils.collections.TypedProperties;
 
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
@@ -85,34 +83,15 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
 
 public class ServerPacketDecoder extends ClientPacketDecoder {
 
-   private static final int UUID_LENGTH = 36;
-   private static final int DEFAULT_INTERNER_CAPACITY = 32;
    private static final long serialVersionUID = 3348673114388400766L;
-   private SimpleString.Interner keysInterner;
-   private TypedProperties.StringValue.Interner valuesInterner;
-
-   public ServerPacketDecoder() {
-      this.keysInterner = null;
-      this.valuesInterner = null;
-   }
-
-   private void initializeInternersIfNeeded() {
-      if (this.keysInterner == null) {
-         this.keysInterner = new SimpleString.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH);
-      }
-      if (this.valuesInterner == null) {
-         this.valuesInterner = new TypedProperties.StringValue.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH);
-      }
-   }
 
    private SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
       final SessionSendMessage sendMessage;
 
-      initializeInternersIfNeeded();
       if (connection.isVersionBeforeAddressChange()) {
-         sendMessage = new SessionSendMessage_1X(new CoreMessage(this.keysInterner, this.valuesInterner));
+         sendMessage = new SessionSendMessage_1X(new CoreMessage(this.coreMessageObjectPools));
       } else {
-         sendMessage = new SessionSendMessage(new CoreMessage(this.keysInterner, this.valuesInterner));
+         sendMessage = new SessionSendMessage(new CoreMessage(this.coreMessageObjectPools));
       }
 
       sendMessage.decode(in);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 8b281eb..f53d028 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.protocol.core.Channel;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler;
@@ -48,6 +49,8 @@ public final class CoreSessionCallback implements SessionCallback {
 
    private ServerSessionPacketHandler handler;
 
+   private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
+
    public CoreSessionCallback(String name,
                               ProtocolManager protocolManager,
                               Channel channel,
@@ -115,9 +118,9 @@ public final class CoreSessionCallback implements SessionCallback {
 
       Packet packet;
       if (channel.getConnection().isVersionBeforeAddressChange()) {
-         packet = new SessionReceiveMessage_1X(consumer.getID(), message.toCore(), deliveryCount);
+         packet = new SessionReceiveMessage_1X(consumer.getID(), message.toCore(coreMessageObjectPools), deliveryCount);
       } else {
-         packet = new SessionReceiveMessage(consumer.getID(), message.toCore(), deliveryCount);
+         packet = new SessionReceiveMessage(consumer.getID(), message.toCore(coreMessageObjectPools), deliveryCount);
       }
 
       int size = 0;
@@ -159,11 +162,11 @@ public final class CoreSessionCallback implements SessionCallback {
    }
 
    @Override
-   public void disconnect(ServerConsumer consumerId, String queueName) {
+   public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
       if (channel.supports(PacketImpl.DISCONNECT_CONSUMER)) {
          channel.send(new DisconnectConsumerMessage(consumerId.getID()));
       } else {
-         ActiveMQServerLogger.LOGGER.warnDisconnectOldClient(queueName);
+         ActiveMQServerLogger.LOGGER.warnDisconnectOldClient(queueName.toString());
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 5dc1b93..15b1465 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -1045,7 +1045,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
    @Override
    public void disconnect() {
-      callback.disconnect(this, getQueue().getName().toString());
+      callback.disconnect(this, getQueue().getName());
    }
 
    public float getRate() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
index a440e31..2c81343 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java
@@ -18,10 +18,11 @@ package org.apache.activemq.artemis.spi.core.protocol;
 
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 
 public interface MessageConverter<ProtocolMessage extends Message> {
 
-   ICoreMessage toCore(ProtocolMessage pureMessage) throws Exception;
+   ICoreMessage toCore(ProtocolMessage pureMessage, CoreMessageObjectPools coreMessageObjectPools) throws Exception;
 
    ProtocolMessage fromCore(ICoreMessage coreMessage) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index ae1612f..c4a2dbe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -81,7 +81,7 @@ public interface SessionCallback {
 
    void closed();
 
-   void disconnect(ServerConsumer consumerId, String queueName);
+   void disconnect(ServerConsumer consumerId, SimpleString queueName);
 
    boolean isWritable(ReadyListener callback, Object protocolContext);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 2707190..5cfac12 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.Persister;
@@ -334,6 +335,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
 
       @Override
       public CoreMessage toCore() {
+         return toCore(null);
+      }
+
+      @Override
+      public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
          return null;
       }
 
@@ -591,6 +597,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public Message putStringProperty(SimpleString key, String value) {
+         return null;
+      }
+
+      @Override
       public Message putStringProperty(String key, String value) {
          return null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index d7c9855..078c397 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -37,6 +37,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -386,6 +387,11 @@ public class AcknowledgeTest extends ActiveMQTestBase {
 
       @Override
       public ICoreMessage toCore() {
+         return toCore(null);
+      }
+
+      @Override
+      public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
          return null;
       }
 
@@ -648,6 +654,11 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       }
 
       @Override
+      public Message putStringProperty(SimpleString key, String value) {
+         return null;
+      }
+
+      @Override
       public Message putStringProperty(String key, String value) {
          return null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 2f25480..dc57a12 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -585,7 +585,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
       }
 
       @Override
-      public void disconnect(ServerConsumer consumerId, String queueName) {
+      public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
          //To change body of implemented methods use File | Settings | File Templates.
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
index 790ed82..aaf29b0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
@@ -128,7 +128,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
          msg.putStringProperty("myNonAsciiStringProperty", international.toString());
          msg.putStringProperty("mySpecialCharacters", special);
          msg.putStringProperty(new SimpleString("mySimpleStringProperty"), new SimpleString("mySimpleStringPropertyValue_" + i));
-         msg.putStringProperty(new SimpleString("myNullSimpleStringProperty"), null);
+         msg.putStringProperty(new SimpleString("myNullSimpleStringProperty"), (SimpleString) null);
          producer.send(msg);
       }