You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/06/06 23:47:24 UTC

svn commit: r1490454 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTxTest.java

Author: tabish
Date: Thu Jun  6 21:47:24 2013
New Revision: 1490454

URL: http://svn.apache.org/r1490454
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4337

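Adds basic transaction (TX) support for messages that are to be scheduled: a scheduled message sent inside a transaction is now handed to the job scheduler only after the transaction commits, so a rolled-back send is never scheduled.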

Added:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTxTest.java   (with props)
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java?rev=1490454&r1=1490453&r2=1490454&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java Thu Jun  6 21:47:24 2013
@@ -34,6 +34,7 @@ import org.apache.activemq.command.Produ
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.usage.JobSchedulerUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.ByteSequence;
@@ -96,15 +97,13 @@ public class SchedulerBroker extends Bro
     }
 
     @Override
-    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
-        long delay = 0;
-        long period = 0;
-        int repeat = 0;
-        String cronEntry = "";
-        String jobId = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
-        Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
-        Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
-        Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
+    public void send(ProducerBrokerExchange producerExchange, final Message messageSend) throws Exception {
+        ConnectionContext context = producerExchange.getConnectionContext();
+
+        final String jobId = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
+        final Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
+        final Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
+        final Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
 
         String physicalName = messageSend.getDestination().getPhysicalName();
         boolean schedularManage = physicalName.regionMatches(true, 0, ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
@@ -155,31 +154,50 @@ public class SchedulerBroker extends Bro
             }
 
         } else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) {
-            // clear transaction context
-            Message msg = messageSend.copy();
-            msg.setTransactionId(null);
-            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(msg);
-            if (cronValue != null) {
-                cronEntry = cronValue.toString();
-            }
-            if (periodValue != null) {
-                period = (Long) TypeConversionSupport.convert(periodValue, Long.class);
-            }
-            if (delayValue != null) {
-                delay = (Long) TypeConversionSupport.convert(delayValue, Long.class);
-            }
-            Object repeatValue = msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
-            if (repeatValue != null) {
-                repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
-            }
-            getInternalScheduler().schedule(msg.getMessageId().toString(), new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay,
-                period, repeat);
 
+            if (context.isInTransaction()) {
+                context.getTransaction().addSynchronization(new Synchronization() {
+                    @Override
+                    public void afterCommit() throws Exception {
+                        doSchedule(messageSend, cronValue, periodValue, delayValue);
+                    }
+                });
+            } else {
+                doSchedule(messageSend, cronValue, periodValue, delayValue);
+            }
         } else {
             super.send(producerExchange, messageSend);
         }
     }
 
+    private void doSchedule(Message messageSend, Object cronValue, Object periodValue, Object delayValue) throws Exception {
+        long delay = 0;
+        long period = 0;
+        int repeat = 0;
+        String cronEntry = "";
+
+        // clear transaction context
+        Message msg = messageSend.copy();
+        msg.setTransactionId(null);
+        org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(msg);
+        if (cronValue != null) {
+            cronEntry = cronValue.toString();
+        }
+        if (periodValue != null) {
+            period = (Long) TypeConversionSupport.convert(periodValue, Long.class);
+        }
+        if (delayValue != null) {
+            delay = (Long) TypeConversionSupport.convert(delayValue, Long.class);
+        }
+        Object repeatValue = msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
+        if (repeatValue != null) {
+            repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
+        }
+
+        getInternalScheduler().schedule(msg.getMessageId().toString(),
+            new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay, period, repeat);
+    }
+
     @Override
     public void scheduledJob(String id, ByteSequence job) {
         org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job.getOffset(), job.getLength());

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTxTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTxTest.java?rev=1490454&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTxTest.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTxTest.java Thu Jun  6 21:47:24 2013
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.IOHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class JobSchedulerTxTest {
+
+    private BrokerService broker;
+    private final String connectionUri = "vm://localhost";
+    private final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
+    private final ActiveMQQueue destination = new ActiveMQQueue("Target.Queue");
+
+    @Before
+    public void setUp() throws Exception {
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    @Test
+    public void testTxSendWithRollback() throws Exception {
+        final int COUNT = 10;
+        Connection connection = createConnection();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(destination);
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                latch.countDown();
+            }
+        });
+
+        connection.start();
+        long time = 5000;
+        Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer producer = producerSession.createProducer(destination);
+
+        for (int i = 0; i < COUNT; ++i) {
+            TextMessage message = session.createTextMessage("test msg");
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+            producer.send(message);
+        }
+        producer.close();
+        producerSession.rollback();
+
+        // the send was rolled back, so nothing may arrive: neither before the scheduled delay nor after it
+        Thread.sleep(2000);
+        assertEquals(COUNT, latch.getCount());
+        latch.await(5, TimeUnit.SECONDS);
+        assertEquals(COUNT, latch.getCount());
+    }
+
+    @Test
+    public void testTxSendWithCommit() throws Exception {
+        final int COUNT = 10;
+        Connection connection = createConnection();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(destination);
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                latch.countDown();
+            }
+        });
+
+        connection.start();
+        long time = 5000;
+        Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer producer = producerSession.createProducer(destination);
+
+        for (int i = 0; i < COUNT; ++i) {
+            TextMessage message = session.createTextMessage("test msg");
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+            producer.send(message);
+        }
+        producer.close();
+        producerSession.commit();
+
+        // make sure the message isn't delivered early
+        Thread.sleep(2000);
+        assertEquals(COUNT, latch.getCount());
+        latch.await(5, TimeUnit.SECONDS);
+        assertEquals(0, latch.getCount());
+    }
+
+    protected Connection createConnection() throws Exception {
+        return cf.createConnection();
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        return createBroker(true);
+    }
+
+    protected BrokerService createBroker(boolean delete) throws Exception {
+        File schedulerDirectory = new File("target/scheduler");
+        if (delete) {
+            IOHelper.mkdirs(schedulerDirectory);
+            IOHelper.deleteChildren(schedulerDirectory);
+        }
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(true);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.setDataDirectory("target");
+        answer.setSchedulerDirectoryFile(schedulerDirectory);
+        answer.setSchedulerSupport(true);
+        answer.setUseJmx(false);
+        answer.addConnector(connectionUri);
+        return answer;
+    }
+}

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTxTest.java
------------------------------------------------------------------------------
    svn:eol-style = native