You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/11/11 19:49:19 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6504
Repository: activemq
Updated Branches:
refs/heads/master 9d6bc3a5d -> 980162233
https://issues.apache.org/jira/browse/AMQ-6504
Round the start time value instead of truncating it, to ensure the delay
falls on the correct side of the scheduling block.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/98016223
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/98016223
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/98016223
Branch: refs/heads/master
Commit: 980162233fd3693d1f83d3f95985ac33affa7a8f
Parents: 9d6bc3a
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Nov 11 14:48:49 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Nov 11 14:48:49 2016 -0500
----------------------------------------------------------------------
.../amqp/interop/AmqpScheduledMessageTest.java | 17 ++++++++++---
.../scheduler/memory/InMemoryJobScheduler.java | 16 ++++++------
.../kahadb/scheduler/JobSchedulerImpl.java | 26 ++++++++++----------
3 files changed, 35 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/98016223/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java
index b91dac0..14dcf8c 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpScheduledMessageTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.util.Date;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
@@ -165,6 +166,8 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
@Test
public void testScheduleWithDelay() throws Exception {
+ final long DELAY = 5000;
+
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
@@ -179,9 +182,10 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
+ long sendTime = System.currentTimeMillis();
+
AmqpMessage message = new AmqpMessage();
- long delay = 5000;
- message.setMessageAnnotation("x-opt-delivery-delay", delay);
+ message.setMessageAnnotation("x-opt-delivery-delay", DELAY);
message.setText("Test-Message");
sender.send(message);
sender.close();
@@ -203,10 +207,17 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
fail("Should read the message");
}
+ long receivedTime = System.currentTimeMillis();
+
assertNotNull(delivered);
Long msgDeliveryTime = (Long) delivered.getMessageAnnotation("x-opt-delivery-delay");
assertNotNull(msgDeliveryTime);
- assertEquals(delay, msgDeliveryTime.longValue());
+ assertEquals(DELAY, msgDeliveryTime.longValue());
+
+ long totalDelay = receivedTime - sendTime;
+ LOG.debug("Sent at: {}, received at: {} ", new Date(sendTime), new Date(receivedTime), totalDelay);
+
+ assertTrue("Delay not as expected: " + totalDelay, receivedTime - sendTime >= DELAY);
connection.close();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/98016223/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java
index bd2aaf5..3e07770 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobScheduler.java
@@ -54,10 +54,10 @@ public class InMemoryJobScheduler implements JobScheduler {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final String name;
- private final TreeMap<Long, ScheduledTask> jobs = new TreeMap<Long, ScheduledTask>();
+ private final TreeMap<Long, ScheduledTask> jobs = new TreeMap<>();
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean dispatchEnabled = new AtomicBoolean(false);
- private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>();
+ private final List<JobListener> jobListeners = new CopyOnWriteArrayList<>();
private final Timer timer = new Timer();
public InMemoryJobScheduler(String name) {
@@ -165,7 +165,7 @@ public class InMemoryJobScheduler implements JobScheduler {
@Override
public List<Job> getNextScheduleJobs() throws Exception {
- List<Job> result = new ArrayList<Job>();
+ List<Job> result = new ArrayList<>();
lock.readLock().lock();
try {
if (!jobs.isEmpty()) {
@@ -179,7 +179,7 @@ public class InMemoryJobScheduler implements JobScheduler {
@Override
public List<Job> getAllJobs() throws Exception {
- final List<Job> result = new ArrayList<Job>();
+ final List<Job> result = new ArrayList<>();
this.lock.readLock().lock();
try {
for (Map.Entry<Long, ScheduledTask> entry : jobs.entrySet()) {
@@ -194,7 +194,7 @@ public class InMemoryJobScheduler implements JobScheduler {
@Override
public List<Job> getAllJobs(long start, long finish) throws Exception {
- final List<Job> result = new ArrayList<Job>();
+ final List<Job> result = new ArrayList<>();
this.lock.readLock().lock();
try {
for (Map.Entry<Long, ScheduledTask> entry : jobs.entrySet()) {
@@ -223,7 +223,7 @@ public class InMemoryJobScheduler implements JobScheduler {
long startTime = System.currentTimeMillis();
long executionTime = 0;
// round startTime - so we can schedule more jobs at the same time
- startTime = (startTime / 1000) * 1000;
+ startTime = ((startTime + 500) / 1000) * 1000;
if (cronEntry != null && cronEntry.length() > 0) {
try {
executionTime = CronParser.getNextScheduledTime(cronEntry, startTime);
@@ -369,7 +369,7 @@ public class InMemoryJobScheduler implements JobScheduler {
*/
private class ScheduledTask extends TimerTask {
- private final Map<String, InMemoryJob> jobs = new TreeMap<String, InMemoryJob>();
+ private final Map<String, InMemoryJob> jobs = new TreeMap<>();
private final long executionTime;
public ScheduledTask(long executionTime) {
@@ -384,7 +384,7 @@ public class InMemoryJobScheduler implements JobScheduler {
* @return a Collection containing all the managed jobs for this task.
*/
public Collection<InMemoryJob> getAllJobs() {
- return new ArrayList<InMemoryJob>(jobs.values());
+ return new ArrayList<>(jobs.values());
}
/**
http://git-wip-us.apache.org/repos/asf/activemq/blob/98016223/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
index 82b9ff5..d49acaf 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
@@ -57,7 +57,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
private BTreeIndex<Long, List<JobLocation>> index;
private Thread thread;
private final AtomicBoolean started = new AtomicBoolean(false);
- private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>();
+ private final List<JobListener> jobListeners = new CopyOnWriteArrayList<>();
private static final IdGenerator ID_GENERATOR = new IdGenerator();
private final ScheduleTime scheduleTime = new ScheduleTime();
@@ -132,7 +132,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
@Override
public List<Job> getNextScheduleJobs() throws IOException {
- final List<Job> result = new ArrayList<Job>();
+ final List<Job> result = new ArrayList<>();
this.store.readLockIndex();
try {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@@ -169,7 +169,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
@Override
public List<Job> getAllJobs() throws IOException {
- final List<Job> result = new ArrayList<Job>();
+ final List<Job> result = new ArrayList<>();
this.store.readLockIndex();
try {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@@ -198,7 +198,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
@Override
public List<Job> getAllJobs(final long start, final long finish) throws IOException {
- final List<Job> result = new ArrayList<Job>();
+ final List<Job> result = new ArrayList<>();
this.store.readLockIndex();
try {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@@ -229,7 +229,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
long startTime = System.currentTimeMillis();
// round startTime - so we can schedule more jobs
// at the same time
- startTime = (startTime / 1000) * 1000;
+ startTime = ((startTime + 500) / 1000) * 1000;
long time = 0;
if (cronEntry != null && cronEntry.length() > 0) {
try {
@@ -329,7 +329,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
values = this.index.remove(tx, nextExecutionTime);
}
if (values == null) {
- values = new ArrayList<JobLocation>();
+ values = new ArrayList<>();
}
// There can never be more than one instance of the same JobId scheduled at any
@@ -407,7 +407,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
target = this.index.remove(tx, command.getNextExecutionTime());
}
if (target == null) {
- target = new ArrayList<JobLocation>();
+ target = new ArrayList<>();
}
target.add(result);
@@ -568,7 +568,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
* @throws IOException if an error occurs during the remove operation.
*/
protected void removeInRange(Transaction tx, long start, long finish, Location location) throws IOException {
- List<Long> keys = new ArrayList<Long>();
+ List<Long> keys = new ArrayList<>();
for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx, start); i.hasNext();) {
Map.Entry<Long, List<JobLocation>> entry = i.next();
if (entry.getKey().longValue() <= finish) {
@@ -662,7 +662,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
* @throws IOException if an error occurs walking the scheduler tree.
*/
protected List<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException {
- List<JobLocation> references = new ArrayList<JobLocation>();
+ List<JobLocation> references = new ArrayList<>();
for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
Map.Entry<Long, List<JobLocation>> entry = i.next();
@@ -709,8 +709,8 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
// needed before firing the job event.
Map.Entry<Long, List<JobLocation>> first = getNextToSchedule();
if (first != null) {
- List<JobLocation> list = new ArrayList<JobLocation>(first.getValue());
- List<JobLocation> toRemove = new ArrayList<JobLocation>(list.size());
+ List<JobLocation> list = new ArrayList<>(first.getValue());
+ List<JobLocation> toRemove = new ArrayList<>(list.size());
final long executionTime = first.getKey();
long nextExecutionTime = 0;
if (executionTime <= currentTime) {
@@ -852,7 +852,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
}
void createIndexes(Transaction tx) throws IOException {
- this.index = new BTreeIndex<Long, List<JobLocation>>(this.store.getPageFile(), tx.allocate().getPageId());
+ this.index = new BTreeIndex<>(this.store.getPageFile(), tx.allocate().getPageId());
}
void load(Transaction tx) throws IOException {
@@ -863,7 +863,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
void read(DataInput in) throws IOException {
this.name = in.readUTF();
- this.index = new BTreeIndex<Long, List<JobLocation>>(this.store.getPageFile(), in.readLong());
+ this.index = new BTreeIndex<>(this.store.getPageFile(), in.readLong());
this.index.setKeyMarshaller(LongMarshaller.INSTANCE);
this.index.setValueMarshaller(JobLocationsMarshaller.INSTANCE);
}