You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/11/11 20:00:58 UTC

[1/2] activemq git commit: NO-JIRA Add some additional validation. (cherry picked from commit 097c0e7eae360f376b525448863e86452ebc06cc)

Repository: activemq
Updated Branches:
  refs/heads/activemq-5.14.x 56acbc8ef -> 41ce86bd9


NO-JIRA Add some additional validation.
(cherry picked from commit 097c0e7eae360f376b525448863e86452ebc06cc)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/35bdd137
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/35bdd137
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/35bdd137

Branch: refs/heads/activemq-5.14.x
Commit: 35bdd137248c54bad1f0f9ed5a676de46d95dd11
Parents: 56acbc8
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Nov 10 12:45:37 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Nov 11 14:53:31 2016 -0500

----------------------------------------------------------------------
 .../amqp/interop/AmqpScheduledMessageTest.java        | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/35bdd137/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java
index 053ec7e..b91dac0 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.amqp.interop;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -171,6 +172,7 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
         assertEquals(0, brokerService.getAdminView().getQueues().length);
 
         AmqpSender sender = session.createSender("queue://" + getTestName());
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
 
         // Get the Queue View early to avoid racing the delivery.
         assertEquals(1, brokerService.getAdminView().getQueues().length);
@@ -184,20 +186,28 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
         sender.send(message);
         sender.close();
 
+        receiver.flow(1);
+
         // Read the message with short timeout, shouldn't get it.
         try {
-            readMessages(getTestName(), 1, false, 1000);
+            assertNull(receiver.receive(1, TimeUnit.SECONDS));
             fail("Should not read the message");
         } catch (Throwable ex) {
         }
 
         // Read the message with long timeout, should get it.
+        AmqpMessage delivered = null;
         try {
-            readMessages(getTestName(), 1, false, 10000);
+            delivered = receiver.receive(10, TimeUnit.SECONDS);
         } catch (Throwable ex) {
             fail("Should read the message");
         }
 
+        assertNotNull(delivered);
+        Long msgDeliveryTime = (Long) delivered.getMessageAnnotation("x-opt-delivery-delay");
+        assertNotNull(msgDeliveryTime);
+        assertEquals(delay, msgDeliveryTime.longValue());
+
         connection.close();
     }
 


[2/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6504

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6504

Round the start time value instead of truncating it, to ensure the delay
falls on the correct side of the scheduling block.
(cherry picked from commit 980162233fd3693d1f83d3f95985ac33affa7a8f)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/41ce86bd
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/41ce86bd
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/41ce86bd

Branch: refs/heads/activemq-5.14.x
Commit: 41ce86bd954708ecbba52e6550f00c95fe80a506
Parents: 35bdd13
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Nov 11 14:48:49 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Nov 11 14:53:35 2016 -0500

----------------------------------------------------------------------
 .../amqp/interop/AmqpScheduledMessageTest.java  | 17 ++++++++++---
 .../scheduler/memory/InMemoryJobScheduler.java  | 16 ++++++------
 .../kahadb/scheduler/JobSchedulerImpl.java      | 26 ++++++++++----------
 3 files changed, 35 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/41ce86bd/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java
index b91dac0..14dcf8c 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.Date;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
@@ -165,6 +166,8 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
 
     @Test
     public void testScheduleWithDelay() throws Exception {
+        final long DELAY = 5000;
+
         AmqpClient client = createAmqpClient();
         AmqpConnection connection = trackConnection(client.connect());
         AmqpSession session = connection.createSession();
@@ -179,9 +182,10 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
         final QueueViewMBean queueView = getProxyToQueue(getTestName());
         assertNotNull(queueView);
 
+        long sendTime = System.currentTimeMillis();
+
         AmqpMessage message = new AmqpMessage();
-        long delay = 5000;
-        message.setMessageAnnotation("x-opt-delivery-delay", delay);
+        message.setMessageAnnotation("x-opt-delivery-delay", DELAY);
         message.setText("Test-Message");
         sender.send(message);
         sender.close();
@@ -203,10 +207,17 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
             fail("Should read the message");
         }
 
+        long receivedTime = System.currentTimeMillis();
+
         assertNotNull(delivered);
         Long msgDeliveryTime = (Long) delivered.getMessageAnnotation("x-opt-delivery-delay");
         assertNotNull(msgDeliveryTime);
-        assertEquals(delay, msgDeliveryTime.longValue());
+        assertEquals(DELAY, msgDeliveryTime.longValue());
+
+        long totalDelay = receivedTime - sendTime;
+        LOG.debug("Sent at: {}, received at: {} ", new Date(sendTime), new Date(receivedTime), totalDelay);
+
+        assertTrue("Delay not as expected: " + totalDelay, receivedTime - sendTime >= DELAY);
 
         connection.close();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/41ce86bd/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java
index bd2aaf5..3e07770 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java
@@ -54,10 +54,10 @@ public class InMemoryJobScheduler implements JobScheduler {
 
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     private final String name;
-    private final TreeMap<Long, ScheduledTask> jobs = new TreeMap<Long, ScheduledTask>();
+    private final TreeMap<Long, ScheduledTask> jobs = new TreeMap<>();
     private final AtomicBoolean started = new AtomicBoolean(false);
     private final AtomicBoolean dispatchEnabled = new AtomicBoolean(false);
-    private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>();
+    private final List<JobListener> jobListeners = new CopyOnWriteArrayList<>();
     private final Timer timer = new Timer();
 
     public InMemoryJobScheduler(String name) {
@@ -165,7 +165,7 @@ public class InMemoryJobScheduler implements JobScheduler {
 
     @Override
     public List<Job> getNextScheduleJobs() throws Exception {
-        List<Job> result = new ArrayList<Job>();
+        List<Job> result = new ArrayList<>();
         lock.readLock().lock();
         try {
             if (!jobs.isEmpty()) {
@@ -179,7 +179,7 @@ public class InMemoryJobScheduler implements JobScheduler {
 
     @Override
     public List<Job> getAllJobs() throws Exception {
-        final List<Job> result = new ArrayList<Job>();
+        final List<Job> result = new ArrayList<>();
         this.lock.readLock().lock();
         try {
             for (Map.Entry<Long, ScheduledTask> entry : jobs.entrySet()) {
@@ -194,7 +194,7 @@ public class InMemoryJobScheduler implements JobScheduler {
 
     @Override
     public List<Job> getAllJobs(long start, long finish) throws Exception {
-        final List<Job> result = new ArrayList<Job>();
+        final List<Job> result = new ArrayList<>();
         this.lock.readLock().lock();
         try {
             for (Map.Entry<Long, ScheduledTask> entry : jobs.entrySet()) {
@@ -223,7 +223,7 @@ public class InMemoryJobScheduler implements JobScheduler {
         long startTime = System.currentTimeMillis();
         long executionTime = 0;
         // round startTime - so we can schedule more jobs at the same time
-        startTime = (startTime / 1000) * 1000;
+        startTime = ((startTime + 500) / 1000) * 1000;
         if (cronEntry != null && cronEntry.length() > 0) {
             try {
                 executionTime = CronParser.getNextScheduledTime(cronEntry, startTime);
@@ -369,7 +369,7 @@ public class InMemoryJobScheduler implements JobScheduler {
      */
     private class ScheduledTask extends TimerTask {
 
-        private final Map<String, InMemoryJob> jobs = new TreeMap<String, InMemoryJob>();
+        private final Map<String, InMemoryJob> jobs = new TreeMap<>();
         private final long executionTime;
 
         public ScheduledTask(long executionTime) {
@@ -384,7 +384,7 @@ public class InMemoryJobScheduler implements JobScheduler {
          * @return a Collection containing all the managed jobs for this task.
          */
         public Collection<InMemoryJob> getAllJobs() {
-            return new ArrayList<InMemoryJob>(jobs.values());
+            return new ArrayList<>(jobs.values());
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/41ce86bd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
index 82b9ff5..d49acaf 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
@@ -57,7 +57,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
     private BTreeIndex<Long, List<JobLocation>> index;
     private Thread thread;
     private final AtomicBoolean started = new AtomicBoolean(false);
-    private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>();
+    private final List<JobListener> jobListeners = new CopyOnWriteArrayList<>();
     private static final IdGenerator ID_GENERATOR = new IdGenerator();
     private final ScheduleTime scheduleTime = new ScheduleTime();
 
@@ -132,7 +132,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
 
     @Override
     public List<Job> getNextScheduleJobs() throws IOException {
-        final List<Job> result = new ArrayList<Job>();
+        final List<Job> result = new ArrayList<>();
         this.store.readLockIndex();
         try {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@@ -169,7 +169,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
 
     @Override
     public List<Job> getAllJobs() throws IOException {
-        final List<Job> result = new ArrayList<Job>();
+        final List<Job> result = new ArrayList<>();
         this.store.readLockIndex();
         try {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@@ -198,7 +198,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
 
     @Override
     public List<Job> getAllJobs(final long start, final long finish) throws IOException {
-        final List<Job> result = new ArrayList<Job>();
+        final List<Job> result = new ArrayList<>();
         this.store.readLockIndex();
         try {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@@ -229,7 +229,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
         long startTime = System.currentTimeMillis();
         // round startTime - so we can schedule more jobs
         // at the same time
-        startTime = (startTime / 1000) * 1000;
+        startTime = ((startTime + 500) / 1000) * 1000;
         long time = 0;
         if (cronEntry != null && cronEntry.length() > 0) {
             try {
@@ -329,7 +329,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
             values = this.index.remove(tx, nextExecutionTime);
         }
         if (values == null) {
-            values = new ArrayList<JobLocation>();
+            values = new ArrayList<>();
         }
 
         // There can never be more than one instance of the same JobId scheduled at any
@@ -407,7 +407,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
                 target = this.index.remove(tx, command.getNextExecutionTime());
             }
             if (target == null) {
-                target = new ArrayList<JobLocation>();
+                target = new ArrayList<>();
             }
             target.add(result);
 
@@ -568,7 +568,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
      * @throws IOException if an error occurs during the remove operation.
      */
     protected void removeInRange(Transaction tx, long start, long finish, Location location) throws IOException {
-        List<Long> keys = new ArrayList<Long>();
+        List<Long> keys = new ArrayList<>();
         for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx, start); i.hasNext();) {
             Map.Entry<Long, List<JobLocation>> entry = i.next();
             if (entry.getKey().longValue() <= finish) {
@@ -662,7 +662,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
      * @throws IOException if an error occurs walking the scheduler tree.
      */
     protected List<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException {
-        List<JobLocation> references = new ArrayList<JobLocation>();
+        List<JobLocation> references = new ArrayList<>();
 
         for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
             Map.Entry<Long, List<JobLocation>> entry = i.next();
@@ -709,8 +709,8 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
                 // needed before firing the job event.
                 Map.Entry<Long, List<JobLocation>> first = getNextToSchedule();
                 if (first != null) {
-                    List<JobLocation> list = new ArrayList<JobLocation>(first.getValue());
-                    List<JobLocation> toRemove = new ArrayList<JobLocation>(list.size());
+                    List<JobLocation> list = new ArrayList<>(first.getValue());
+                    List<JobLocation> toRemove = new ArrayList<>(list.size());
                     final long executionTime = first.getKey();
                     long nextExecutionTime = 0;
                     if (executionTime <= currentTime) {
@@ -852,7 +852,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
     }
 
     void createIndexes(Transaction tx) throws IOException {
-        this.index = new BTreeIndex<Long, List<JobLocation>>(this.store.getPageFile(), tx.allocate().getPageId());
+        this.index = new BTreeIndex<>(this.store.getPageFile(), tx.allocate().getPageId());
     }
 
     void load(Transaction tx) throws IOException {
@@ -863,7 +863,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
 
     void read(DataInput in) throws IOException {
         this.name = in.readUTF();
-        this.index = new BTreeIndex<Long, List<JobLocation>>(this.store.getPageFile(), in.readLong());
+        this.index = new BTreeIndex<>(this.store.getPageFile(), in.readLong());
         this.index.setKeyMarshaller(LongMarshaller.INSTANCE);
         this.index.setValueMarshaller(JobLocationsMarshaller.INSTANCE);
     }