You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2009/02/26 00:21:02 UTC

svn commit: r747962 - in /qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid: client/AMQConnection.java client/BasicMessageProducer.java client/BasicMessageProducer_0_10.java client/configuration/ClientProperties.java jms/ConnectionURL.java

Author: rajith
Date: Wed Feb 25 23:21:01 2009
New Revision: 747962

URL: http://svn.apache.org/viewvc?rev=747962&view=rev
Log:
This is related to QPID-1106 and QPID-1677

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=747962&r1=747961&r2=747962&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Wed Feb 25 23:21:01 2009
@@ -268,6 +268,13 @@
     //Indicates whether persistent messages are synchronized
     private boolean _syncPersistence;
 
+    //Indicates whether we need to sync on every message ack
+    private boolean _syncAck;
+    
+    //Indicates the sync publish options (persistent|all)
+    //By default it's async publish
+    private String _syncPublish = ""; 
+    
     /**
      * @param broker      brokerdetails
      * @param username    username
@@ -348,25 +355,53 @@
     public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
     {
         // set this connection maxPrefetch
-        if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null)
+        if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null)
         {
-            _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
+            _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH));
         }
         else
         {
             // use the defaul value set for all connections
             _maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
-                                                                               ClientProperties.MAX_PREFETCH_DEFAULT));
+                    ClientProperties.MAX_PREFETCH_DEFAULT));
         }
 
-        if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null)
+        if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null)
         {
-            _syncPersistence = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE));
+            _syncPersistence = 
+                Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE));
+            _logger.warn("sync_persistence is a deprecated property, " +
+            		"please use sync_publish={persistent|all} instead");
         }
         else
         {
             // use the defaul value set for all connections
             _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
+            if (_syncPersistence)
+            {
+                _logger.warn("sync_persistence is a deprecated property, " +
+                        "please use sync_publish={persistent|all} instead");
+            }
+        }
+
+        if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null)
+        {
+            _syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK));
+        }
+        else
+        {
+            // no URL option: use the default set for all connections via the sync_ack system property
+            _syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME);
+        }
+
+        if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null)
+        {
+            _syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH);
+        }
+        else
+        {
+            // no URL option: read the sync_publish system property, keeping the current (async) value when unset
+            _syncPublish = System.getProperty(ClientProperties.SYNC_PUBLISH_PROP_NAME, _syncPublish);
         }
         
         _failoverPolicy = new FailoverPolicy(connectionURL, this);
@@ -1469,6 +1504,19 @@
         return _syncPersistence;
     }
     
+    /**
+     * Indicates whether we need to sync on every message ack
+     */
+    public boolean getSyncAck()
+    {
+        return _syncAck;
+    }
+    
+    public String getSyncPublish()
+    {
+        return _syncPublish;
+    }
+        
     public void setIdleTimeout(long l)
     {
         _delegate.setIdleTimeout(l);

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=747962&r1=747961&r2=747962&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Wed Feb 25 23:21:01 2009
@@ -46,6 +46,8 @@
 
 public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
 {
+    enum PublishMode { ASYNC_PUBLISH_ALLL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }; 
+    
     protected final Logger _logger = LoggerFactory.getLogger(getClass());
 
     private AMQConnection _connection;
@@ -120,6 +122,8 @@
     protected String _userID;  // ref user id used in the connection.
 
     private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
+    
+    protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALLL;
 
     protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
                                    AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
@@ -141,6 +145,26 @@
         _mandatory = mandatory;
         _waitUntilSent = waitUntilSent;
         _userID = connection.getUsername();
+        setPublishMode();        
+    }
+    
+    void setPublishMode()
+    {
+        // Derives publishMode from the connection-level settings only. It could be configured at
+        // destination level as well; support will be added when we provide a more robust binding URL
+        
+        String syncPub = _connection.getSyncPublish();
+        // Deprecated sync_persistence is treated like "persistent"; it is tested first, so it wins over "all"
+        if (syncPub.equals("persistent") || _connection.getSyncPersistence())
+        {
+            publishMode = PublishMode.SYNC_PUBLISH_PERSISTENT;
+        }
+        else if (syncPub.equals("all"))
+        {
+            publishMode = PublishMode.SYNC_PUBLISH_ALL;
+        }
+        
+        _logger.info("MessageProducer " + toString() + " using publish mode : " + publishMode);
     }
 
     void resubscribe() throws AMQException

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=747962&r1=747961&r2=747962&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Wed Feb 25 23:21:01 2009
@@ -151,9 +151,13 @@
                 ((AMQSession_0_10) getSession()).getQpidSession();
 
             // if true, we need to sync the delivery of this message
-            boolean sync = (deliveryMode == DeliveryMode.PERSISTENT &&
-                            getSession().getAMQConnection().getSyncPersistence());
+            boolean sync = false;
 
+            sync = ( (publishMode == PublishMode.SYNC_PUBLISH_ALL) ||
+                     (publishMode == PublishMode.SYNC_PUBLISH_PERSISTENT && 
+                         deliveryMode == DeliveryMode.PERSISTENT)
+                   );  
+            
             org.apache.mina.common.ByteBuffer data = message.getData();
             ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice();
             

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java?rev=747962&r1=747961&r2=747962&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java Wed Feb 25 23:21:01 2009
@@ -47,6 +47,19 @@
      */
     public static final String SYNC_PERSISTENT_PROP_NAME = "sync_persistence";
     
+    /**
+     * When true a sync command is sent after sending a message ack.
+     * type: boolean
+     */
+    public static final String SYNC_ACK_PROP_NAME = "sync_ack";
+        
+    /**
+     * sync_publish property - {persistent|all}
+     * If set to 'persistent', then persistent messages will be published synchronously.
+     * If set to 'all', then all messages, regardless of the delivery mode, will be
+     * published synchronously.
+     */
+    public static final String SYNC_PUBLISH_PROP_NAME = "sync_publish";
     
     /**
      * This value will be used in the following settings

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=747962&r1=747961&r2=747962&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java Wed Feb 25 23:21:01 2009
@@ -33,9 +33,11 @@
   */
 public interface ConnectionURL
 {
-    public static final String AMQ_SYNC_PERSISTENCE = "sync_persistence";
-    public static final String AMQ_MAXPREFETCH = "maxprefetch";
     public static final String AMQ_PROTOCOL = "amqp";
+    public static final String OPTIONS_SYNC_PERSISTENCE = "sync_persistence";
+    public static final String OPTIONS_MAXPREFETCH = "maxprefetch";
+    public static final String OPTIONS_SYNC_ACK = "sync_ack";    
+    public static final String OPTIONS_SYNC_PUBLISH = "sync_publish";
     public static final String OPTIONS_BROKERLIST = "brokerlist";
     public static final String OPTIONS_FAILOVER = "failover";
     public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org