You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/08/07 10:43:38 UTC
[1/2] git commit: CAMEL-6614: Fixed suspend/resume quartz2 routes.
Polished.
Updated Branches:
refs/heads/master ed7e7c9fe -> 0d616fbe8
CAMEL-6614: Fixed suspend/resume quartz2 routes. Polished.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0d616fbe
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0d616fbe
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0d616fbe
Branch: refs/heads/master
Commit: 0d616fbe8ec314c14d585a278bef9e7acde58359
Parents: bef0be5
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Aug 7 10:42:22 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Aug 7 10:43:31 2013 +0200
----------------------------------------------------------------------
.../component/quartz2/QuartzConstants.java | 4 +-
.../camel/component/quartz2/QuartzConsumer.java | 21 ++++--
.../camel/component/quartz2/QuartzEndpoint.java | 2 -
.../camel/component/quartz2/QuartzMessage.java | 1 -
.../component/quartz2/StatefulCamelJob.java | 2 -
.../component/quartz2/QuartzStopRouteTest.java | 69 ++++++++++++++++++++
.../quartz2/QuartzSuspendRouteTest.java | 69 ++++++++++++++++++++
.../src/test/resources/log4j.properties | 2 +-
8 files changed, 157 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0d616fbe/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzConstants.java b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzConstants.java
index feb6ce4..07284ce 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzConstants.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzConstants.java
@@ -18,11 +18,9 @@ package org.apache.camel.component.quartz2;
/**
* Provide some constants used in this component package.
- *
- * @author Zemian Deng saltnlight5@gmail.com
*/
public final class QuartzConstants {
- public static final String QUARTZ_CAMEL_JOBS_COUNT = "CamelJobsCount";
+ public static final String QUARTZ_CAMEL_JOBS_COUNT = "CamelQuartzJobsCount";
public static final String QUARTZ_ENDPOINT_URI = "CamelQuartzEndpoint";
http://git-wip-us.apache.org/repos/asf/camel/blob/0d616fbe/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzConsumer.java b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzConsumer.java
index a7b5f3f..af14186 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzConsumer.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzConsumer.java
@@ -19,30 +19,43 @@ package org.apache.camel.component.quartz2;
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
-import org.quartz.Scheduler;
/**
* This consumer process QuartzMessage when scheduler job is executed per scheduled time. When the job runs, it will
* call this consumer's processor to process a new exchange with QuartzMessage.
- *
- * @author Zemian Deng saltnlight5@gmail.com
*/
public class QuartzConsumer extends DefaultConsumer {
+
public QuartzConsumer(Endpoint endpoint, Processor processor) {
super(endpoint, processor);
}
+ @Override
public QuartzEndpoint getEndpoint() {
- return (QuartzEndpoint)super.getEndpoint();
+ return (QuartzEndpoint) super.getEndpoint();
}
@Override
protected void doStart() throws Exception {
+ super.doStart();
+ getEndpoint().onConsumerStart(this);
+ }
+
+ @Override
+ protected void doResume() throws Exception {
+ super.doResume();
getEndpoint().onConsumerStart(this);
}
@Override
protected void doStop() throws Exception {
getEndpoint().onConsumerStop(this);
+ super.doStop();
+ }
+
+ @Override
+ protected void doSuspend() throws Exception {
+ getEndpoint().onConsumerStop(this);
+ super.doSuspend();
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0d616fbe/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzEndpoint.java b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzEndpoint.java
index 5c75191..df1e1b1 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzEndpoint.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzEndpoint.java
@@ -44,11 +44,9 @@ import org.slf4j.LoggerFactory;
import static org.quartz.CronScheduleBuilder.cronSchedule;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
-
/**
* This endpoint represent each job to be created in scheduler. When consumer is started or stopped, it will
* call back into doConsumerStart()/Stop() to pause/resume the scheduler trigger.
- *
*/
public class QuartzEndpoint extends DefaultEndpoint {
private static final transient Logger LOG = LoggerFactory.getLogger(QuartzEndpoint.class);
http://git-wip-us.apache.org/repos/asf/camel/blob/0d616fbe/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzMessage.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzMessage.java b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzMessage.java
index 7ea2eed..945a405 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzMessage.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/QuartzMessage.java
@@ -25,7 +25,6 @@ import org.quartz.Trigger;
/**
* A Camel message to be created upon each scheduled job execution.
- *
*/
public class QuartzMessage extends DefaultMessage {
private final JobExecutionContext jobExecutionContext;
http://git-wip-us.apache.org/repos/asf/camel/blob/0d616fbe/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/StatefulCamelJob.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/StatefulCamelJob.java b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/StatefulCamelJob.java
index 53b6785..505e4ca 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/StatefulCamelJob.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/component/quartz2/StatefulCamelJob.java
@@ -23,8 +23,6 @@ import org.quartz.PersistJobDataAfterExecution;
* A stateful job for CamelJob. For Quartz, this means it will re-save all job data map after each job execution,
* and it will not run concurrently within the Quartz thread pool even if you have multiple triggers or misfired
* instruct to do so.
- *
- * @author Zemian Deng saltnlight5@gmail.com
*/
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
http://git-wip-us.apache.org/repos/asf/camel/blob/0d616fbe/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/QuartzStopRouteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/QuartzStopRouteTest.java b/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/QuartzStopRouteTest.java
new file mode 100644
index 0000000..accb3d4
--- /dev/null
+++ b/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/QuartzStopRouteTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quartz2;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Verifies that a quartz2 route fires no triggers while stopped and fires again once restarted.
+ */
+public class QuartzStopRouteTest extends CamelTestSupport {
+
+ @Test
+ public void testQuartzSuspend() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMinimumMessageCount(1);
+
+ assertMockEndpointsSatisfied();
+
+ context.stopRoute("foo");
+
+ // discard what arrived while the route was running so the counter restarts
+ resetMocks();
+
+ // expect nothing for the next 3 seconds while the route is stopped
+ mock.expectedMessageCount(0);
+ mock.assertIsSatisfied(3000);
+
+ assertEquals("Should not schedule when stopped", 0, mock.getReceivedCounter());
+
+ resetMocks();
+ mock.expectedMinimumMessageCount(1);
+
+ context.startRoute("foo");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ // START SNIPPET: e1
+ // triggers every second at precise 00,01,02,03..59
+ // notice we must use + as space when configured using URI parameter
+ from("quartz2://myGroup/myTimerName?cron=0/1+*+*+*+*+?")
+ .routeId("foo")
+ .to("log:result", "mock:result");
+ // END SNIPPET: e1
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/0d616fbe/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/QuartzSuspendRouteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/QuartzSuspendRouteTest.java b/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/QuartzSuspendRouteTest.java
new file mode 100644
index 0000000..8ce58bd
--- /dev/null
+++ b/components/camel-quartz2/src/test/java/org/apache/camel/component/quartz2/QuartzSuspendRouteTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quartz2;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Verifies that a quartz2 route fires no triggers while suspended and fires again once resumed.
+ */
+public class QuartzSuspendRouteTest extends CamelTestSupport {
+
+ @Test
+ public void testQuartzSuspend() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMinimumMessageCount(1);
+
+ assertMockEndpointsSatisfied();
+
+ context.suspendRoute("foo");
+
+ // discard what arrived while the route was running so the counter restarts
+ resetMocks();
+
+ // expect nothing for the next 3 seconds while the route is suspended
+ mock.expectedMessageCount(0);
+ mock.assertIsSatisfied(3000);
+
+ assertEquals("Should not schedule when suspended", 0, mock.getReceivedCounter());
+
+ resetMocks();
+ mock.expectedMinimumMessageCount(1);
+
+ context.resumeRoute("foo");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ // START SNIPPET: e1
+ // triggers every second at precise 00,01,02,03..59
+ // notice we must use + as space when configured using URI parameter
+ from("quartz2://myGroup/myTimerName?cron=0/1+*+*+*+*+?")
+ .routeId("foo")
+ .to("log:result", "mock:result");
+ // END SNIPPET: e1
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/0d616fbe/components/camel-quartz2/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/test/resources/log4j.properties b/components/camel-quartz2/src/test/resources/log4j.properties
index 86e4b3e..cf13b39 100644
--- a/components/camel-quartz2/src/test/resources/log4j.properties
+++ b/components/camel-quartz2/src/test/resources/log4j.properties
@@ -16,7 +16,7 @@
## ------------------------------------------------------------------------
#
-# The logging properties used for eclipse testing, We want to see debug output on the console.
+# The logging properties used for testing.
#
log4j.rootLogger=INFO, file
[2/2] git commit: CAMEL-6614: Fixed suspend/resume quartz routes
Posted by da...@apache.org.
CAMEL-6614: Fixed suspend/resume quartz routes
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bef0be5b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bef0be5b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bef0be5b
Branch: refs/heads/master
Commit: bef0be5b7a935ef3d445ed49ec47f9b55662b6f7
Parents: ed7e7c9
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Aug 7 10:40:32 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Aug 7 10:43:31 2013 +0200
----------------------------------------------------------------------
.../camel/component/quartz/QuartzConsumer.java | 12 ++++
.../component/quartz/QuartzStopRouteTest.java | 69 ++++++++++++++++++++
.../quartz/QuartzSuspendRouteTest.java | 69 ++++++++++++++++++++
3 files changed, 150 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/bef0be5b/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzConsumer.java b/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzConsumer.java
index 436d626..fe88eac 100644
--- a/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzConsumer.java
+++ b/components/camel-quartz/src/main/java/org/apache/camel/component/quartz/QuartzConsumer.java
@@ -40,8 +40,20 @@ public class QuartzConsumer extends DefaultConsumer {
}
@Override
+ protected void doResume() throws Exception {
+ super.doResume();
+ getEndpoint().consumerStarted(this);
+ }
+
+ @Override
protected void doStop() throws Exception {
getEndpoint().consumerStopped(this);
super.doStop();
}
+
+ @Override
+ protected void doSuspend() throws Exception {
+ getEndpoint().consumerStopped(this);
+ super.doSuspend();
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bef0be5b/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzStopRouteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzStopRouteTest.java b/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzStopRouteTest.java
new file mode 100644
index 0000000..6fd3da3
--- /dev/null
+++ b/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzStopRouteTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quartz;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Verifies that a quartz route fires no triggers while stopped and fires again once restarted.
+ */
+public class QuartzStopRouteTest extends CamelTestSupport {
+
+ @Test
+ public void testQuartzStop() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMinimumMessageCount(1);
+
+ assertMockEndpointsSatisfied();
+
+ context.stopRoute("foo");
+
+ // discard what arrived while the route was running so the counter restarts
+ resetMocks();
+
+ // expect nothing for the next 3 seconds while the route is stopped
+ mock.expectedMessageCount(0);
+ mock.assertIsSatisfied(3000);
+
+ assertEquals("Should not schedule when stopped", 0, mock.getReceivedCounter());
+
+ resetMocks();
+ mock.expectedMinimumMessageCount(1);
+
+ context.startRoute("foo");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ // START SNIPPET: e1
+ // triggers every second at precise 00,01,02,03..59
+ // notice we must use + as space when configured using URI parameter
+ from("quartz://myGroup/myTimerName?cron=0/1+*+*+*+*+?")
+ .routeId("foo")
+ .to("log:result", "mock:result");
+ // END SNIPPET: e1
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/bef0be5b/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzSuspendRouteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzSuspendRouteTest.java b/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzSuspendRouteTest.java
new file mode 100644
index 0000000..3632cbc
--- /dev/null
+++ b/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzSuspendRouteTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quartz;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Verifies that a quartz route fires no triggers while suspended and fires again once resumed.
+ */
+public class QuartzSuspendRouteTest extends CamelTestSupport {
+
+ @Test
+ public void testQuartzSuspend() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMinimumMessageCount(1);
+
+ assertMockEndpointsSatisfied();
+
+ context.suspendRoute("foo");
+
+ // discard what arrived while the route was running so the counter restarts
+ resetMocks();
+
+ // expect nothing for the next 3 seconds while the route is suspended
+ mock.expectedMessageCount(0);
+ mock.assertIsSatisfied(3000);
+
+ assertEquals("Should not schedule when suspended", 0, mock.getReceivedCounter());
+
+ resetMocks();
+ mock.expectedMinimumMessageCount(1);
+
+ context.resumeRoute("foo");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ // START SNIPPET: e1
+ // triggers every second at precise 00,01,02,03..59
+ // notice we must use + as space when configured using URI parameter
+ from("quartz://myGroup/myTimerName?cron=0/1+*+*+*+*+?")
+ .routeId("foo")
+ .to("log:result", "mock:result");
+ // END SNIPPET: e1
+ }
+ };
+ }
+}
\ No newline at end of file