You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/09/21 09:56:21 UTC
[1/8] Donating camel-beanstalk component to Apache Camel
Repository: camel
Updated Branches:
refs/heads/master a49627216 -> 4233318d9
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConnectionSettingsTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConnectionSettingsTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConnectionSettingsTest.java
new file mode 100644
index 0000000..6b1d81d
--- /dev/null
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConnectionSettingsTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import com.surftools.BeanstalkClient.Client;
+import org.junit.Before;
+
+/** Checks how the connection settings factory splits an endpoint URI into host, port and tube names. */
+public class ConnectionSettingsTest {
+    /** Puts a fresh factory on the component before each test method runs. */
+    @Before
+    public void setUp() {
+        BeanstalkComponent.connFactory = new ConnectionSettingsFactory();
+    }
+
+    @Test
+    public void parseUriTest() {
+        assertEquals("Full URI", new ConnectionSettings("host.domain.tld", 11300, "someTube"), BeanstalkComponent.connFactory.parseUri("host.domain.tld:11300/someTube"));
+        assertEquals("No port", new ConnectionSettings("host.domain.tld", Client.DEFAULT_PORT, "someTube"), BeanstalkComponent.connFactory.parseUri("host.domain.tld/someTube"));
+        assertEquals("Only tube", new ConnectionSettings(Client.DEFAULT_HOST, Client.DEFAULT_PORT, "someTube"), BeanstalkComponent.connFactory.parseUri("someTube"));
+    }
+
+    @Test
+    public void parseTubesTest() {
+        assertArrayEquals("Full URI", new String[] {"tube1", "tube2"}, BeanstalkComponent.connFactory.parseUri("host:90/tube1+tube2").tubes);
+        assertArrayEquals("No port", new String[] {"tube1", "tube2"}, BeanstalkComponent.connFactory.parseUri("host/tube1+tube2").tubes);
+        assertArrayEquals("Only tubes", new String[] {"tube1", "tube2"}, BeanstalkComponent.connFactory.parseUri("tube1+tube2").tubes);
+        assertArrayEquals("Empty URI", new String[0], BeanstalkComponent.connFactory.parseUri("").tubes);
+    }
+
+    /** A URI that is not valid must raise IllegalArgumentException; fail() is reached only if parsing returns. */
+    @Test(expected = IllegalArgumentException.class)
+    public void notValidHost() {
+        fail(String.format("Calling on not valid URI must raise exception, but got result %s", BeanstalkComponent.connFactory.parseUri("not_valid?host/tube?")));
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConsumerCompletionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConsumerCompletionTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConsumerCompletionTest.java
new file mode 100644
index 0000000..14a0955
--- /dev/null
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConsumerCompletionTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import com.surftools.BeanstalkClient.BeanstalkException;
+import com.surftools.BeanstalkClient.Job;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+
+/** Checks what the Beanstalk consumer does with a reserved job when the route completes, fails, or the client errors. */
+public class ConsumerCompletionTest extends BeanstalkMockTestSupport {
+    final String testMessage = "hello, world";
+
+    // Set by the test method and read inside the route's processor, which may run on a
+    // consumer thread; volatile guarantees that the processor sees the value the test wrote.
+    volatile boolean shouldIdie = false;
+    final Processor processor = new Processor() {
+        @Override
+        public void process(Exchange exchange) throws InterruptedException {
+            if (shouldIdie) {
+                throw new InterruptedException("die");
+            }
+        }
+    };
+
+    /** A job that is processed without error must reach mock:result and be deleted. */
+    @Test
+    public void testDeleteOnComplete() throws Exception {
+        final long jobId = 111;
+        final Job jobMock = newJobMock(jobId);
+        when(client.reserve(anyInt())).thenReturn(jobMock).thenReturn(null);
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMinimumMessageCount(1);
+        result.expectedBodiesReceived(testMessage);
+        result.expectedPropertyReceived(Headers.JOB_ID, jobId);
+        result.message(0).header(Headers.JOB_ID).isEqualTo(jobId);
+        result.assertIsSatisfied(2000);
+
+        verify(client, atLeastOnce()).reserve(anyInt());
+        verify(client).delete(jobId);
+    }
+
+    /** With onFailure=release, a job whose processing throws must be released with the default priority and delay. */
+    @Test
+    public void testReleaseOnFailure() throws Exception {
+        shouldIdie = true;
+        final long jobId = 111;
+        final long priority = BeanstalkComponent.DEFAULT_PRIORITY;
+        final int delay = BeanstalkComponent.DEFAULT_DELAY;
+        final Job jobMock = newJobMock(jobId);
+        when(client.reserve(anyInt())).thenReturn(jobMock).thenReturn(null);
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMinimumMessageCount(1);
+        result.assertIsNotSatisfied(1000);
+
+        verify(client, atLeastOnce()).reserve(anyInt());
+        verify(client).release(jobId, priority, delay);
+    }
+
+    /** A BeanstalkException from reserve must close the client once; the following reserve then delivers the job. */
+    @Test
+    public void testBeanstalkException() throws Exception {
+        shouldIdie = false;
+        final long jobId = 111;
+        final Job jobMock = newJobMock(jobId);
+        when(client.reserve(anyInt())).thenThrow(new BeanstalkException("test")).thenReturn(jobMock);
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+        result.expectedBodiesReceived(testMessage);
+        result.expectedPropertyReceived(Headers.JOB_ID, jobId);
+        result.message(0).header(Headers.JOB_ID).isEqualTo(jobId);
+        result.assertIsSatisfied(100);
+
+        verify(client, atLeast(1)).reserve(anyInt());
+        verify(client, times(1)).close();
+    }
+
+    /** Creates a mocked job that reports the given id and carries {@link #testMessage} as its payload. */
+    private Job newJobMock(long jobId) throws Exception {
+        final byte[] payload = Helper.stringToBytes(testMessage);
+        final Job jobMock = mock(Job.class);
+        when(jobMock.getJobId()).thenReturn(jobId);
+        when(jobMock.getData()).thenReturn(payload);
+        return jobMock;
+    }
+
+    /** Routes jobs from the Beanstalk endpoint (onFailure=release) through {@link #processor} to mock:result. */
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("beanstalk:tube?consumer.onFailure=release").process(processor).to("mock:result");
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/EndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/EndpointTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/EndpointTest.java
new file mode 100644
index 0000000..903f272
--- /dev/null
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/EndpointTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.After;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.Ignore;
+
+/** Checks that Beanstalk endpoint URIs (tubes, command and job options) are turned into the matching endpoint settings. */
+public class EndpointTest {
+    CamelContext context;
+
+    @Before
+    public void setUp() throws Exception {
+        context = new DefaultCamelContext();
+        context.disableJMX();
+        context.start();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        context.stop();
+    }
+
+    /** Resolves the URI against the running context and asserts that a Beanstalk endpoint came back. */
+    private BeanstalkEndpoint lookup(String uri) {
+        final BeanstalkEndpoint resolved = context.getEndpoint(uri, BeanstalkEndpoint.class);
+        assertNotNull("Beanstalk endpoint", resolved);
+        return resolved;
+    }
+
+    @Test
+    public void testPriority() {
+        assertEquals("Priority", 1000, lookup("beanstalk:default?jobPriority=1000").getJobPriority());
+    }
+
+    @Test
+    public void testTimeToRun() {
+        assertEquals("Time to run", 10, lookup("beanstalk:default?jobTimeToRun=10").getJobTimeToRun());
+    }
+
+    @Test
+    public void testDelay() {
+        assertEquals("Delay", 10, lookup("beanstalk:default?jobDelay=10").getJobDelay());
+    }
+
+    @Test
+    public void testCommand() {
+        final BeanstalkEndpoint releasing = lookup("beanstalk:default?command=release");
+        assertEquals("Command", BeanstalkComponent.COMMAND_RELEASE, releasing.command);
+    }
+
+    @Test
+    public void testTubes() {
+        final BeanstalkEndpoint kicking = lookup("beanstalk:host:11303/tube1+tube%2B+tube%3F?command=kick");
+        assertEquals("Command", BeanstalkComponent.COMMAND_KICK, kicking.command);
+        assertEquals("Host", "host", kicking.conn.host);
+        assertArrayEquals("Tubes", new String[] {"tube1", "tube+", "tube?"}, kicking.conn.tubes);
+    }
+
+    @Test(expected = FailedToCreateProducerException.class)
+    public void testWrongCommand() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start").to("beanstalk:default?command=noCommand");
+            }
+        });
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/Helper.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/Helper.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/Helper.java
new file mode 100644
index 0000000..3ef5cb9
--- /dev/null
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/Helper.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import com.surftools.BeanstalkClient.Client;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import org.apache.camel.CamelContext;
+
+/** Static helpers that let the tests run the Beanstalk component against a caller-supplied (mocked) client. */
+public final class Helper {
+    private Helper() {
+        // static helpers only; not meant to be instantiated
+    }
+
+    /** Wraps the client in settings that return it from both the reading and the writing client methods. */
+    public static ConnectionSettings mockConn(final Client client) {
+        return new MockConnectionSettings(client);
+    }
+
+    /** Installs a settings factory on the component that ignores the URI and always yields settings for the client. */
+    public static void mockComponent(final Client client) {
+        BeanstalkComponent.setConnectionSettingsFactory(new ConnectionSettingsFactory() {
+            @Override
+            public ConnectionSettings parseUri(String uri) {
+                return new MockConnectionSettings(client);
+            }
+        });
+    }
+
+    /** Puts {@link ConnectionSettingsFactory#DEFAULT} back on the component. */
+    public static void revertComponent() {
+        BeanstalkComponent.setConnectionSettingsFactory(ConnectionSettingsFactory.DEFAULT);
+    }
+
+    /** Builds an endpoint that uses the given client and registers it in the context under the given URI. */
+    public static BeanstalkEndpoint getEndpoint(String uri, CamelContext context, Client client) throws Exception {
+        BeanstalkEndpoint endpoint = new BeanstalkEndpoint(uri, context.getComponent("beanstalk"), mockConn(client));
+        context.addEndpoint(uri, endpoint);
+        return endpoint;
+    }
+
+    /**
+     * Converts the string to bytes the way {@link DataOutputStream#writeBytes} does: one byte per char,
+     * keeping only the low eight bits, so the result is only faithful for characters up to U+00FF.
+     */
+    public static byte[] stringToBytes(final String s) throws IOException {
+        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        final DataOutputStream out = new DataOutputStream(buffer);
+        try {
+            out.writeBytes(s);
+            out.flush();
+            return buffer.toByteArray();
+        } finally {
+            out.close(); // closing the wrapper also closes the in-memory buffer
+        }
+    }
+}
+
+/** Connection settings that hand out one pre-built client instead of creating a new one per request. */
+class MockConnectionSettings extends ConnectionSettings {
+    final Client client;
+
+    public MockConnectionSettings(Client client) {
+        super("tube");
+        this.client = client;
+    }
+
+    @Override
+    public Client newReadingClient(boolean useBlockIO) {
+        return client;
+    }
+
+    @Override
+    public Client newWritingClient() {
+        return client;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ImmediateConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ImmediateConsumerTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ImmediateConsumerTest.java
new file mode 100644
index 0000000..7a5a296
--- /dev/null
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ImmediateConsumerTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import com.surftools.BeanstalkClient.Job;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+/** Checks the consumer with awaitJob=false: the job must be deleted whether or not the route succeeds. */
+public class ImmediateConsumerTest extends BeanstalkMockTestSupport {
+    final String testMessage = "hello, world";
+
+    // Set by the test method and read inside the route's processor, which may run on a
+    // consumer thread; volatile guarantees that the processor sees the value the test wrote.
+    volatile boolean shouldIdie = false;
+    final Processor processor = new Processor() {
+        @Override
+        public void process(Exchange exchange) throws InterruptedException {
+            if (shouldIdie) {
+                throw new InterruptedException("die");
+            }
+        }
+    };
+
+    /** The consumer must poll with reserve(0), deliver the job to mock:result and delete it. */
+    @Test
+    public void testDeleteOnSuccess() throws Exception {
+        final long jobId = 111;
+        final Job jobMock = mock(Job.class);
+        when(jobMock.getJobId()).thenReturn(jobId);
+        when(jobMock.getData()).thenReturn(Helper.stringToBytes(testMessage));
+        when(client.reserve(anyInt())).thenReturn(jobMock).thenReturn(null);
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+        result.expectedBodiesReceived(testMessage);
+        result.expectedPropertyReceived(Headers.JOB_ID, jobId);
+        result.message(0).header(Headers.JOB_ID).isEqualTo(jobId);
+        result.assertIsSatisfied(100);
+
+        verify(client, atLeast(1)).reserve(0);
+        verify(client, atLeast(1)).delete(jobId);
+    }
+
+    /** Even when the processor throws and nothing reaches mock:result, the job must have been deleted. */
+    @Test
+    public void testDeleteOnFailure() throws Exception {
+        shouldIdie = true;
+        final long jobId = 111;
+        final Job jobMock = mock(Job.class);
+        when(jobMock.getJobId()).thenReturn(jobId);
+        when(jobMock.getData()).thenReturn(Helper.stringToBytes(testMessage));
+        when(client.reserve(anyInt())).thenReturn(jobMock).thenReturn(null);
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMinimumMessageCount(1);
+        result.assertIsNotSatisfied(1000);
+
+        verify(client, atLeastOnce()).reserve(anyInt());
+        verify(client, atLeast(1)).delete(jobId);
+    }
+
+    /** Routes jobs from the Beanstalk endpoint (awaitJob=false) through {@link #processor} to mock:result. */
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("beanstalk:tube?consumer.awaitJob=false").process(processor).to("mock:result");
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ProducerTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ProducerTest.java
new file mode 100644
index 0000000..e3949a2
--- /dev/null
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ProducerTest.java
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import com.surftools.BeanstalkClient.BeanstalkException;
+import org.apache.camel.component.beanstalk.processors.*;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.Produce;
+import org.apache.camel.Producer;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.hamcrest.CoreMatchers.*;
+import static org.mockito.Mockito.*;
+
+public class ProducerTest extends BeanstalkMockTestSupport {
+ final String testMessage = "hello, world";
+
+ @EndpointInject(uri = "beanstalk:tube")
+ protected BeanstalkEndpoint endpoint;
+
+ @EndpointInject(uri = "mock:result")
+ protected MockEndpoint resultEndpoint;
+
+ @Produce(uri = "direct:start")
+ protected ProducerTemplate direct;
+
+ @Test
+ public void testPut() throws Exception {
+ final long priority = BeanstalkComponent.DEFAULT_PRIORITY;
+ final int delay = BeanstalkComponent.DEFAULT_DELAY;
+ final int timeToRun = BeanstalkComponent.DEFAULT_TIME_TO_RUN;
+ final byte[] payload = Helper.stringToBytes(testMessage);
+ final long jobId = 111;
+
+ when(client.put(priority, delay, timeToRun, payload)).thenReturn(jobId);
+
+ final Producer producer = endpoint.createProducer();
+ assertNotNull("Producer", producer);
+ assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
+ assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(PutCommand.class));
+
+ final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() { // TODO: SetBodyProcessor(?)
+ public void process(Exchange exchange) {
+ exchange.getIn().setBody(testMessage);
+ }
+ });
+
+ assertEquals("Job ID in exchange", Long.valueOf(jobId), exchange.getIn().getHeader(Headers.JOB_ID, Long.class));
+ verify(client).put(priority, delay, timeToRun, payload);
+ }
+
+ @Test
+ public void testPutOut() throws Exception {
+ final long priority = BeanstalkComponent.DEFAULT_PRIORITY;
+ final int delay = BeanstalkComponent.DEFAULT_DELAY;
+ final int timeToRun = BeanstalkComponent.DEFAULT_TIME_TO_RUN;
+ final byte[] payload = Helper.stringToBytes(testMessage);
+ final long jobId = 111;
+
+ when(client.put(priority, delay, timeToRun, payload)).thenReturn(jobId);
+
+ Producer producer = endpoint.createProducer();
+ assertNotNull("Producer", producer);
+ assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
+ assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(PutCommand.class));
+
+ final Exchange exchange = template.send(endpoint, ExchangePattern.InOut, new Processor() { // TODO: SetBodyProcessor(?)
+ public void process(Exchange exchange) {
+ exchange.getIn().setBody(testMessage);
+ }
+ });
+
+ assertEquals("Job ID in exchange", Long.valueOf(jobId), exchange.getOut().getHeader(Headers.JOB_ID, Long.class));
+ verify(client).put(priority, delay, timeToRun, payload);
+ }
+
+ @Test
+ public void testPutWithHeaders() throws Exception {
+ final long priority = 111;
+ final int delay = 5;
+ final int timeToRun = 65;
+ final byte[] payload = Helper.stringToBytes(testMessage);
+ final long jobId = 111;
+
+ when(client.put(priority, delay, timeToRun, payload)).thenReturn(jobId);
+
+ Producer producer = endpoint.createProducer();
+ assertNotNull("Producer", producer);
+ assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
+ assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(PutCommand.class));
+
+ final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() { // TODO: SetBodyProcessor(?)
+ public void process(Exchange exchange) {
+ exchange.getIn().setHeader(Headers.PRIORITY, priority);
+ exchange.getIn().setHeader(Headers.DELAY, delay);
+ exchange.getIn().setHeader(Headers.TIME_TO_RUN, timeToRun);
+ exchange.getIn().setBody(testMessage);
+ }
+ });
+
+ assertEquals("Job ID in exchange", Long.valueOf(jobId), exchange.getIn().getHeader(Headers.JOB_ID, Long.class));
+ verify(client).put(priority, delay, timeToRun, payload);
+ }
+
+ @Test
+ public void testBury() throws Exception {
+ final long priority = BeanstalkComponent.DEFAULT_PRIORITY;
+ final long jobId = 111;
+
+ endpoint.setCommand(BeanstalkComponent.COMMAND_BURY);
+ Producer producer = endpoint.createProducer();
+ assertNotNull("Producer", producer);
+ assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
+ assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(BuryCommand.class));
+
+ when(client.bury(jobId, priority)).thenReturn(true);
+
+ final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() {
+ public void process(Exchange exchange) {
+ exchange.getIn().setHeader(Headers.JOB_ID, jobId);
+ }
+ });
+
+ assertEquals("Op result", Boolean.TRUE, exchange.getIn().getHeader(Headers.RESULT, Boolean.class));
+ assertEquals("Job ID in exchange", Long.valueOf(jobId), exchange.getIn().getHeader(Headers.JOB_ID, Long.class));
+ verify(client).bury(jobId, priority);
+ }
+
+ @Test
+ public void testBuryNoJobId() throws Exception {
+ endpoint.setCommand(BeanstalkComponent.COMMAND_BURY);
+ Producer producer = endpoint.createProducer();
+ assertNotNull("Producer", producer);
+ assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
+ assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(BuryCommand.class));
+
+ final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() {
+ public void process(Exchange exchange) {}
+ });
+
+ assertTrue("Exchange failed", exchange.isFailed());
+
+ verify(client, never()).bury(anyLong(), anyLong());
+ }
+
+ @Test
+ public void testBuryWithHeaders() throws Exception {
+ final long priority = 1000;
+ final long jobId = 111;
+
+ endpoint.setCommand(BeanstalkComponent.COMMAND_BURY);
+ Producer producer = endpoint.createProducer();
+ assertNotNull("Producer", producer);
+ assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
+ assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(BuryCommand.class));
+
+ when(client.bury(jobId, priority)).thenReturn(true);
+
+ final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() {
+ public void process(Exchange exchange) {
+ exchange.getIn().setHeader(Headers.PRIORITY, priority);
+ exchange.getIn().setHeader(Headers.JOB_ID, jobId);
+ }
+ });
+
+ assertEquals("Op result", Boolean.TRUE, exchange.getIn().getHeader(Headers.RESULT, Boolean.class));
+ assertEquals("Job ID in exchange", Long.valueOf(jobId), exchange.getIn().getHeader(Headers.JOB_ID, Long.class));
+ verify(client).bury(jobId, priority);
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ final long jobId = 111;
+
+ endpoint.setCommand(BeanstalkComponent.COMMAND_DELETE);
+ Producer producer = endpoint.createProducer();
+ assertNotNull("Producer", producer);
+ assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
+ assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(DeleteCommand.class));
+
+ when(client.delete(jobId)).thenReturn(true);
+
+ final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() {
+ public void process(Exchange exchange) {
+ exchange.getIn().setHeader(Headers.JOB_ID, jobId);
+ }
+ });
+
+ assertEquals("Op result", Boolean.TRUE, exchange.getIn().getHeader(Headers.RESULT, Boolean.class));
+ assertEquals("Job ID in exchange", Long.valueOf(jobId), exchange.getIn().getHeader(Headers.JOB_ID, Long.class));
+ verify(client).delete(jobId);
+ }
+
+ @Test
+ public void testDeleteNoJobId() throws Exception {
+ endpoint.setCommand(BeanstalkComponent.COMMAND_DELETE);
+ Producer producer = endpoint.createProducer();
+ assertNotNull("Producer", producer);
+ assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
+ assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(DeleteCommand.class));
+
+ final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() {
+ public void process(Exchange exchange) {}
+ });
+
+ assertTrue("Exchange failed", exchange.isFailed());
+
+ verify(client, never()).delete(anyLong());
+ }
+
+ @Test
+ public void testRelease() throws Exception {
+ final long priority = BeanstalkComponent.DEFAULT_PRIORITY;
+ final int delay = BeanstalkComponent.DEFAULT_DELAY;
+ final long jobId = 111;
+
+ endpoint.setCommand(BeanstalkComponent.COMMAND_RELEASE);
+ Producer producer = endpoint.createProducer();
+ assertNotNull("Producer", producer);
+ assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
+ assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(ReleaseCommand.class));
+
+ when(client.release(jobId, priority, delay)).thenReturn(true);
+
+ final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() {
+ public void process(Exchange exchange) {
+ exchange.getIn().setHeader(Headers.JOB_ID, jobId);
+ }
+ });
+
+ assertEquals("Op result", Boolean.TRUE, exchange.getIn().getHeader(Headers.RESULT, Boolean.class));
+ assertEquals("Job ID in exchange", Long.valueOf(jobId), exchange.getIn().getHeader(Headers.JOB_ID, Long.class));
+ verify(client).release(jobId, priority, delay);
+ }
+
+ @Test
+ public void testReleaseNoJobId() throws Exception {
+ endpoint.setCommand(BeanstalkComponent.COMMAND_RELEASE);
+ Producer producer = endpoint.createProducer();
+ assertNotNull("Producer", producer);
+ assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
+ assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(ReleaseCommand.class));
+
+ final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() {
+ public void process(Exchange exchange) {}
+ });
+
+ assertTrue("Exchange failed", exchange.isFailed());
+
+ verify(client, never()).release(anyLong(), anyLong(), anyInt());
+ }
+
+ @Test
+ public void testReleaseWithHeaders() throws Exception {
+ final long priority = 1001;
+ final int delay = 124;
+ final long jobId = 111;
+
+ endpoint.setCommand(BeanstalkComponent.COMMAND_RELEASE);
+ Producer producer = endpoint.createProducer();
+ assertNotNull("Producer", producer);
+ assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
+ assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(ReleaseCommand.class));
+
+ when(client.release(jobId, priority, delay)).thenReturn(true);
+
+ final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() {
+ public void process(Exchange exchange) {
+ exchange.getIn().setHeader(Headers.JOB_ID, jobId);
+ exchange.getIn().setHeader(Headers.PRIORITY, priority);
+ exchange.getIn().setHeader(Headers.DELAY, delay);
+ }
+ });
+
+ assertEquals("Op result", Boolean.TRUE, exchange.getIn().getHeader(Headers.RESULT, Boolean.class));
+ assertEquals("Job ID in exchange", Long.valueOf(jobId), exchange.getIn().getHeader(Headers.JOB_ID, Long.class));
+ verify(client).release(jobId, priority, delay);
+ }
+
+ /**
+  * Touches a job identified by the job id header and checks that the result
+  * header is set, the job id header is kept and the client was called once.
+  */
+ @Test
+ public void testTouch() throws Exception {
+     final long jobId = 111;
+
+     endpoint.setCommand(BeanstalkComponent.COMMAND_TOUCH);
+     final Producer touchProducer = endpoint.createProducer();
+     assertNotNull("Producer", touchProducer);
+     assertThat("Producer class", touchProducer, instanceOf(BeanstalkProducer.class));
+     final BeanstalkProducer beanstalkProducer = (BeanstalkProducer) touchProducer;
+     assertThat("Processor class", beanstalkProducer.command, instanceOf(TouchCommand.class));
+
+     // Stub the client so that touching this job reports success.
+     when(client.touch(jobId)).thenReturn(true);
+
+     final Processor jobIdHeader = new Processor() {
+         public void process(Exchange exchange) {
+             exchange.getIn().setHeader(Headers.JOB_ID, jobId);
+         }
+     };
+     final Exchange sent = template.send(endpoint, ExchangePattern.InOnly, jobIdHeader);
+
+     final Boolean opResult = sent.getIn().getHeader(Headers.RESULT, Boolean.class);
+     assertEquals("Op result", Boolean.TRUE, opResult);
+     final Long reportedJobId = sent.getIn().getHeader(Headers.JOB_ID, Long.class);
+     assertEquals("Job ID in exchange", Long.valueOf(jobId), reportedJobId);
+     verify(client).touch(jobId);
+ }
+
+ /**
+  * A touch request that carries no job id header must fail the exchange
+  * and must never result in a touch call on the Beanstalk client.
+  */
+ @Test
+ public void testTouchNoJobId() throws Exception {
+     endpoint.setCommand(BeanstalkComponent.COMMAND_TOUCH);
+     final Producer touchProducer = endpoint.createProducer();
+     assertNotNull("Producer", touchProducer);
+     assertThat("Producer class", touchProducer, instanceOf(BeanstalkProducer.class));
+     final BeanstalkProducer beanstalkProducer = (BeanstalkProducer) touchProducer;
+     assertThat("Processor class", beanstalkProducer.command, instanceOf(TouchCommand.class));
+
+     // Deliberately leave the message without any headers.
+     final Processor noHeaders = new Processor() {
+         public void process(Exchange exchange) {
+         }
+     };
+     final Exchange sent = template.send(endpoint, ExchangePattern.InOnly, noHeaders);
+
+     assertTrue("Exchange failed", sent.isFailed());
+
+     verify(client, never()).touch(anyLong());
+ }
+
+ /**
+  * Sends a message with a time-to-run header of 75 through the route and expects
+  * the client to be asked to put the job with that value, together with
+  * priority 1020 and delay 50.
+  */
+ @Test
+ public void testHeaderOverride() throws Exception {
+     final long jobId = 113;
+     final long priority = 1020;
+     final int delay = 50;
+     final int timeToRun = 75;
+     final byte[] payload = Helper.stringToBytes(testMessage);
+
+     when(client.put(priority, delay, timeToRun, payload)).thenReturn(jobId);
+
+     resultEndpoint.expectedMessageCount(1);
+     resultEndpoint.allMessages().body().isEqualTo(testMessage);
+     resultEndpoint.allMessages().header(Headers.JOB_ID).isEqualTo(Long.valueOf(jobId));
+
+     // Only the time-to-run travels as a header; priority and delay are not set on the message.
+     direct.sendBodyAndHeader(testMessage, Headers.TIME_TO_RUN, timeToRun);
+     resultEndpoint.assertIsSatisfied();
+
+     final Exchange received = resultEndpoint.getReceivedExchanges().get(0);
+     final Long jobIdIn = received.getIn().getHeader(Headers.JOB_ID, Long.class);
+     assertNotNull("Job ID in 'In' message", jobIdIn);
+
+     verify(client).put(priority, delay, timeToRun, payload);
+ }
+
+ /**
+  * The first put attempt throws a {@code BeanstalkException}, the second one succeeds.
+  * The message is still expected at the result endpoint, with the client closed once
+  * and put called twice.
+  */
+ @Test
+ public void test1BeanstalkException() throws Exception {
+     final long jobId = 113;
+     final long priority = 1020;
+     final int delay = 50;
+     final int timeToRun = 75;
+     final byte[] payload = Helper.stringToBytes(testMessage);
+
+     // First call fails, every later call returns the job id.
+     when(client.put(priority, delay, timeToRun, payload))
+         .thenThrow(new BeanstalkException("test"))
+         .thenReturn(jobId);
+
+     resultEndpoint.expectedMessageCount(1);
+     resultEndpoint.allMessages().body().isEqualTo(testMessage);
+     resultEndpoint.allMessages().header(Headers.JOB_ID).isEqualTo(Long.valueOf(jobId));
+
+     direct.sendBodyAndHeader(testMessage, Headers.TIME_TO_RUN, timeToRun);
+     resultEndpoint.assertIsSatisfied();
+
+     final Exchange received = resultEndpoint.getReceivedExchanges().get(0);
+     final Long jobIdIn = received.getIn().getHeader(Headers.JOB_ID, Long.class);
+     assertNotNull("Job ID in 'In' message", jobIdIn);
+
+     verify(client, times(1)).close();
+     verify(client, times(2)).put(priority, delay, timeToRun, payload);
+ }
+
+ /**
+  * Every touch attempt throws a {@code BeanstalkException}. The exchange is expected
+  * to fail after touch was called twice and the client was closed once.
+  */
+ @Test
+ public void test2BeanstalkException() throws Exception {
+     final long jobId = 111;
+
+     when(client.touch(jobId))
+         .thenThrow(new BeanstalkException("test"));
+
+     endpoint.setCommand(BeanstalkComponent.COMMAND_TOUCH);
+     final Processor jobIdHeader = new Processor() {
+         public void process(Exchange exchange) {
+             exchange.getIn().setHeader(Headers.JOB_ID, jobId);
+         }
+     };
+     final Exchange sent = template.send(endpoint, ExchangePattern.InOnly, jobIdHeader);
+
+     assertTrue("Exchange failed", sent.isFailed());
+
+     verify(client, times(2)).touch(jobId);
+     verify(client, times(1)).close();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:start").to("beanstalk:tube?jobPriority=1020&jobDelay=50&jobTimeToRun=65").to("mock:result");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BeanstalkCamelTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BeanstalkCamelTestSupport.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BeanstalkCamelTestSupport.java
new file mode 100644
index 0000000..fb87c9a
--- /dev/null
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BeanstalkCamelTestSupport.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.integration;
+
+import org.apache.camel.component.beanstalk.ConnectionSettings;
+import org.apache.camel.component.beanstalk.ConnectionSettingsFactory;
+import com.surftools.BeanstalkClient.Client;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Common base for the Beanstalk integration tests in this package.
+ * <p>
+ * Before each test a writing and a reading client are created for
+ * {@link #tubeName}; after each test both clients are closed again.
+ */
+public abstract class BeanstalkCamelTestSupport extends CamelTestSupport {
+    final ConnectionSettingsFactory connFactory = ConnectionSettingsFactory.DEFAULT;
+
+    /**
+     * Tube used by the test. The name embeds the creation time in milliseconds,
+     * presumably so that separate runs do not share a tube.
+     */
+    final String tubeName = String.format("test%d", System.currentTimeMillis());
+
+    /** Client created by {@code newReadingClient(false)}; {@code null} outside a test. */
+    protected Client reader = null;
+
+    /** Client created by {@code newWritingClient()}; {@code null} outside a test. */
+    protected Client writer = null;
+
+    /**
+     * Starts the Camel test fixture, then creates the two clients for {@link #tubeName}.
+     *
+     * @throws Exception if the fixture or one of the clients cannot be set up
+     */
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+
+        ConnectionSettings conn = connFactory.parseUri(tubeName);
+        writer = conn.newWritingClient();
+        reader = conn.newReadingClient(false);
+    }
+
+    /**
+     * Stops the Camel test fixture and then closes both clients, even when
+     * stopping the fixture fails, so that no client outlives its test.
+     *
+     * @throws Exception if the fixture cannot be shut down
+     */
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            closeClients();
+        }
+    }
+
+    /** Closes whichever clients were created; a failure on one does not skip the other. */
+    private void closeClients() {
+        try {
+            if (reader != null) {
+                reader.close();
+            }
+        } finally {
+            reader = null;
+            try {
+                if (writer != null) {
+                    writer.close();
+                }
+            } finally {
+                writer = null;
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BuryProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BuryProducerIntegrationTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BuryProducerIntegrationTest.java
new file mode 100644
index 0000000..033955e
--- /dev/null
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BuryProducerIntegrationTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.integration;
+
+import org.apache.camel.component.beanstalk.Headers;
+import com.surftools.BeanstalkClient.Job;
+import java.io.IOException;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Ignore;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Integration test for a route that ends in a Beanstalk endpoint configured
+ * with {@code command=bury}.
+ */
+public class BuryProducerIntegrationTest extends BeanstalkCamelTestSupport {
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate direct;
+
+    /**
+     * Puts a job, sends its id through the bury route and expects the job to be
+     * no longer reservable but visible as buried. Currently disabled.
+     */
+    @Ignore("requires reserve - bury sequence")
+    @Test
+    public void testBury() throws InterruptedException, IOException {
+        final long putJobId = writer.put(0, 0, 5, new byte[0]);
+        assertTrue("Valid Job Id", putJobId > 0);
+
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.allMessages().header(Headers.JOB_ID).isNotNull();
+        resultEndpoint.allMessages().header(Headers.RESULT).isEqualTo(true);
+        direct.sendBodyAndHeader(null, Headers.JOB_ID, putJobId);
+
+        assertMockEndpointsSatisfied();
+
+        final Long messageJobId = resultEndpoint
+            .getReceivedExchanges()
+            .get(0)
+            .getIn()
+            .getHeader(Headers.JOB_ID, Long.class);
+        assertNotNull("Job ID in message", messageJobId);
+        assertEquals("Message Job ID equals", putJobId, messageJobId.longValue());
+
+        final Job reserved = reader.reserve(0);
+        assertNull("Beanstalk client has no message", reserved);
+
+        final Job buriedJob = reader.peekBuried();
+        assertNotNull("Job in buried", buriedJob);
+        assertEquals("Buried job id", putJobId, buriedJob.getJobId());
+    }
+
+    /**
+     * Sending a message without a job id header is expected to surface as a
+     * {@code CamelExecutionException}.
+     */
+    @Test(expected=CamelExecutionException.class)
+    public void testNoJobId() throws InterruptedException, IOException {
+        resultEndpoint.expectedMessageCount(0);
+        direct.sendBody(new byte[0]);
+
+        // NOTE(review): when sendBody throws as expected, the two checks below are never reached.
+        resultEndpoint.assertIsSatisfied();
+        assertListSize("Number of exceptions", resultEndpoint.getFailures(), 1);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        final RouteBuilder routes = new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start")
+                    .to("beanstalk:"+tubeName+"?command=bury")
+                    .to("mock:result");
+            }
+        };
+        return routes;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ConsumerIntegrationTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ConsumerIntegrationTest.java
new file mode 100644
index 0000000..6e2b4de
--- /dev/null
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ConsumerIntegrationTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.integration;
+
+import org.apache.camel.component.beanstalk.Headers;
+import org.apache.camel.component.beanstalk.Helper;
+import java.io.IOException;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+/**
+ * Integration test for consuming from a Beanstalk tube: a job put directly with
+ * the writing client is expected to arrive at {@code mock:result}.
+ */
+public class ConsumerIntegrationTest extends BeanstalkCamelTestSupport {
+    final String testMessage = "Hello, world!";
+
+    @EndpointInject(uri = "mock:result")
+    MockEndpoint result;
+
+    /**
+     * Puts one job and registers expectations on the body, the job id property
+     * and the Beanstalk headers of the first message received by the mock endpoint.
+     */
+    @Test
+    public void testReceive() throws IOException, InterruptedException {
+        final long priority = 0;
+        final int timeToRun = 10;
+        final byte[] payload = Helper.stringToBytes(testMessage);
+        final long jobId = writer.put(priority, 0, timeToRun, payload);
+
+        result.expectedMessageCount(1);
+        result.expectedPropertyReceived(Headers.JOB_ID, jobId);
+
+        // Headers that must carry a specific value.
+        result.message(0).header(Exchange.CREATED_TIMESTAMP).isNotNull();
+        result.message(0).header(Headers.JOB_ID).isEqualTo(Long.valueOf(jobId));
+        result.message(0).header(Headers.PRIORITY).isEqualTo(Long.valueOf(priority));
+        result.message(0).header(Headers.TUBE).isEqualTo(tubeName);
+        result.message(0).header(Headers.STATE).isEqualTo("reserved");
+
+        // Headers that only need to be present or positive.
+        result.message(0).header(Headers.AGE).isGreaterThan(0);
+        result.message(0).header(Headers.TIME_LEFT).isGreaterThan(0);
+        result.message(0).header(Headers.TIMEOUTS).isNotNull();
+        result.message(0).header(Headers.RELEASES).isNotNull();
+        result.message(0).header(Headers.BURIES).isNotNull();
+        result.message(0).header(Headers.KICKS).isNotNull();
+
+        result.message(0).body().isEqualTo(testMessage);
+        result.assertIsSatisfied(500);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        final RouteBuilder routes = new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("beanstalk:"+tubeName)
+                    .to("mock:result");
+            }
+        };
+        return routes;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/DeleteProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/DeleteProducerIntegrationTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/DeleteProducerIntegrationTest.java
new file mode 100644
index 0000000..9d9dc36
--- /dev/null
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/DeleteProducerIntegrationTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.integration;
+
+import org.apache.camel.component.beanstalk.Headers;
+import com.surftools.BeanstalkClient.Job;
+import java.io.IOException;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Integration test for a route that ends in a Beanstalk endpoint configured
+ * with {@code command=delete}.
+ */
+public class DeleteProducerIntegrationTest extends BeanstalkCamelTestSupport {
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate direct;
+
+    /**
+     * Puts a job, sends its id through the delete route and expects that the
+     * job can no longer be peeked afterwards.
+     */
+    @Test
+    public void testDelete() throws InterruptedException, IOException {
+        final long putJobId = writer.put(0, 0, 5, new byte[0]);
+        assertTrue("Valid Job Id", putJobId > 0);
+
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.allMessages().header(Headers.JOB_ID).isNotNull();
+        resultEndpoint.allMessages().header(Headers.RESULT).isEqualTo(true);
+        direct.sendBodyAndHeader(null, Headers.JOB_ID, putJobId);
+
+        assertMockEndpointsSatisfied();
+
+        final Long messageJobId = resultEndpoint
+            .getReceivedExchanges()
+            .get(0)
+            .getIn()
+            .getHeader(Headers.JOB_ID, Long.class);
+        assertNotNull("Job ID in message", messageJobId);
+        assertEquals("Message Job ID equals", putJobId, messageJobId.longValue());
+
+        final Job remaining = reader.peek(putJobId);
+        assertNull("Job has been deleted", remaining);
+    }
+
+    /**
+     * Sending a message without a job id header is expected to surface as a
+     * {@code CamelExecutionException}.
+     */
+    @Test(expected=CamelExecutionException.class)
+    public void testNoJobId() throws InterruptedException, IOException {
+        resultEndpoint.expectedMessageCount(0);
+        direct.sendBody(new byte[0]);
+
+        // NOTE(review): when sendBody throws as expected, the two checks below are never reached.
+        resultEndpoint.assertIsSatisfied();
+        assertListSize("Number of exceptions", resultEndpoint.getFailures(), 1);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        final RouteBuilder routes = new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start")
+                    .to("beanstalk:"+tubeName+"?command=delete")
+                    .to("mock:result");
+            }
+        };
+        return routes;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/PutProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/PutProducerIntegrationTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/PutProducerIntegrationTest.java
new file mode 100644
index 0000000..ff96d3e
--- /dev/null
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/PutProducerIntegrationTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.integration;
+
+import org.apache.camel.component.beanstalk.Headers;
+import com.surftools.BeanstalkClient.Job;
+import java.io.IOException;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Produce;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Integration test for putting jobs through a Beanstalk producer endpoint,
+ * both via the {@code direct:start} route and by sending to the endpoint directly.
+ */
+public class PutProducerIntegrationTest extends BeanstalkCamelTestSupport {
+    final String testMessage = "Hello, world!";
+
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate direct;
+
+    /**
+     * Sends the test message through the route and checks that the job id reported
+     * in the 'In' message identifies a job with the same body on the server.
+     */
+    @Test
+    public void testPut() throws InterruptedException, IOException {
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.allMessages().header(Headers.JOB_ID).isNotNull();
+        direct.sendBody(testMessage);
+
+        resultEndpoint.assertIsSatisfied();
+
+        final Long jobId = resultEndpoint.getReceivedExchanges().get(0).getIn().getHeader(Headers.JOB_ID, Long.class);
+        assertNotNull("Job ID in 'In' message", jobId);
+
+        assertJobOnServer(jobId);
+    }
+
+    /**
+     * Sends the test message as an InOut exchange and checks that the job id reported
+     * in the 'Out' message identifies a job with the same body on the server.
+     */
+    @Test
+    public void testOut() throws InterruptedException, IOException {
+        final Endpoint endpoint = context.getEndpoint("beanstalk:"+tubeName);
+        final Exchange exchange = template.send(endpoint, ExchangePattern.InOut, new Processor() {
+            public void process(Exchange exchange) {
+                exchange.getIn().setBody(testMessage);
+            }
+        });
+
+        final Message out = exchange.getOut();
+        assertNotNull("Out message", out);
+
+        final Long jobId = out.getHeader(Headers.JOB_ID, Long.class);
+        assertNotNull("Job ID in 'Out' message", jobId);
+
+        assertJobOnServer(jobId);
+    }
+
+    /**
+     * Puts a job with a delay header of 10 and checks that it cannot be reserved
+     * right away; the job is deleted afterwards.
+     */
+    @Test
+    public void testDelay() throws InterruptedException, IOException {
+        final byte[] testBytes = new byte[0];
+
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.allMessages().header(Headers.JOB_ID).isNotNull();
+        resultEndpoint.expectedBodiesReceived(testBytes);
+        direct.sendBodyAndHeader(testBytes, Headers.DELAY, 10);
+
+        resultEndpoint.assertIsSatisfied();
+
+        final Long jobId = resultEndpoint.getReceivedExchanges().get(0).getIn().getHeader(Headers.JOB_ID, Long.class);
+        assertNotNull("Job ID in message", jobId);
+
+        final Job job = reader.reserve(0);
+        assertNull("Beanstalk client has no message", job);
+        reader.delete(jobId.longValue());
+    }
+
+    /**
+     * Reserves the next job with the reading client, checks its body and id,
+     * and deletes it even when one of the checks fails.
+     *
+     * @param expectedJobId the job id reported by the producer
+     * @throws IOException if the job body cannot be decoded
+     */
+    private void assertJobOnServer(final Long expectedJobId) throws IOException {
+        final Job job = reader.reserve(5);
+        assertNotNull("Beanstalk client got message", job);
+        try {
+            // Decode with an explicit charset instead of the platform default;
+            // the test message is plain ASCII, which UTF-8 decodes unchanged.
+            final String body = new String(job.getData(), "UTF-8");
+            assertEquals("Job body from the server", testMessage, body);
+            assertEquals("Job ID from the server", expectedJobId.longValue(), job.getJobId());
+        } finally {
+            reader.delete(job.getJobId());
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start").to("beanstalk:"+tubeName+"?jobPriority=1000&jobTimeToRun=5").to("mock:result");
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ReleaseProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ReleaseProducerIntegrationTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ReleaseProducerIntegrationTest.java
new file mode 100644
index 0000000..c77dc32
--- /dev/null
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ReleaseProducerIntegrationTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.integration;
+
+import org.apache.camel.component.beanstalk.Headers;
+import com.surftools.BeanstalkClient.Job;
+import java.io.IOException;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Ignore;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Integration test for a route that ends in a Beanstalk endpoint configured
+ * with {@code command=release}.
+ */
+public class ReleaseProducerIntegrationTest extends BeanstalkCamelTestSupport {
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate direct;
+
+    /**
+     * Puts a job, sends its id through the release route and expects the job to be
+     * reservable again and not buried. Currently disabled: the job has to be
+     * reserved before it can be released.
+     */
+    @Ignore("requires reserve - release sequence")
+    @Test
+    public void testRelease() throws InterruptedException, IOException {
+        long jobId = writer.put(0, 0, 5, new byte[0]);
+        assertTrue("Valid Job Id", jobId > 0);
+
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.allMessages().header(Headers.JOB_ID).isNotNull();
+        resultEndpoint.allMessages().header(Headers.RESULT).isEqualTo(true);
+        direct.sendBodyAndHeader(null, Headers.JOB_ID, jobId);
+
+        assertMockEndpointsSatisfied();
+
+        final Long messageJobId = resultEndpoint.getReceivedExchanges().get(0).getIn().getHeader(Headers.JOB_ID, Long.class);
+        assertNotNull("Job ID in message", messageJobId);
+        assertEquals("Message Job ID equals", jobId, messageJobId.longValue());
+
+        // A released job returns to the ready queue, so it can be reserved again.
+        // Assumes the endpoint releases without a delay by default - TODO confirm.
+        final Job job = reader.reserve(0);
+        assertNotNull("Released job is ready again", job);
+        assertEquals("Released job id", jobId, job.getJobId());
+
+        // Release must not bury the job.
+        final Job buried = reader.peekBuried();
+        assertNull("No job in buried", buried);
+
+        reader.delete(jobId);
+    }
+
+    /**
+     * Sending a message without a job id header is expected to surface as a
+     * {@code CamelExecutionException}.
+     */
+    @Test(expected=CamelExecutionException.class)
+    public void testNoJobId() throws InterruptedException, IOException {
+        resultEndpoint.expectedMessageCount(0);
+        direct.sendBody(new byte[0]);
+
+        // NOTE(review): when sendBody throws as expected, the two checks below are never reached.
+        resultEndpoint.assertIsSatisfied();
+        assertListSize("Number of exceptions", resultEndpoint.getFailures(), 1);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start").to("beanstalk:"+tubeName+"?command=release").to("mock:result");
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/TouchProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/TouchProducerIntegrationTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/TouchProducerIntegrationTest.java
new file mode 100644
index 0000000..bfb4c45
--- /dev/null
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/TouchProducerIntegrationTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.integration;
+
+import org.apache.camel.component.beanstalk.Headers;
+import com.surftools.BeanstalkClient.Job;
+import java.io.IOException;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Ignore;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Integration test for a route that ends in a Beanstalk endpoint configured
+ * with {@code command=touch}.
+ */
+public class TouchProducerIntegrationTest extends BeanstalkCamelTestSupport {
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate direct;
+
+    /**
+     * Puts a job, sends its id through the touch route and expects the job to
+     * still exist without being ready or buried. Currently disabled: the job has
+     * to be reserved before it can be touched.
+     */
+    @Ignore("requires reserve - touch sequence")
+    @Test
+    public void testTouch() throws InterruptedException, IOException {
+        long jobId = writer.put(0, 0, 5, new byte[0]);
+        assertTrue("Valid Job Id", jobId > 0);
+
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.allMessages().header(Headers.JOB_ID).isNotNull();
+        resultEndpoint.allMessages().header(Headers.RESULT).isEqualTo(true);
+        direct.sendBodyAndHeader(null, Headers.JOB_ID, jobId);
+
+        assertMockEndpointsSatisfied();
+
+        final Long messageJobId = resultEndpoint.getReceivedExchanges().get(0).getIn().getHeader(Headers.JOB_ID, Long.class);
+        assertNotNull("Job ID in message", messageJobId);
+        assertEquals("Message Job ID equals", jobId, messageJobId.longValue());
+
+        // A touched job stays reserved by its worker, so nothing is ready to reserve.
+        final Job job = reader.reserve(0);
+        assertNull("Beanstalk client has no message", job);
+
+        // Touch neither buries nor deletes the job.
+        final Job buried = reader.peekBuried();
+        assertNull("No job in buried", buried);
+        final Job touched = reader.peek(jobId);
+        assertNotNull("Touched job still exists", touched);
+    }
+
+    /**
+     * Sending a message without a job id header is expected to surface as a
+     * {@code CamelExecutionException}.
+     */
+    @Test(expected=CamelExecutionException.class)
+    public void testNoJobId() throws InterruptedException, IOException {
+        resultEndpoint.expectedMessageCount(0);
+        direct.sendBody(new byte[0]);
+
+        // NOTE(review): when sendBody throws as expected, the two checks below are never reached.
+        resultEndpoint.assertIsSatisfied();
+        assertListSize("Number of exceptions", resultEndpoint.getFailures(), 1);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start").to("beanstalk:"+tubeName+"?command=touch").to("mock:result");
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/pom.xml
----------------------------------------------------------------------
diff --git a/components/pom.xml b/components/pom.xml
index a53d65f..e185ace 100644
--- a/components/pom.xml
+++ b/components/pom.xml
@@ -64,6 +64,7 @@
<module>camel-aws</module>
<module>camel-base64</module>
<module>camel-beanio</module>
+ <module>camel-beanstalk</module>
<module>camel-bean-validator</module>
<module>camel-barcode</module>
<module>camel-bindy</module>
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index be6230e..e27d215 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -68,6 +68,7 @@
<backport-util-concurrent-version>3.1</backport-util-concurrent-version>
<bcel-bundle-version>5.2_4</bcel-bundle-version>
<beanio-version>2.0.7</beanio-version>
+ <beanstalkd-client-version>1.4.6</beanstalkd-client-version>
<bsh-version>2.0b5</bsh-version>
<!-- bouncycastle 1.50 does not work in OSGi - http://www.bouncycastle.org/jira/browse/BJA-476 -->
<bouncycastle-version>1.49</bouncycastle-version>
[3/8] git commit: Merge branch 'master' of
https://github.com/alaz/camel
Posted by da...@apache.org.
Merge branch 'master' of https://github.com/alaz/camel
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3513c804
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3513c804
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3513c804
Branch: refs/heads/master
Commit: 3513c804b55367f5bd6f6e917a5b617e0c8d08cf
Parents: a496272 a4ff6b6
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Sep 21 08:37:53 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Sep 21 08:37:53 2014 +0200
----------------------------------------------------------------------
components/camel-beanstalk/pom.xml | 88 ++++
components/camel-beanstalk/src/etc/header.txt | 13 +
.../component/beanstalk/BeanstalkComponent.java | 88 ++++
.../component/beanstalk/BeanstalkConsumer.java | 257 ++++++++++++
.../component/beanstalk/BeanstalkEndpoint.java | 125 ++++++
.../beanstalk/BeanstalkExchangeHelper.java | 47 +++
.../component/beanstalk/BeanstalkProducer.java | 130 ++++++
.../component/beanstalk/ConnectionSettings.java | 132 ++++++
.../beanstalk/ConnectionSettingsFactory.java | 43 ++
.../camel/component/beanstalk/Headers.java | 46 ++
.../beanstalk/processors/BuryCommand.java | 49 +++
.../component/beanstalk/processors/Command.java | 24 ++
.../beanstalk/processors/DefaultCommand.java | 45 ++
.../beanstalk/processors/DeleteCommand.java | 46 ++
.../beanstalk/processors/KickCommand.java | 45 ++
.../beanstalk/processors/PutCommand.java | 50 +++
.../beanstalk/processors/ReleaseCommand.java | 52 +++
.../beanstalk/processors/TouchCommand.java | 46 ++
.../src/main/resources/META-INF/LICENSE.txt | 203 +++++++++
.../src/main/resources/META-INF/NOTICE.txt | 11 +
.../org/apache/camel/component/beanstalk | 18 +
.../beanstalk/AwaitingConsumerTest.java | 88 ++++
.../beanstalk/BeanstalkMockTestSupport.java | 45 ++
.../beanstalk/ConnectionSettingsTest.java | 53 +++
.../beanstalk/ConsumerCompletionTest.java | 118 ++++++
.../camel/component/beanstalk/EndpointTest.java | 90 ++++
.../camel/component/beanstalk/Helper.java | 81 ++++
.../beanstalk/ImmediateConsumerTest.java | 93 ++++
.../camel/component/beanstalk/ProducerTest.java | 419 +++++++++++++++++++
.../integration/BeanstalkCamelTestSupport.java | 41 ++
.../BuryProducerIntegrationTest.java | 82 ++++
.../integration/ConsumerIntegrationTest.java | 66 +++
.../DeleteProducerIntegrationTest.java | 76 ++++
.../integration/PutProducerIntegrationTest.java | 112 +++++
.../ReleaseProducerIntegrationTest.java | 82 ++++
.../TouchProducerIntegrationTest.java | 82 ++++
components/pom.xml | 1 +
parent/pom.xml | 1 +
38 files changed, 3088 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3513c804/parent/pom.xml
----------------------------------------------------------------------
[8/8] git commit: Added beanstalk to features
Posted by da...@apache.org.
Added beanstalk to features
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4233318d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4233318d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4233318d
Branch: refs/heads/master
Commit: 4233318d90aab323b8da7cc652dc67c5d8d4c760
Parents: d636106
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Sep 21 09:53:42 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Sep 21 09:56:06 2014 +0200
----------------------------------------------------------------------
.../features/src/main/resources/features.xml | 5 +++
.../camel/itest/karaf/CamelBeanstalkTest.java | 40 ++++++++++++++++++++
2 files changed, 45 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/4233318d/platforms/karaf/features/src/main/resources/features.xml
----------------------------------------------------------------------
diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml
index bb66582..9ddf53b 100644
--- a/platforms/karaf/features/src/main/resources/features.xml
+++ b/platforms/karaf/features/src/main/resources/features.xml
@@ -163,6 +163,11 @@
<bundle dependency='true'>mvn:org.beanio/beanio/${beanio-version}</bundle>
<bundle>mvn:org.apache.camel/camel-beanio/${project.version}</bundle>
</feature>
+ <feature name='camel-beanstalk' version='${project.version}' resolver='(obr)' start-level='50'>
+ <feature version='${project.version}'>camel-core</feature>
+ <bundle dependency='true'>wrap:mvn:com.surftools/BeanstalkClient/${beanstalkd-client-version}</bundle>
+ <bundle>mvn:org.apache.camel/camel-beanstalk/${project.version}</bundle>
+ </feature>
<feature name='camel-barcode' version='${project.version}' resolver='(obr)' start-level='50'>
<feature version='${project.version}'>camel-core</feature>
<bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.zxing/${zxing-bundle-version}</bundle>
http://git-wip-us.apache.org/repos/asf/camel/blob/4233318d/tests/camel-itest-karaf/src/test/java/org/apache/camel/itest/karaf/CamelBeanstalkTest.java
----------------------------------------------------------------------
diff --git a/tests/camel-itest-karaf/src/test/java/org/apache/camel/itest/karaf/CamelBeanstalkTest.java b/tests/camel-itest-karaf/src/test/java/org/apache/camel/itest/karaf/CamelBeanstalkTest.java
new file mode 100644
index 0000000..7a79ba5
--- /dev/null
+++ b/tests/camel-itest-karaf/src/test/java/org/apache/camel/itest/karaf/CamelBeanstalkTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.karaf;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.PaxExam;
+
+@RunWith(PaxExam.class)
+public class CamelBeanstalkTest extends AbstractFeatureTest {
+
+ public static final String COMPONENT = extractName(CamelBeanstalkTest.class);
+
+ @Test
+ public void test() throws Exception {
+ testComponent(COMPONENT);
+ }
+
+ @Configuration
+ public static Option[] configure() {
+ return configure(COMPONENT);
+ }
+
+}
\ No newline at end of file
[5/8] git commit: Polished
Posted by da...@apache.org.
Polished
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e12a6318
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e12a6318
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e12a6318
Branch: refs/heads/master
Commit: e12a63188d4400b8c7cb4350fd2a54684d7bf034
Parents: 12cc832
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Sep 21 09:08:00 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Sep 21 09:08:00 2014 +0200
----------------------------------------------------------------------
components/camel-google-drive/pom.xml | 2 +-
components/camel-http4/pom.xml | 2 +-
components/camel-schematron/pom.xml | 7 ++++++-
components/camel-test-blueprint/pom.xml | 2 +-
examples/pom.xml | 2 +-
5 files changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e12a6318/components/camel-google-drive/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-google-drive/pom.xml b/components/camel-google-drive/pom.xml
index 8955b78..7782bb8 100644
--- a/components/camel-google-drive/pom.xml
+++ b/components/camel-google-drive/pom.xml
@@ -27,7 +27,7 @@
<artifactId>camel-google-drive</artifactId>
<packaging>bundle</packaging>
- <name>Camel GoogleDrive Component</name>
+ <name>Camel :: GoogleDrive</name>
<description>Camel Component for GoogleDrive</description>
<properties>
http://git-wip-us.apache.org/repos/asf/camel/blob/e12a6318/components/camel-http4/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-http4/pom.xml b/components/camel-http4/pom.xml
index b30ceeb..b0fa7f4 100644
--- a/components/camel-http4/pom.xml
+++ b/components/camel-http4/pom.xml
@@ -26,7 +26,7 @@
<artifactId>camel-http4</artifactId>
<packaging>bundle</packaging>
- <name>Camel :: HTTP4 (HttpClient 4.x)</name>
+ <name>Camel :: HTTP4</name>
<description>Camel HTTP (Apache HttpClient 4.x) support</description>
<properties>
http://git-wip-us.apache.org/repos/asf/camel/blob/e12a6318/components/camel-schematron/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-schematron/pom.xml b/components/camel-schematron/pom.xml
index d375cd1..9e30736 100644
--- a/components/camel-schematron/pom.xml
+++ b/components/camel-schematron/pom.xml
@@ -23,15 +23,19 @@
<artifactId>components</artifactId>
<version>2.15-SNAPSHOT</version>
</parent>
+
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.camel</groupId>
<artifactId>camel-schematron</artifactId>
<packaging>bundle</packaging>
- <name>Camel :: SCHEMATRON</name>
+ <name>Camel :: Schematron</name>
+ <description>Camel Schematron support</description>
+
<properties>
<camel.osgi.export.pkg>org.apache.camel.component.schematron.*</camel.osgi.export.pkg>
<camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=schematron</camel.osgi.export.service>
</properties>
+
<dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
@@ -66,4 +70,5 @@
<scope>test</scope>
</dependency>
</dependencies>
+
</project>
http://git-wip-us.apache.org/repos/asf/camel/blob/e12a6318/components/camel-test-blueprint/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-test-blueprint/pom.xml b/components/camel-test-blueprint/pom.xml
index 5990725..35abb0e 100644
--- a/components/camel-test-blueprint/pom.xml
+++ b/components/camel-test-blueprint/pom.xml
@@ -26,7 +26,7 @@
<artifactId>camel-test-blueprint</artifactId>
<packaging>bundle</packaging>
- <name>Camel :: Test Blueprint</name>
+ <name>Camel :: Test :: Blueprint</name>
<description>Camel Testing Blueprint Library using JUnit</description>
<properties>
http://git-wip-us.apache.org/repos/asf/camel/blob/e12a6318/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 44c3e48..9f1451b 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -26,7 +26,7 @@
</parent>
<artifactId>examples</artifactId>
- <name>Camel :: Examples</name>
+ <name>Camel :: Example</name>
<description>Camel Examples</description>
<packaging>pom</packaging>
[4/8] git commit: Added camel-beanstalk to kit
Posted by da...@apache.org.
Added camel-beanstalk to kit
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/12cc832e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/12cc832e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/12cc832e
Branch: refs/heads/master
Commit: 12cc832e9fc580920a8a8371f6c48026abd67197
Parents: 3513c80
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Sep 21 09:04:17 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Sep 21 09:04:17 2014 +0200
----------------------------------------------------------------------
apache-camel/pom.xml | 4 ++++
apache-camel/src/main/descriptors/common-bin.xml | 1 +
parent/pom.xml | 5 +++++
3 files changed, 10 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/12cc832e/apache-camel/pom.xml
----------------------------------------------------------------------
diff --git a/apache-camel/pom.xml b/apache-camel/pom.xml
index 340bd5e..da3c267 100644
--- a/apache-camel/pom.xml
+++ b/apache-camel/pom.xml
@@ -97,6 +97,10 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-beanstalk</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-bean-validator</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/camel/blob/12cc832e/apache-camel/src/main/descriptors/common-bin.xml
----------------------------------------------------------------------
diff --git a/apache-camel/src/main/descriptors/common-bin.xml b/apache-camel/src/main/descriptors/common-bin.xml
index e2108e4..6ad20f6 100644
--- a/apache-camel/src/main/descriptors/common-bin.xml
+++ b/apache-camel/src/main/descriptors/common-bin.xml
@@ -36,6 +36,7 @@
<include>org.apache.camel:camel-barcode</include>
<include>org.apache.camel:camel-base64</include>
<include>org.apache.camel:camel-beanio</include>
+ <include>org.apache.camel:camel-beanstalk</include>
<include>org.apache.camel:camel-bean-validator</include>
<include>org.apache.camel:camel-bindy</include>
<include>org.apache.camel:camel-box</include>
http://git-wip-us.apache.org/repos/asf/camel/blob/12cc832e/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 80e1293..19eff62 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -594,6 +594,11 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-beanstalk</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-bean-validator</artifactId>
<version>${project.version}</version>
</dependency>
[6/8] Fixed CS
Posted by da...@apache.org.
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BeanstalkCamelTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BeanstalkCamelTestSupport.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BeanstalkCamelTestSupport.java
index fb87c9a..6087c80 100644
--- a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BeanstalkCamelTestSupport.java
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BeanstalkCamelTestSupport.java
@@ -16,18 +16,17 @@
*/
package org.apache.camel.component.beanstalk.integration;
+import com.surftools.BeanstalkClient.Client;
import org.apache.camel.component.beanstalk.ConnectionSettings;
import org.apache.camel.component.beanstalk.ConnectionSettingsFactory;
-import com.surftools.BeanstalkClient.Client;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Before;
public abstract class BeanstalkCamelTestSupport extends CamelTestSupport {
- final ConnectionSettingsFactory connFactory = ConnectionSettingsFactory.DEFAULT;
- final String tubeName = String.format("test%d", System.currentTimeMillis());
-
- protected Client reader = null;
- protected Client writer = null;
+ protected final ConnectionSettingsFactory connFactory = ConnectionSettingsFactory.DEFAULT;
+ protected final String tubeName = String.format("test%d", System.currentTimeMillis());
+ protected Client reader;
+ protected Client writer;
@Before
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BuryProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BuryProducerIntegrationTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BuryProducerIntegrationTest.java
index 033955e..7223c1a 100644
--- a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BuryProducerIntegrationTest.java
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/BuryProducerIntegrationTest.java
@@ -16,18 +16,18 @@
*/
package org.apache.camel.component.beanstalk.integration;
-import org.apache.camel.component.beanstalk.Headers;
-import com.surftools.BeanstalkClient.Job;
import java.io.IOException;
+
+import com.surftools.BeanstalkClient.Job;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.EndpointInject;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.beanstalk.Headers;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Ignore;
import org.junit.Test;
-import static org.junit.Assert.*;
public class BuryProducerIntegrationTest extends BeanstalkCamelTestSupport {
@EndpointInject(uri = "mock:result")
@@ -61,7 +61,7 @@ public class BuryProducerIntegrationTest extends BeanstalkCamelTestSupport {
assertEquals("Buried job id", jobId, buried.getJobId());
}
- @Test(expected=CamelExecutionException.class)
+ @Test(expected = CamelExecutionException.class)
public void testNoJobId() throws InterruptedException, IOException {
resultEndpoint.expectedMessageCount(0);
direct.sendBody(new byte[0]);
@@ -75,7 +75,7 @@ public class BuryProducerIntegrationTest extends BeanstalkCamelTestSupport {
return new RouteBuilder() {
@Override
public void configure() {
- from("direct:start").to("beanstalk:"+tubeName+"?command=bury").to("mock:result");
+ from("direct:start").to("beanstalk:" + tubeName + "?command=bury").to("mock:result");
}
};
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ConsumerIntegrationTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ConsumerIntegrationTest.java
index 6e2b4de..c0f37df 100644
--- a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ConsumerIntegrationTest.java
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ConsumerIntegrationTest.java
@@ -16,12 +16,13 @@
*/
package org.apache.camel.component.beanstalk.integration;
-import org.apache.camel.component.beanstalk.Headers;
-import org.apache.camel.component.beanstalk.Helper;
import java.io.IOException;
+
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.beanstalk.Headers;
+import org.apache.camel.component.beanstalk.Helper;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Test;
@@ -33,15 +34,15 @@ public class ConsumerIntegrationTest extends BeanstalkCamelTestSupport {
@Test
public void testReceive() throws IOException, InterruptedException {
- long PRIO = 0;
- int TTR = 10;
- final long jobId = writer.put(PRIO, 0, TTR, Helper.stringToBytes(testMessage));
+ long prio = 0;
+ int ttr = 10;
+ final long jobId = writer.put(prio, 0, ttr, Helper.stringToBytes(testMessage));
result.expectedMessageCount(1);
result.expectedPropertyReceived(Headers.JOB_ID, jobId);
result.message(0).header(Exchange.CREATED_TIMESTAMP).isNotNull();
- result.message(0).header(Headers.JOB_ID).isEqualTo(Long.valueOf(jobId));
- result.message(0).header(Headers.PRIORITY).isEqualTo(Long.valueOf(PRIO));
+ result.message(0).header(Headers.JOB_ID).isEqualTo(jobId);
+ result.message(0).header(Headers.PRIORITY).isEqualTo(prio);
result.message(0).header(Headers.TUBE).isEqualTo(tubeName);
result.message(0).header(Headers.STATE).isEqualTo("reserved");
result.message(0).header(Headers.AGE).isGreaterThan(0);
@@ -59,7 +60,7 @@ public class ConsumerIntegrationTest extends BeanstalkCamelTestSupport {
return new RouteBuilder() {
@Override
public void configure() {
- from("beanstalk:"+tubeName).to("mock:result");
+ from("beanstalk:" + tubeName).to("mock:result");
}
};
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/DeleteProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/DeleteProducerIntegrationTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/DeleteProducerIntegrationTest.java
index 9d9dc36..07135d1 100644
--- a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/DeleteProducerIntegrationTest.java
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/DeleteProducerIntegrationTest.java
@@ -16,17 +16,17 @@
*/
package org.apache.camel.component.beanstalk.integration;
-import org.apache.camel.component.beanstalk.Headers;
-import com.surftools.BeanstalkClient.Job;
import java.io.IOException;
+
+import com.surftools.BeanstalkClient.Job;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.EndpointInject;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.beanstalk.Headers;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Test;
-import static org.junit.Assert.*;
public class DeleteProducerIntegrationTest extends BeanstalkCamelTestSupport {
@EndpointInject(uri = "mock:result")
@@ -55,7 +55,7 @@ public class DeleteProducerIntegrationTest extends BeanstalkCamelTestSupport {
assertNull("Job has been deleted", job);
}
- @Test(expected=CamelExecutionException.class)
+ @Test(expected = CamelExecutionException.class)
public void testNoJobId() throws InterruptedException, IOException {
resultEndpoint.expectedMessageCount(0);
direct.sendBody(new byte[0]);
@@ -69,7 +69,7 @@ public class DeleteProducerIntegrationTest extends BeanstalkCamelTestSupport {
return new RouteBuilder() {
@Override
public void configure() {
- from("direct:start").to("beanstalk:"+tubeName+"?command=delete").to("mock:result");
+ from("direct:start").to("beanstalk:" + tubeName + "?command=delete").to("mock:result");
}
};
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/PutProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/PutProducerIntegrationTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/PutProducerIntegrationTest.java
index ff96d3e..ec51603 100644
--- a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/PutProducerIntegrationTest.java
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/PutProducerIntegrationTest.java
@@ -16,24 +16,23 @@
*/
package org.apache.camel.component.beanstalk.integration;
-import org.apache.camel.component.beanstalk.Headers;
-import com.surftools.BeanstalkClient.Job;
import java.io.IOException;
-import org.apache.camel.EndpointInject;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Produce;
+
+import com.surftools.BeanstalkClient.Job;
import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.beanstalk.Headers;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Test;
-import static org.junit.Assert.*;
public class PutProducerIntegrationTest extends BeanstalkCamelTestSupport {
- final String testMessage = "Hello, world!";
@EndpointInject(uri = "mock:result")
protected MockEndpoint resultEndpoint;
@@ -41,6 +40,8 @@ public class PutProducerIntegrationTest extends BeanstalkCamelTestSupport {
@Produce(uri = "direct:start")
protected ProducerTemplate direct;
+ private String testMessage = "Hello, world!";
+
@Test
public void testPut() throws InterruptedException, IOException {
resultEndpoint.expectedMessageCount(1);
@@ -56,12 +57,12 @@ public class PutProducerIntegrationTest extends BeanstalkCamelTestSupport {
assertNotNull("Beanstalk client got message", job);
assertEquals("Job body from the server", testMessage, new String(job.getData()));
assertEquals("Job ID from the server", jobId.longValue(), job.getJobId());
- reader.delete(jobId.longValue());
+ reader.delete(jobId);
}
@Test
public void testOut() throws InterruptedException, IOException {
- final Endpoint endpoint = context.getEndpoint("beanstalk:"+tubeName);
+ final Endpoint endpoint = context.getEndpoint("beanstalk:" + tubeName);
final Exchange exchange = template.send(endpoint, ExchangePattern.InOut, new Processor() {
public void process(Exchange exchange) {
exchange.getIn().setBody(testMessage);
@@ -78,7 +79,7 @@ public class PutProducerIntegrationTest extends BeanstalkCamelTestSupport {
assertNotNull("Beanstalk client got message", job);
assertEquals("Job body from the server", testMessage, new String(job.getData()));
assertEquals("Job ID from the server", jobId.longValue(), job.getJobId());
- reader.delete(jobId.longValue());
+ reader.delete(jobId);
}
@Test
@@ -97,7 +98,7 @@ public class PutProducerIntegrationTest extends BeanstalkCamelTestSupport {
final Job job = reader.reserve(0);
assertNull("Beanstalk client has no message", job);
- reader.delete(jobId.longValue());
+ reader.delete(jobId);
}
@Override
@@ -105,7 +106,7 @@ public class PutProducerIntegrationTest extends BeanstalkCamelTestSupport {
return new RouteBuilder() {
@Override
public void configure() {
- from("direct:start").to("beanstalk:"+tubeName+"?jobPriority=1000&jobTimeToRun=5").to("mock:result");
+ from("direct:start").to("beanstalk:" + tubeName + "?jobPriority=1000&jobTimeToRun=5").to("mock:result");
}
};
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ReleaseProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ReleaseProducerIntegrationTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ReleaseProducerIntegrationTest.java
index c77dc32..2918037 100644
--- a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ReleaseProducerIntegrationTest.java
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/ReleaseProducerIntegrationTest.java
@@ -16,18 +16,18 @@
*/
package org.apache.camel.component.beanstalk.integration;
-import org.apache.camel.component.beanstalk.Headers;
-import com.surftools.BeanstalkClient.Job;
import java.io.IOException;
+
+import com.surftools.BeanstalkClient.Job;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.EndpointInject;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.beanstalk.Headers;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Ignore;
import org.junit.Test;
-import static org.junit.Assert.*;
public class ReleaseProducerIntegrationTest extends BeanstalkCamelTestSupport {
@EndpointInject(uri = "mock:result")
@@ -61,7 +61,7 @@ public class ReleaseProducerIntegrationTest extends BeanstalkCamelTestSupport {
assertEquals("Buried job id", jobId, buried.getJobId());
}
- @Test(expected=CamelExecutionException.class)
+ @Test(expected = CamelExecutionException.class)
public void testNoJobId() throws InterruptedException, IOException {
resultEndpoint.expectedMessageCount(0);
direct.sendBody(new byte[0]);
@@ -75,7 +75,7 @@ public class ReleaseProducerIntegrationTest extends BeanstalkCamelTestSupport {
return new RouteBuilder() {
@Override
public void configure() {
- from("direct:start").to("beanstalk:"+tubeName+"?command=release").to("mock:result");
+ from("direct:start").to("beanstalk:" + tubeName + "?command=release").to("mock:result");
}
};
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/TouchProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/TouchProducerIntegrationTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/TouchProducerIntegrationTest.java
index bfb4c45..aa352e3 100644
--- a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/TouchProducerIntegrationTest.java
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/integration/TouchProducerIntegrationTest.java
@@ -16,18 +16,18 @@
*/
package org.apache.camel.component.beanstalk.integration;
-import org.apache.camel.component.beanstalk.Headers;
-import com.surftools.BeanstalkClient.Job;
import java.io.IOException;
+
+import com.surftools.BeanstalkClient.Job;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.EndpointInject;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.beanstalk.Headers;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Ignore;
import org.junit.Test;
-import static org.junit.Assert.*;
public class TouchProducerIntegrationTest extends BeanstalkCamelTestSupport {
@EndpointInject(uri = "mock:result")
@@ -61,7 +61,7 @@ public class TouchProducerIntegrationTest extends BeanstalkCamelTestSupport {
assertEquals("Buried job id", jobId, buried.getJobId());
}
- @Test(expected=CamelExecutionException.class)
+ @Test(expected = CamelExecutionException.class)
public void testNoJobId() throws InterruptedException, IOException {
resultEndpoint.expectedMessageCount(0);
direct.sendBody(new byte[0]);
@@ -75,7 +75,7 @@ public class TouchProducerIntegrationTest extends BeanstalkCamelTestSupport {
return new RouteBuilder() {
@Override
public void configure() {
- from("direct:start").to("beanstalk:"+tubeName+"?command=touch").to("mock:result");
+ from("direct:start").to("beanstalk:" + tubeName + "?command=touch").to("mock:result");
}
};
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/resources/log4j.properties b/components/camel-beanstalk/src/test/resources/log4j.properties
new file mode 100644
index 0000000..d6a09a1
--- /dev/null
+++ b/components/camel-beanstalk/src/test/resources/log4j.properties
@@ -0,0 +1,35 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# The logging properties used during tests.
+#
+log4j.rootLogger=INFO, file
+
+#log4j.logger.org.apache.camel.dataformat.beanio=DEBUG
+
+# CONSOLE appender not used by default
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d %-5p %c{1} - %m %n
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d %-5p %c{1} - %m %n
+log4j.appender.file.file=target/camel-beanstalk-test.log
+
[7/8] git commit: Fixed CS
Posted by da...@apache.org.
Fixed CS
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d6361062
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d6361062
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d6361062
Branch: refs/heads/master
Commit: d6361062ea58dc5f3c1b7e0684064d164bdc2538
Parents: e12a631
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Sep 21 09:49:45 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Sep 21 09:49:45 2014 +0200
----------------------------------------------------------------------
components/camel-beanstalk/pom.xml | 13 +-
components/camel-beanstalk/src/etc/header.txt | 13 --
.../component/beanstalk/BeanstalkComponent.java | 46 ++++---
.../component/beanstalk/BeanstalkConsumer.java | 131 ++++++++++---------
.../component/beanstalk/BeanstalkEndpoint.java | 82 ++++++------
.../beanstalk/BeanstalkExchangeHelper.java | 17 +--
.../component/beanstalk/BeanstalkProducer.java | 40 +++---
.../component/beanstalk/ConnectionSettings.java | 28 ++--
.../beanstalk/ConnectionSettingsFactory.java | 17 +--
.../camel/component/beanstalk/Headers.java | 37 +++---
.../beanstalk/processors/BuryCommand.java | 20 +--
.../component/beanstalk/processors/Command.java | 3 +-
.../beanstalk/processors/DefaultCommand.java | 2 +-
.../beanstalk/processors/DeleteCommand.java | 19 +--
.../beanstalk/processors/KickCommand.java | 9 +-
.../beanstalk/processors/PutCommand.java | 13 +-
.../beanstalk/processors/ReleaseCommand.java | 19 +--
.../beanstalk/processors/TouchCommand.java | 17 +--
.../beanstalk/AwaitingConsumerTest.java | 23 ++--
.../beanstalk/BeanstalkMockTestSupport.java | 6 +-
.../beanstalk/ConnectionSettingsTest.java | 25 ++--
.../beanstalk/ConsumerCompletionTest.java | 20 ++-
.../camel/component/beanstalk/EndpointTest.java | 16 ++-
.../camel/component/beanstalk/Helper.java | 7 +-
.../beanstalk/ImmediateConsumerTest.java | 22 ++--
.../camel/component/beanstalk/ProducerTest.java | 69 ++++++----
.../integration/BeanstalkCamelTestSupport.java | 11 +-
.../BuryProducerIntegrationTest.java | 10 +-
.../integration/ConsumerIntegrationTest.java | 17 +--
.../DeleteProducerIntegrationTest.java | 10 +-
.../integration/PutProducerIntegrationTest.java | 25 ++--
.../ReleaseProducerIntegrationTest.java | 10 +-
.../TouchProducerIntegrationTest.java | 10 +-
.../src/test/resources/log4j.properties | 35 +++++
34 files changed, 471 insertions(+), 371 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/pom.xml b/components/camel-beanstalk/pom.xml
index 1716fc0..f44256a 100644
--- a/components/camel-beanstalk/pom.xml
+++ b/components/camel-beanstalk/pom.xml
@@ -17,7 +17,8 @@
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -29,7 +30,6 @@
<artifactId>camel-beanstalk</artifactId>
<packaging>bundle</packaging>
<name>Camel :: Beanstalk</name>
-
<description>Camel Beanstalk component</description>
<properties>
@@ -66,6 +66,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
@@ -76,13 +81,13 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
- <version>2.14</version>
<configuration>
- <excludes />
+ <excludes/>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
+
</project>
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/etc/header.txt
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/etc/header.txt b/components/camel-beanstalk/src/etc/header.txt
deleted file mode 100644
index 4091cb9..0000000
--- a/components/camel-beanstalk/src/etc/header.txt
+++ /dev/null
@@ -1,13 +0,0 @@
-Copyright (C) ${year} ${author} <${email}>
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkComponent.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkComponent.java
index 0040697..ff59ed5 100644
--- a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkComponent.java
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkComponent.java
@@ -17,15 +17,16 @@
package org.apache.camel.component.beanstalk;
import java.util.Map;
+
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.DefaultComponent;
/**
* Beanstalk Camel component.
- *
+ * <p/>
* URI is <code>beanstalk://[host[:port]][/tube]?query</code>
- * <p>
+ * <p/>
* Parameters:<ul>
* <li><code>command</code> - one of "put", "release", "bury", "touch", "delete", "kick".
* "put" is the default for Producers.</li>
@@ -36,25 +37,24 @@ import org.apache.camel.impl.DefaultComponent;
* <li><code>consumer.awaitJob</code></li>
* </ul>
*
- * @author <a href="mailto:azarov@osinka.com">Alexander Azarov</a>
* @see BeanstalkEndpoint
* @see ConnectionSettingsFactory
*/
public class BeanstalkComponent extends DefaultComponent {
- public static final String DEFAULT_TUBE = "default";
+ public static final String DEFAULT_TUBE = "default";
- public final static String COMMAND_BURY = "bury";
- public final static String COMMAND_RELEASE = "release";
- public final static String COMMAND_PUT = "put";
- public final static String COMMAND_TOUCH = "touch";
- public final static String COMMAND_DELETE = "delete";
- public final static String COMMAND_KICK = "kick";
+ public static final String COMMAND_BURY = "bury";
+ public static final String COMMAND_RELEASE = "release";
+ public static final String COMMAND_PUT = "put";
+ public static final String COMMAND_TOUCH = "touch";
+ public static final String COMMAND_DELETE = "delete";
+ public static final String COMMAND_KICK = "kick";
- public final static long DEFAULT_PRIORITY = 1000; // 0 is highest
- public final static int DEFAULT_DELAY = 0;
- public final static int DEFAULT_TIME_TO_RUN = 60; // if 0 the daemon sets 1.
+ public static final long DEFAULT_PRIORITY = 1000; // 0 is highest
+ public static final int DEFAULT_DELAY = 0;
+ public static final int DEFAULT_TIME_TO_RUN = 60; // if 0 the daemon sets 1.
- static ConnectionSettingsFactory connFactory = ConnectionSettingsFactory.DEFAULT;
+ private static ConnectionSettingsFactory connectionSettingsFactory = ConnectionSettingsFactory.DEFAULT;
public BeanstalkComponent() {
}
@@ -69,20 +69,24 @@ public class BeanstalkComponent extends DefaultComponent {
}
@Override
- protected Endpoint createEndpoint(final String uri, final String remaining, final Map<String,Object> parameters) throws Exception {
- return new BeanstalkEndpoint(uri, this, connFactory.parseUri(remaining));
+ protected Endpoint createEndpoint(final String uri, final String remaining, final Map<String, Object> parameters) throws Exception {
+ return new BeanstalkEndpoint(uri, this, connectionSettingsFactory.parseUri(remaining));
}
/**
- * Custom ConnectionSettingsFactory.
- * <p>
+ * Custom {@link ConnectionSettingsFactory}.
+ * <p/>
* Specify which {@link ConnectionSettingsFactory} to use to make connections to Beanstalkd. Especially
* useful for unit testing without beanstalkd daemon (you can mock {@link ConnectionSettings})
- *
- * @param connFactory
+ *
+ * @param connFactory the connection factory
* @see ConnectionSettingsFactory
*/
public static void setConnectionSettingsFactory(ConnectionSettingsFactory connFactory) {
- BeanstalkComponent.connFactory = connFactory;
+ BeanstalkComponent.connectionSettingsFactory = connFactory;
+ }
+
+ public static ConnectionSettingsFactory getConnectionSettingsFactory() {
+ return connectionSettingsFactory;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java
index 9c16f7d..ac0e058 100644
--- a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java
@@ -16,109 +16,116 @@
*/
package org.apache.camel.component.beanstalk;
-import org.apache.camel.component.beanstalk.processors.*;
-import com.surftools.BeanstalkClient.BeanstalkException;
-import com.surftools.BeanstalkClient.Client;
-import com.surftools.BeanstalkClient.Job;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+
+import com.surftools.BeanstalkClient.BeanstalkException;
+import com.surftools.BeanstalkClient.Client;
+import com.surftools.BeanstalkClient.Job;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
-import org.apache.camel.spi.Synchronization;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.beanstalk.processors.BuryCommand;
+import org.apache.camel.component.beanstalk.processors.Command;
+import org.apache.camel.component.beanstalk.processors.DeleteCommand;
+import org.apache.camel.component.beanstalk.processors.ReleaseCommand;
import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.spi.Synchronization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* PollingConsumer to read Beanstalk jobs.
- *
+ * <p/>
* The consumer may delete the job immediately or based on successful {@link Exchange}
* completion. The behavior is configurable by <code>consumer.awaitJob</code>
* flag (by default <code>true</code>)
- *
+ * <p/>
* This consumer will add a {@link Synchronization} object to every {@link Exchange}
* object it creates in order to react on successful exchange completion or failure.
- *
+ * <p/>
* In the case of successful completion, Beanstalk's <code>delete</code> method is
* called upon the job. In the case of failure the default reaction is to call
* <code>bury</code>.
- *
+ * <p/>
* The reaction on failures is configurable: possible variants are "bury", "release" or "delete"
- *
- * @author <a href="mailto:azarov@osinka.com">Alexander Azarov</a>
*/
public class BeanstalkConsumer extends ScheduledPollConsumer {
- private final transient Logger log = LoggerFactory.getLogger(getClass());
-
- String onFailure = BeanstalkComponent.COMMAND_BURY;
- boolean useBlockIO = true;
- boolean deleteImmediately = false;
+ private static final Logger LOG = LoggerFactory.getLogger(BeanstalkConsumer.class);
+ private static final String[] STATS_KEY_STR = new String[]{"tube", "state"};
+ private static final String[] STATS_KEY_INT = new String[]{"age", "time-left", "timeouts", "releases", "buries", "kicks"};
- private Client client = null;
- private ExecutorService executor = null;
- private Synchronization sync = null;
-
- private static String[] statsKeysStr = new String[] {"tube", "state"};
- private static String[] statsKeysInt = new String[] {"age", "time-left", "timeouts", "releases", "buries", "kicks"};
+ private String onFailure = BeanstalkComponent.COMMAND_BURY;
+ private boolean useBlockIO = true;
+ private boolean deleteImmediately;
+ private Client client;
+ private ExecutorService executor;
+ private Synchronization sync;
private final Runnable initTask = new Runnable() {
- @Override
- public void run() {
- client = getEndpoint().getConnection().newReadingClient(useBlockIO);
- }
- };
+ @Override
+ public void run() {
+ client = getEndpoint().getConnection().newReadingClient(useBlockIO);
+ }
+ };
+
private final Callable<Exchange> pollTask = new Callable<Exchange>() {
- final Integer NO_WAIT = Integer.valueOf(0);
-
+ final Integer noWait = 0;
+
@Override
public Exchange call() throws Exception {
- if (client == null)
+ if (client == null) {
throw new RuntimeCamelException("Beanstalk client not initialized");
+ }
try {
- final Job job = client.reserve(NO_WAIT);
- if (job == null)
+ final Job job = client.reserve(noWait);
+ if (job == null) {
return null;
+ }
- if (log.isDebugEnabled())
- log.debug(String.format("Received job ID %d (data length %d)", job.getJobId(), job.getData().length));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Received job ID %d (data length %d)", job.getJobId(), job.getData().length));
+ }
final Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOnly);
exchange.setProperty(Headers.JOB_ID, job.getJobId());
exchange.getIn().setBody(job.getData(), byte[].class);
- Map<String,String> jobStats = client.statsJob(job.getJobId());
+ Map<String, String> jobStats = client.statsJob(job.getJobId());
if (jobStats != null) {
- for (String key : statsKeysStr) {
- if (jobStats.containsKey(key))
- exchange.setProperty(Headers.PREFIX+key, jobStats.get(key).trim());
+ for (String key : STATS_KEY_STR) {
+ if (jobStats.containsKey(key)) {
+ exchange.setProperty(Headers.PREFIX + key, jobStats.get(key).trim());
+ }
}
- if (jobStats.containsKey("pri"))
+ if (jobStats.containsKey("pri")) {
exchange.setProperty(Headers.PRIORITY, Long.parseLong(jobStats.get("pri").trim()));
+ }
- for (String key : statsKeysInt) {
- if (jobStats.containsKey(key))
- exchange.setProperty(Headers.PREFIX+key, Integer.parseInt(jobStats.get(key).trim()));
+ for (String key : STATS_KEY_INT) {
+ if (jobStats.containsKey(key)) {
+ exchange.setProperty(Headers.PREFIX + key, Integer.parseInt(jobStats.get(key).trim()));
+ }
}
}
- if (deleteImmediately)
+ if (deleteImmediately) {
client.delete(job.getJobId());
- else
+ } else {
exchange.addOnCompletion(sync);
+ }
return exchange;
} catch (BeanstalkException e) {
- log.error("Beanstalk client error", e);
+ getExceptionHandler().handleException("Beanstalk client error", e);
resetClient();
return null;
}
}
-
};
public BeanstalkConsumer(final BeanstalkEndpoint endpoint, final Processor processor) {
@@ -130,8 +137,9 @@ public class BeanstalkConsumer extends ScheduledPollConsumer {
int messagesPolled = 0;
while (isPollAllowed()) {
final Exchange exchange = executor.submit(pollTask).get();
- if (exchange == null)
+ if (exchange == null) {
break;
+ }
++messagesPolled;
getProcessor().process(exchange);
@@ -179,13 +187,15 @@ public class BeanstalkConsumer extends ScheduledPollConsumer {
@Override
protected void doStop() throws Exception {
super.doStop();
- if (executor != null)
- executor.shutdown();
+ if (executor != null) {
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(executor);
+ }
}
protected void resetClient() {
- if (client != null)
+ if (client != null) {
client.close();
+ }
initTask.run();
}
@@ -196,14 +206,15 @@ public class BeanstalkConsumer extends ScheduledPollConsumer {
public Sync() {
successCommand = new DeleteCommand(getEndpoint());
- if (BeanstalkComponent.COMMAND_BURY.equals(onFailure))
+ if (BeanstalkComponent.COMMAND_BURY.equals(onFailure)) {
failureCommand = new BuryCommand(getEndpoint());
- else if (BeanstalkComponent.COMMAND_RELEASE.equals(onFailure))
+ } else if (BeanstalkComponent.COMMAND_RELEASE.equals(onFailure)) {
failureCommand = new ReleaseCommand(getEndpoint());
- else if (BeanstalkComponent.COMMAND_DELETE.equals(onFailure))
+ } else if (BeanstalkComponent.COMMAND_DELETE.equals(onFailure)) {
failureCommand = new DeleteCommand(getEndpoint());
- else
+ } else {
throw new IllegalArgumentException(String.format("Unknown failure command: %s", onFailure));
+ }
}
@Override
@@ -211,8 +222,7 @@ public class BeanstalkConsumer extends ScheduledPollConsumer {
try {
executor.submit(new RunCommand(successCommand, exchange)).get();
} catch (Exception e) {
- if (log.isErrorEnabled())
- log.error(String.format("Could not run completion of exchange %s", exchange), e);
+ LOG.error(String.format("Could not run completion of exchange %s", exchange), e);
}
}
@@ -221,8 +231,7 @@ public class BeanstalkConsumer extends ScheduledPollConsumer {
try {
executor.submit(new RunCommand(failureCommand, exchange)).get();
} catch (Exception e) {
- if (log.isErrorEnabled())
- log.error(String.format("%s could not run failure of exchange %s", failureCommand.getClass().getName(), exchange), e);
+ LOG.error(String.format("%s could not run failure of exchange %s", failureCommand.getClass().getName(), exchange), e);
}
}
@@ -241,14 +250,12 @@ public class BeanstalkConsumer extends ScheduledPollConsumer {
try {
command.act(client, exchange);
} catch (BeanstalkException e) {
- if (log.isWarnEnabled())
- log.warn(String.format("Post-processing %s of exchange %s failed, retrying.", command.getClass().getName(), exchange), e);
+ LOG.warn(String.format("Post-processing %s of exchange %s failed, retrying.", command.getClass().getName(), exchange), e);
resetClient();
command.act(client, exchange);
}
} catch (final Exception e) {
- if (log.isErrorEnabled())
- log.error(String.format("%s could not post-process exchange %s", command.getClass().getName(), exchange), e);
+ LOG.error(String.format("%s could not post-process exchange %s", command.getClass().getName(), exchange), e);
exchange.setException(e);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java
index 62c6809..7437204 100644
--- a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java
@@ -18,28 +18,28 @@ package org.apache.camel.component.beanstalk;
import com.surftools.BeanstalkClient.Client;
import org.apache.camel.Component;
-import org.apache.camel.Producer;
-import org.apache.camel.component.beanstalk.processors.*;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.beanstalk.processors.BuryCommand;
+import org.apache.camel.component.beanstalk.processors.Command;
+import org.apache.camel.component.beanstalk.processors.DeleteCommand;
+import org.apache.camel.component.beanstalk.processors.KickCommand;
+import org.apache.camel.component.beanstalk.processors.PutCommand;
+import org.apache.camel.component.beanstalk.processors.ReleaseCommand;
+import org.apache.camel.component.beanstalk.processors.TouchCommand;
import org.apache.camel.impl.ScheduledPollEndpoint;
-/**
- * @author <a href="mailto:azarov@osinka.com">Alexander Azarov</a>
- * @see BeanstalkConsumer
- * @see org.apache.camel.component.beanstalk.processors.PutCommand
- */
public class BeanstalkEndpoint extends ScheduledPollEndpoint {
final ConnectionSettings conn;
- String command = BeanstalkComponent.COMMAND_PUT;
- long priority = BeanstalkComponent.DEFAULT_PRIORITY;
- int delay = BeanstalkComponent.DEFAULT_DELAY;
- int timeToRun = BeanstalkComponent.DEFAULT_TIME_TO_RUN;
+ private String command = BeanstalkComponent.COMMAND_PUT;
+ private long jobPriority = BeanstalkComponent.DEFAULT_PRIORITY;
+ private int jobDelay = BeanstalkComponent.DEFAULT_DELAY;
+ private int jobTimeToRun = BeanstalkComponent.DEFAULT_TIME_TO_RUN;
- BeanstalkEndpoint(final String uri, final Component component, final ConnectionSettings conn) {
+ public BeanstalkEndpoint(final String uri, final Component component, final ConnectionSettings conn) {
super(uri, component);
-
this.conn = conn;
}
@@ -47,66 +47,70 @@ public class BeanstalkEndpoint extends ScheduledPollEndpoint {
return conn;
}
- /**
- * The command {@link Producer} must execute
- *
- * @param command
- */
- public void setCommand(final String command) {
- this.command = command;
+ public ConnectionSettings getConn() {
+ return conn;
}
- public void setJobPriority(final long priority) {
- this.priority = priority;
+ public String getCommand() {
+ return command;
+ }
+
+ public void setCommand(String command) {
+ this.command = command;
}
public long getJobPriority() {
- return priority;
+ return jobPriority;
}
- public void setJobDelay(final int delay) {
- this.delay = delay;
+ public void setJobPriority(long jobPriority) {
+ this.jobPriority = jobPriority;
}
public int getJobDelay() {
- return delay;
+ return jobDelay;
}
- public void setJobTimeToRun(final int timeToRun) {
- this.timeToRun = timeToRun;
+ public void setJobDelay(int jobDelay) {
+ this.jobDelay = jobDelay;
}
public int getJobTimeToRun() {
- return timeToRun;
+ return jobTimeToRun;
+ }
+
+ public void setJobTimeToRun(int jobTimeToRun) {
+ this.jobTimeToRun = jobTimeToRun;
}
/**
* Creates Camel producer.
- * <p>
+ * <p/>
* Depending on the command parameter (see {@link BeanstalkComponent} URI) it
* will create one of the producer implementations.
*
* @return {@link Producer} instance
* @throws IllegalArgumentException when {@link ConnectionSettings} cannot
- * create a writable {@link Client}
+ * create a writable {@link Client}
*/
@Override
public Producer createProducer() throws Exception {
- Command cmd = null;
- if (BeanstalkComponent.COMMAND_PUT.equals(command))
+ Command cmd;
+ if (BeanstalkComponent.COMMAND_PUT.equals(command)) {
cmd = new PutCommand(this);
- else if (BeanstalkComponent.COMMAND_RELEASE.equals(command))
+ } else if (BeanstalkComponent.COMMAND_RELEASE.equals(command)) {
cmd = new ReleaseCommand(this);
- else if (BeanstalkComponent.COMMAND_BURY.equals(command))
+ } else if (BeanstalkComponent.COMMAND_BURY.equals(command)) {
cmd = new BuryCommand(this);
- else if (BeanstalkComponent.COMMAND_TOUCH.equals(command))
+ } else if (BeanstalkComponent.COMMAND_TOUCH.equals(command)) {
cmd = new TouchCommand(this);
- else if (BeanstalkComponent.COMMAND_DELETE.equals(command))
+ } else if (BeanstalkComponent.COMMAND_DELETE.equals(command)) {
cmd = new DeleteCommand(this);
- else if (BeanstalkComponent.COMMAND_KICK.equals(command))
+ } else if (BeanstalkComponent.COMMAND_KICK.equals(command)) {
cmd = new KickCommand(this);
- else
+ } else {
throw new IllegalArgumentException(String.format("Unknown command for Beanstalk endpoint: %s", command));
+ }
return new BeanstalkProducer(this, cmd);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkExchangeHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkExchangeHelper.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkExchangeHelper.java
index 19a298f..9e39369 100644
--- a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkExchangeHelper.java
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkExchangeHelper.java
@@ -21,27 +21,28 @@ import org.apache.camel.Message;
import org.apache.camel.NoSuchHeaderException;
import org.apache.camel.util.ExchangeHelper;
-/**
- *
- * @author <a href="mailto:azarov@osinka.com">Alexander Azarov</a>
- */
public final class BeanstalkExchangeHelper {
+
+ private BeanstalkExchangeHelper() {
+ }
+
public static long getPriority(final BeanstalkEndpoint endpoint, final Message in) {
- return in.getHeader(Headers.PRIORITY, Long.valueOf(endpoint.getJobPriority()), Long.class).longValue();
+ return in.getHeader(Headers.PRIORITY, endpoint.getJobPriority(), Long.class);
}
public static int getDelay(final BeanstalkEndpoint endpoint, final Message in) {
- return in.getHeader(Headers.DELAY, Integer.valueOf(endpoint.getJobDelay()), Integer.class).intValue();
+ return in.getHeader(Headers.DELAY, endpoint.getJobDelay(), Integer.class);
}
public static int getTimeToRun(final BeanstalkEndpoint endpoint, final Message in) {
- return in.getHeader(Headers.TIME_TO_RUN, Integer.valueOf(endpoint.getJobTimeToRun()), Integer.class).intValue();
+ return in.getHeader(Headers.TIME_TO_RUN, endpoint.getJobTimeToRun(), Integer.class);
}
public static long getJobID(final Exchange exchange) throws NoSuchHeaderException {
Long jobId = exchange.getProperty(Headers.JOB_ID, Long.class);
- if (jobId != null)
+ if (jobId != null) {
return jobId;
+ }
return ExchangeHelper.getMandatoryHeader(exchange, Headers.JOB_ID, Long.class);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java
index 83cfa98..6ff0082 100644
--- a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java
@@ -17,30 +17,30 @@
package org.apache.camel.component.beanstalk;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import org.apache.camel.component.beanstalk.processors.Command;
+import java.util.concurrent.Future;
+
import com.surftools.BeanstalkClient.BeanstalkException;
import com.surftools.BeanstalkClient.Client;
-import org.apache.camel.Exchange;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.beanstalk.processors.Command;
import org.apache.camel.impl.DefaultProducer;
-/**
- *
- * @author <a href="mailto:azarov@osinka.com">Alexander Azarov</a>
- */
public class BeanstalkProducer extends DefaultProducer implements AsyncProcessor {
- private ExecutorService executor = null;
-
- Client client = null;
- final Command command;
+ private ExecutorService executor;
+ private Client client;
+ private final Command command;
public BeanstalkProducer(BeanstalkEndpoint endpoint, final Command command) throws Exception {
super(endpoint);
this.command = command;
}
+ public Command getCommand() {
+ return command;
+ }
+
@Override
public void process(final Exchange exchange) throws Exception {
Future f = executor.submit(new RunCommand(exchange));
@@ -65,8 +65,9 @@ public class BeanstalkProducer extends DefaultProducer implements AsyncProcessor
}
protected void closeClient() {
- if (client != null)
+ if (client != null) {
client.close();
+ }
}
protected void initClient() {
@@ -78,15 +79,15 @@ public class BeanstalkProducer extends DefaultProducer implements AsyncProcessor
super.doStart();
executor = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "Beanstalk-Producer");
executor.execute(new Runnable() {
- public void run() {
- initClient();
- }
- });
+ public void run() {
+ initClient();
+ }
+ });
}
@Override
protected void doStop() throws Exception {
- executor.shutdown();
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(executor);
closeClient();
super.doStop();
}
@@ -122,8 +123,9 @@ public class BeanstalkProducer extends DefaultProducer implements AsyncProcessor
} catch (Throwable t) {
exchange.setException(t);
} finally {
- if (callback != null)
+ if (callback != null) {
callback.done(false);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettings.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettings.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettings.java
index 35359b1..d79db06 100644
--- a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettings.java
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettings.java
@@ -16,20 +16,19 @@
*/
package org.apache.camel.component.beanstalk;
-import java.util.Arrays;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Scanner;
-import java.net.URLDecoder;
+
import com.surftools.BeanstalkClient.Client;
import com.surftools.BeanstalkClientImpl.ClientImpl;
-import java.io.UnsupportedEncodingException;
/**
* Represents the connection to Beanstalk.
- * <p>
+ * <p/>
* Along with the list of tubes it may watch.
- *
- * @author <a href="mailto:azarov@osinka.com">Alexander Azarov</a>
*/
public class ConnectionSettings {
final String host;
@@ -54,24 +53,24 @@ public class ConnectionSettings {
while (scanner.hasNext()) {
final String tubeRaw = scanner.next();
try {
- buffer.add( URLDecoder.decode(tubeRaw, "UTF-8") );
+ buffer.add(URLDecoder.decode(tubeRaw, "UTF-8"));
} catch (UnsupportedEncodingException e) {
buffer.add(tubeRaw);
}
}
- this.tubes = buffer.toArray(new String[0]);
+ this.tubes = buffer.toArray(new String[buffer.size()]);
scanner.close();
}
/**
* Returns the {@link Client} instance ready for writing
* operations, e.g. "put".
- * <p>
+ * <p/>
* <code>use(tube)</code> is applied during this call.
*
* @return {@link Client} instance
* @throws IllegalArgumentException the exception is raised when this ConnectionSettings
- * has more than one tube.
+ * has more than one tube.
*/
public Client newWritingClient() throws IllegalArgumentException {
if (tubes.length > 1) {
@@ -93,7 +92,7 @@ public class ConnectionSettings {
/**
* Returns the {@link Client} instance for reading operations with all
* the tubes already watched
- * <p>
+ * <p/>
* <code>watch(tube)</code> is applied for every tube during this call.
*
* @param useBlockIO configuration param to {@link Client}
@@ -106,8 +105,9 @@ public class ConnectionSettings {
when using uniqueConnectionPerThread=false. The symptom is that ProtocolHandler
breaks the protocol, reading incomplete messages. To be investigated. */
//client.setUniqueConnectionPerThread(false);
- for (String tube : tubes)
+ for (String tube : tubes) {
client.watch(tube);
+ }
return client;
}
@@ -122,11 +122,11 @@ public class ConnectionSettings {
@Override
public int hashCode() {
- return 41*(41*(41+host.hashCode())+port)+Arrays.hashCode(tubes);
+ return 41 * (41 * (41 + host.hashCode()) + port) + Arrays.hashCode(tubes);
}
@Override
public String toString() {
- return "beanstalk://"+host+":"+port+"/"+Arrays.toString(tubes);
+ return "beanstalk://" + host + ":" + port + "/" + Arrays.toString(tubes);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettingsFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettingsFactory.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettingsFactory.java
index 7949cf2..915cc4a 100644
--- a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettingsFactory.java
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettingsFactory.java
@@ -16,23 +16,24 @@
*/
package org.apache.camel.component.beanstalk;
-import com.surftools.BeanstalkClient.Client;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-/**
- *
- * @author <a href="mailto:azarov@osinka.com">Alexander Azarov</a>
- */
+import com.surftools.BeanstalkClient.Client;
+
public class ConnectionSettingsFactory {
+
public static final ConnectionSettingsFactory DEFAULT = new ConnectionSettingsFactory();
+ private static final Pattern HOST_PORT_TUBE_RE = Pattern.compile("^(([\\w.-]+)(:([\\d]+))?/)?([\\w%+]*)$");
- final Pattern HostPortTubeRE = Pattern.compile("^(([\\w.-]+)(:([\\d]+))?/)?([\\w%+]*)$");
+ public ConnectionSettingsFactory() {
+ }
public ConnectionSettings parseUri(final String remaining) throws IllegalArgumentException {
- final Matcher m = HostPortTubeRE.matcher(remaining);
- if (!m.matches())
+ final Matcher m = HOST_PORT_TUBE_RE.matcher(remaining);
+ if (!m.matches()) {
throw new IllegalArgumentException(String.format("Invalid path format: %s - should be [<hostName>[:<port>]/][<tubes>]", remaining));
+ }
final String host = m.group(2) != null ? m.group(2) : Client.DEFAULT_HOST;
final int port = m.group(4) != null ? Integer.parseInt(m.group(4)) : Client.DEFAULT_PORT;
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/Headers.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/Headers.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/Headers.java
index b944872..9e8e3ab 100644
--- a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/Headers.java
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/Headers.java
@@ -16,31 +16,32 @@
*/
package org.apache.camel.component.beanstalk;
-/**
- *
- * @author <a href="mailto:azarov@osinka.com">Alexander Azarov</a>
- */
public final class Headers {
- public static final String PREFIX = "beanstalk.";
+
+ public static final String PREFIX = "beanstalk.";
// in
- public static final String PRIORITY = PREFIX+"priority";
- public static final String DELAY = PREFIX+"delay";
- public static final String TIME_TO_RUN = PREFIX+"timeToRun";
+ public static final String PRIORITY = PREFIX + "priority";
+ public static final String DELAY = PREFIX + "delay";
+ public static final String TIME_TO_RUN = PREFIX + "timeToRun";
// in/out
- public static final String JOB_ID = PREFIX+"jobId";
+ public static final String JOB_ID = PREFIX + "jobId";
// out
- public static final String RESULT = PREFIX+"result";
+ public static final String RESULT = PREFIX + "result";
// other info
- public static final String TUBE = PREFIX+"tube";
- public static final String STATE = PREFIX+"state";
- public static final String AGE = PREFIX+"age";
- public static final String TIME_LEFT = PREFIX+"time-left";
- public static final String TIMEOUTS = PREFIX+"timeouts";
- public static final String RELEASES = PREFIX+"releases";
- public static final String BURIES = PREFIX+"buries";
- public static final String KICKS = PREFIX+"kicks";
+ public static final String TUBE = PREFIX + "tube";
+ public static final String STATE = PREFIX + "state";
+ public static final String AGE = PREFIX + "age";
+ public static final String TIME_LEFT = PREFIX + "time-left";
+ public static final String TIMEOUTS = PREFIX + "timeouts";
+ public static final String RELEASES = PREFIX + "releases";
+ public static final String BURIES = PREFIX + "buries";
+ public static final String KICKS = PREFIX + "kicks";
+
+ private Headers() {
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/BuryCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/BuryCommand.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/BuryCommand.java
index 2df0e60..3a3c06d 100644
--- a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/BuryCommand.java
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/BuryCommand.java
@@ -16,17 +16,17 @@
*/
package org.apache.camel.component.beanstalk.processors;
-import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
-import org.apache.camel.component.beanstalk.BeanstalkExchangeHelper;
-import org.apache.camel.component.beanstalk.Headers;
import com.surftools.BeanstalkClient.Client;
import org.apache.camel.Exchange;
import org.apache.camel.NoSuchHeaderException;
+import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
+import org.apache.camel.component.beanstalk.BeanstalkExchangeHelper;
+import org.apache.camel.component.beanstalk.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BuryCommand extends DefaultCommand {
- private final transient Logger log = LoggerFactory.getLogger(getClass());
+ private static final Logger LOG = LoggerFactory.getLogger(BuryCommand.class);
public BuryCommand(BeanstalkEndpoint endpoint) {
super(endpoint);
@@ -36,14 +36,14 @@ public class BuryCommand extends DefaultCommand {
public void act(final Client client, final Exchange exchange) throws NoSuchHeaderException {
final Long jobId = BeanstalkExchangeHelper.getJobID(exchange);
final long priority = BeanstalkExchangeHelper.getPriority(endpoint, exchange.getIn());
- final boolean result = client.bury(jobId.longValue(), priority);
+ final boolean result = client.bury(jobId, priority);
- if (!result && log.isWarnEnabled())
- log.warn(String.format("Failed to bury job %d (with priority %d)", jobId, priority));
- else if (log.isDebugEnabled())
- log.debug(String.format("Job %d buried with priority %d. Result is %b", jobId, priority, result));
+ if (!result && LOG.isWarnEnabled()) {
+ LOG.warn(String.format("Failed to bury job %d (with priority %d)", jobId, priority));
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Job %d buried with priority %d. Result is %b", jobId, priority, result));
+ }
answerWith(exchange, Headers.RESULT, result);
-
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/Command.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/Command.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/Command.java
index 7088279..e578286 100644
--- a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/Command.java
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/Command.java
@@ -20,5 +20,6 @@ import com.surftools.BeanstalkClient.Client;
import org.apache.camel.Exchange;
public interface Command {
- public void act(Client client, Exchange exchange) throws Exception;
+
+ void act(Client client, Exchange exchange) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DefaultCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DefaultCommand.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DefaultCommand.java
index e8e9a45..d81bb10 100644
--- a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DefaultCommand.java
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DefaultCommand.java
@@ -16,9 +16,9 @@
*/
package org.apache.camel.component.beanstalk.processors;
-import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
+import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
import org.apache.camel.util.ExchangeHelper;
abstract class DefaultCommand implements Command {
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DeleteCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DeleteCommand.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DeleteCommand.java
index f738556..6f5967b 100644
--- a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DeleteCommand.java
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DeleteCommand.java
@@ -16,17 +16,17 @@
*/
package org.apache.camel.component.beanstalk.processors;
-import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
-import org.apache.camel.component.beanstalk.BeanstalkExchangeHelper;
-import org.apache.camel.component.beanstalk.Headers;
import com.surftools.BeanstalkClient.Client;
import org.apache.camel.Exchange;
import org.apache.camel.NoSuchHeaderException;
+import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
+import org.apache.camel.component.beanstalk.BeanstalkExchangeHelper;
+import org.apache.camel.component.beanstalk.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DeleteCommand extends DefaultCommand {
- private final transient Logger log = LoggerFactory.getLogger(getClass());
+ private static final Logger LOG = LoggerFactory.getLogger(DeleteCommand.class);
public DeleteCommand(BeanstalkEndpoint endpoint) {
super(endpoint);
@@ -35,11 +35,12 @@ public class DeleteCommand extends DefaultCommand {
@Override
public void act(final Client client, final Exchange exchange) throws NoSuchHeaderException {
final Long jobId = BeanstalkExchangeHelper.getJobID(exchange);
- final boolean result = client.delete(jobId.longValue());
- if (!result && log.isWarnEnabled())
- log.warn(String.format("Failed to delete job %d", jobId));
- else if (log.isDebugEnabled())
- log.debug(String.format("Job %d deleted. Result is %b", jobId, result));
+ final boolean result = client.delete(jobId);
+ if (!result && LOG.isWarnEnabled()) {
+ LOG.warn(String.format("Failed to delete job %d", jobId));
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Job %d deleted. Result is %b", jobId, result));
+ }
answerWith(exchange, Headers.RESULT, result);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/KickCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/KickCommand.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/KickCommand.java
index 7bad253..fdc709a 100644
--- a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/KickCommand.java
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/KickCommand.java
@@ -16,17 +16,17 @@
*/
package org.apache.camel.component.beanstalk.processors;
-import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
import com.surftools.BeanstalkClient.Client;
import org.apache.camel.Exchange;
import org.apache.camel.InvalidPayloadException;
import org.apache.camel.Message;
import org.apache.camel.NoSuchHeaderException;
+import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KickCommand extends DefaultCommand {
- private final transient Logger log = LoggerFactory.getLogger(getClass());
+ private static final Logger LOG = LoggerFactory.getLogger(KickCommand.class);
public KickCommand(BeanstalkEndpoint endpoint) {
super(endpoint);
@@ -36,8 +36,9 @@ public class KickCommand extends DefaultCommand {
public void act(final Client client, final Exchange exchange) throws NoSuchHeaderException, InvalidPayloadException {
final Integer jobs = exchange.getIn().getMandatoryBody(Integer.class);
final int result = client.kick(jobs);
- if (log.isDebugEnabled())
- log.debug(String.format("Kick %d jobs. Kicked %d actually.", jobs, result));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Kick %d jobs. Kicked %d actually.", jobs, result));
+ }
final Message answer = getAnswerMessage(exchange);
answer.setBody(result, Integer.class);
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/PutCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/PutCommand.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/PutCommand.java
index c6fa32e..4f5baae 100644
--- a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/PutCommand.java
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/PutCommand.java
@@ -16,18 +16,18 @@
*/
package org.apache.camel.component.beanstalk.processors;
-import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
-import org.apache.camel.component.beanstalk.BeanstalkExchangeHelper;
-import org.apache.camel.component.beanstalk.Headers;
import com.surftools.BeanstalkClient.Client;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.NoSuchHeaderException;
+import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
+import org.apache.camel.component.beanstalk.BeanstalkExchangeHelper;
+import org.apache.camel.component.beanstalk.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PutCommand extends DefaultCommand {
- private final transient Logger log = LoggerFactory.getLogger(getClass());
+ private static final Logger LOG = LoggerFactory.getLogger(PutCommand.class);
public PutCommand(BeanstalkEndpoint endpoint) {
super(endpoint);
@@ -42,8 +42,9 @@ public class PutCommand extends DefaultCommand {
final int timeToRun = BeanstalkExchangeHelper.getTimeToRun(endpoint, in);
final long jobId = client.put(priority, delay, timeToRun, in.getBody(byte[].class));
- if (log.isDebugEnabled())
- log.debug(String.format("Created job %d with priority %d, delay %d seconds and time to run %d", jobId, priority, delay, timeToRun));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Created job %d with priority %d, delay %d seconds and time to run %d", jobId, priority, delay, timeToRun));
+ }
answerWith(exchange, Headers.JOB_ID, jobId);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/ReleaseCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/ReleaseCommand.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/ReleaseCommand.java
index f41e080..762a42d 100644
--- a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/ReleaseCommand.java
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/ReleaseCommand.java
@@ -16,18 +16,18 @@
*/
package org.apache.camel.component.beanstalk.processors;
-import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
-import org.apache.camel.component.beanstalk.BeanstalkExchangeHelper;
-import org.apache.camel.component.beanstalk.Headers;
import com.surftools.BeanstalkClient.Client;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.NoSuchHeaderException;
+import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
+import org.apache.camel.component.beanstalk.BeanstalkExchangeHelper;
+import org.apache.camel.component.beanstalk.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ReleaseCommand extends DefaultCommand {
- private final transient Logger log = LoggerFactory.getLogger(getClass());
+ private static final Logger LOG = LoggerFactory.getLogger(ReleaseCommand.class);
public ReleaseCommand(BeanstalkEndpoint endpoint) {
super(endpoint);
@@ -41,11 +41,12 @@ public class ReleaseCommand extends DefaultCommand {
final long priority = BeanstalkExchangeHelper.getPriority(endpoint, in);
final int delay = BeanstalkExchangeHelper.getDelay(endpoint, in);
- final boolean result = client.release(jobId.longValue(), priority, delay);
- if (!result && log.isWarnEnabled())
- log.warn(String.format("Failed to release job %d (priority %d, delay %d)", jobId, priority, delay));
- else if (log.isDebugEnabled())
- log.debug(String.format("Job %d released with priority %d, delay %d seconds. Result is %b", jobId, priority, delay, result));
+ final boolean result = client.release(jobId, priority, delay);
+ if (!result && LOG.isWarnEnabled()) {
+ LOG.warn(String.format("Failed to release job %d (priority %d, delay %d)", jobId, priority, delay));
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Job %d released with priority %d, delay %d seconds. Result is %b", jobId, priority, delay, result));
+ }
answerWith(exchange, Headers.RESULT, result);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/TouchCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/TouchCommand.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/TouchCommand.java
index c43f32c..105cc71 100644
--- a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/TouchCommand.java
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/TouchCommand.java
@@ -16,17 +16,17 @@
*/
package org.apache.camel.component.beanstalk.processors;
-import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
-import org.apache.camel.component.beanstalk.Headers;
import com.surftools.BeanstalkClient.Client;
import org.apache.camel.Exchange;
import org.apache.camel.NoSuchHeaderException;
+import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
+import org.apache.camel.component.beanstalk.Headers;
import org.apache.camel.util.ExchangeHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TouchCommand extends DefaultCommand {
- private final transient Logger log = LoggerFactory.getLogger(getClass());
+ private static final Logger LOG = LoggerFactory.getLogger(TouchCommand.class);
public TouchCommand(BeanstalkEndpoint endpoint) {
super(endpoint);
@@ -35,11 +35,12 @@ public class TouchCommand extends DefaultCommand {
@Override
public void act(final Client client, final Exchange exchange) throws NoSuchHeaderException {
final Long jobId = ExchangeHelper.getMandatoryHeader(exchange, Headers.JOB_ID, Long.class);
- final boolean result = client.touch(jobId.longValue());
- if (!result && log.isWarnEnabled())
- log.warn(String.format("Failed to touch job %d", jobId));
- else if (log.isDebugEnabled())
- log.debug(String.format("Job %d touched. Result is %b", jobId, result));
+ final boolean result = client.touch(jobId);
+ if (!result && LOG.isWarnEnabled()) {
+ LOG.warn(String.format("Failed to touch job %d", jobId));
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Job %d touched. Result is %b", jobId, result));
+ }
answerWith(exchange, Headers.RESULT, result);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/AwaitingConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/AwaitingConsumerTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/AwaitingConsumerTest.java
index 164dff2..ebef336 100644
--- a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/AwaitingConsumerTest.java
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/AwaitingConsumerTest.java
@@ -16,20 +16,27 @@
*/
package org.apache.camel.component.beanstalk;
-import com.surftools.BeanstalkClient.Job;
import com.surftools.BeanstalkClient.BeanstalkException;
+import com.surftools.BeanstalkClient.Job;
import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Test;
-import static org.mockito.Mockito.*;
+
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class AwaitingConsumerTest extends BeanstalkMockTestSupport {
- final String testMessage = "hello, world";
@EndpointInject(uri = "beanstalk:tube")
protected BeanstalkEndpoint endpoint;
+ private String testMessage = "hello, world";
+
@Test
public void testReceive() throws Exception {
final Job jobMock = mock(Job.class);
@@ -39,8 +46,8 @@ public class AwaitingConsumerTest extends BeanstalkMockTestSupport {
when(jobMock.getJobId()).thenReturn(jobId);
when(jobMock.getData()).thenReturn(payload);
when(client.reserve(anyInt()))
- .thenReturn(jobMock)
- .thenReturn(null);
+ .thenReturn(jobMock)
+ .thenReturn(null);
MockEndpoint result = getMockEndpoint("mock:result");
result.expectedMessageCount(1);
@@ -62,8 +69,8 @@ public class AwaitingConsumerTest extends BeanstalkMockTestSupport {
when(jobMock.getJobId()).thenReturn(jobId);
when(jobMock.getData()).thenReturn(payload);
when(client.reserve(anyInt()))
- .thenThrow(new BeanstalkException("test"))
- .thenReturn(jobMock);
+ .thenThrow(new BeanstalkException("test"))
+ .thenReturn(jobMock);
MockEndpoint result = getMockEndpoint("mock:result");
result.expectedMessageCount(1);
@@ -71,7 +78,7 @@ public class AwaitingConsumerTest extends BeanstalkMockTestSupport {
result.expectedPropertyReceived(Headers.JOB_ID, jobId);
result.message(0).header(Headers.JOB_ID).isEqualTo(jobId);
result.assertIsSatisfied(100);
-
+
verify(client, atLeast(1)).reserve(anyInt());
verify(client, times(1)).close();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/BeanstalkMockTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/BeanstalkMockTestSupport.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/BeanstalkMockTestSupport.java
index 42b3114..172849c 100644
--- a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/BeanstalkMockTestSupport.java
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/BeanstalkMockTestSupport.java
@@ -22,10 +22,12 @@ import org.junit.After;
import org.junit.Before;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import static org.mockito.Mockito.*;
+
+import static org.mockito.Mockito.reset;
public class BeanstalkMockTestSupport extends CamelTestSupport {
- @Mock Client client;
+ @Mock
+ Client client;
@Before
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConnectionSettingsTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConnectionSettingsTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConnectionSettingsTest.java
index 6b1d81d..ab221b5 100644
--- a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConnectionSettingsTest.java
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConnectionSettingsTest.java
@@ -16,16 +16,19 @@
*/
package org.apache.camel.component.beanstalk;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
import com.surftools.BeanstalkClient.Client;
import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
public class ConnectionSettingsTest {
+
@Test
public void parseUriTest() {
- final ConnectionSettingsFactory factory = BeanstalkComponent.connFactory;
+ final ConnectionSettingsFactory factory = BeanstalkComponent.getConnectionSettingsFactory();
assertEquals("Full URI", new ConnectionSettings("host.domain.tld", 11300, "someTube"), factory.parseUri("host.domain.tld:11300/someTube"));
assertEquals("No port", new ConnectionSettings("host.domain.tld", Client.DEFAULT_PORT, "someTube"), factory.parseUri("host.domain.tld/someTube"));
assertEquals("Only tube", new ConnectionSettings(Client.DEFAULT_HOST, Client.DEFAULT_PORT, "someTube"), factory.parseUri("someTube"));
@@ -33,21 +36,21 @@ public class ConnectionSettingsTest {
@Test
public void parseTubesTest() {
- final ConnectionSettingsFactory factory = BeanstalkComponent.connFactory;
- assertArrayEquals("Full URI", new String[] {"tube1", "tube2"}, factory.parseUri("host:90/tube1+tube2").tubes);
- assertArrayEquals("No port", new String[] {"tube1", "tube2"}, factory.parseUri("host/tube1+tube2").tubes);
- assertArrayEquals("Only tubes", new String[] {"tube1", "tube2"}, factory.parseUri("tube1+tube2").tubes);
+ final ConnectionSettingsFactory factory = BeanstalkComponent.getConnectionSettingsFactory();
+ assertArrayEquals("Full URI", new String[]{"tube1", "tube2"}, factory.parseUri("host:90/tube1+tube2").tubes);
+ assertArrayEquals("No port", new String[]{"tube1", "tube2"}, factory.parseUri("host/tube1+tube2").tubes);
+ assertArrayEquals("Only tubes", new String[]{"tube1", "tube2"}, factory.parseUri("tube1+tube2").tubes);
assertArrayEquals("Empty URI", new String[0], factory.parseUri("").tubes);
}
- @Test(expected=IllegalArgumentException.class)
+ @Test(expected = IllegalArgumentException.class)
public void notValidHost() {
- final ConnectionSettingsFactory factory = BeanstalkComponent.connFactory;
+ final ConnectionSettingsFactory factory = BeanstalkComponent.getConnectionSettingsFactory();
fail(String.format("Calling on not valid URI must raise exception, but got result %s", factory.parseUri("not_valid?host/tube?")));
}
@Before
public void setUp() {
- BeanstalkComponent.connFactory = new ConnectionSettingsFactory();
+ BeanstalkComponent.setConnectionSettingsFactory(new ConnectionSettingsFactory());
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConsumerCompletionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConsumerCompletionTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConsumerCompletionTest.java
index 14a0955..4d0a863 100644
--- a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConsumerCompletionTest.java
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ConsumerCompletionTest.java
@@ -23,16 +23,24 @@ import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Test;
-import static org.mockito.Mockito.*;
-public class ConsumerCompletionTest extends BeanstalkMockTestSupport {
- final String testMessage = "hello, world";
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
- boolean shouldIdie = false;
- final Processor processor = new Processor() {
+public class ConsumerCompletionTest extends BeanstalkMockTestSupport {
+ private String testMessage = "hello, world";
+ private boolean shouldIdie;
+ private Processor processor = new Processor() {
@Override
public void process(Exchange exchange) throws InterruptedException {
- if (shouldIdie) throw new InterruptedException("die");
+ if (shouldIdie) {
+ throw new InterruptedException("die");
+ }
}
};
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/EndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/EndpointTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/EndpointTest.java
index 903f272..d778329 100644
--- a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/EndpointTest.java
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/EndpointTest.java
@@ -21,13 +21,15 @@ import org.apache.camel.FailedToCreateProducerException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.After;
-import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Test;
-import org.junit.Ignore;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
public class EndpointTest {
- CamelContext context = null;
+ CamelContext context;
@Before
public void setUp() throws Exception {
@@ -61,19 +63,19 @@ public class EndpointTest {
public void testCommand() {
BeanstalkEndpoint endpoint = context.getEndpoint("beanstalk:default?command=release", BeanstalkEndpoint.class);
assertNotNull("Beanstalk endpoint", endpoint);
- assertEquals("Command", BeanstalkComponent.COMMAND_RELEASE, endpoint.command);
+ assertEquals("Command", BeanstalkComponent.COMMAND_RELEASE, endpoint.getCommand());
}
@Test
public void testTubes() {
BeanstalkEndpoint endpoint = context.getEndpoint("beanstalk:host:11303/tube1+tube%2B+tube%3F?command=kick", BeanstalkEndpoint.class);
assertNotNull("Beanstalk endpoint", endpoint);
- assertEquals("Command", BeanstalkComponent.COMMAND_KICK, endpoint.command);
+ assertEquals("Command", BeanstalkComponent.COMMAND_KICK, endpoint.getCommand());
assertEquals("Host", "host", endpoint.conn.host);
- assertArrayEquals("Tubes", new String[] {"tube1", "tube+", "tube?"}, endpoint.conn.tubes);
+ assertArrayEquals("Tubes", new String[]{"tube1", "tube+", "tube?"}, endpoint.conn.tubes);
}
- @Test(expected=FailedToCreateProducerException.class)
+ @Test(expected = FailedToCreateProducerException.class)
public void testWrongCommand() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/Helper.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/Helper.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/Helper.java
index 3ef5cb9..be1ecd6 100644
--- a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/Helper.java
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/Helper.java
@@ -16,13 +16,18 @@
*/
package org.apache.camel.component.beanstalk;
-import com.surftools.BeanstalkClient.Client;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+
+import com.surftools.BeanstalkClient.Client;
import org.apache.camel.CamelContext;
public final class Helper {
+
+ private Helper() {
+ }
+
public static ConnectionSettings mockConn(final Client client) {
return new MockConnectionSettings(client);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ImmediateConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ImmediateConsumerTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ImmediateConsumerTest.java
index 7a5a296..eb24dd2 100644
--- a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ImmediateConsumerTest.java
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ImmediateConsumerTest.java
@@ -17,23 +17,29 @@
package org.apache.camel.component.beanstalk;
import com.surftools.BeanstalkClient.Job;
-import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Test;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class ImmediateConsumerTest extends BeanstalkMockTestSupport {
- final String testMessage = "hello, world";
+ String testMessage = "hello, world";
+ boolean shouldIdie;
- boolean shouldIdie = false;
- final Processor processor = new Processor() {
+ Processor processor = new Processor() {
@Override
public void process(Exchange exchange) throws InterruptedException {
- if (shouldIdie) throw new InterruptedException("die");
+ if (shouldIdie) {
+ throw new InterruptedException("die");
+ }
}
};
@@ -46,8 +52,8 @@ public class ImmediateConsumerTest extends BeanstalkMockTestSupport {
when(jobMock.getJobId()).thenReturn(jobId);
when(jobMock.getData()).thenReturn(payload);
when(client.reserve(anyInt()))
- .thenReturn(jobMock)
- .thenReturn(null);
+ .thenReturn(jobMock)
+ .thenReturn(null);
MockEndpoint result = getMockEndpoint("mock:result");
result.expectedMessageCount(1);
http://git-wip-us.apache.org/repos/asf/camel/blob/d6361062/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ProducerTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ProducerTest.java
index e3949a2..42e809d 100644
--- a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ProducerTest.java
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/ProducerTest.java
@@ -17,8 +17,6 @@
package org.apache.camel.component.beanstalk;
import com.surftools.BeanstalkClient.BeanstalkException;
-import org.apache.camel.component.beanstalk.processors.*;
-import org.apache.camel.CamelExecutionException;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
@@ -27,14 +25,23 @@ import org.apache.camel.Produce;
import org.apache.camel.Producer;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.beanstalk.processors.BuryCommand;
+import org.apache.camel.component.beanstalk.processors.DeleteCommand;
+import org.apache.camel.component.beanstalk.processors.PutCommand;
+import org.apache.camel.component.beanstalk.processors.ReleaseCommand;
+import org.apache.camel.component.beanstalk.processors.TouchCommand;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Test;
-import static org.junit.Assert.*;
-import static org.hamcrest.CoreMatchers.*;
-import static org.mockito.Mockito.*;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class ProducerTest extends BeanstalkMockTestSupport {
- final String testMessage = "hello, world";
@EndpointInject(uri = "beanstalk:tube")
protected BeanstalkEndpoint endpoint;
@@ -45,6 +52,8 @@ public class ProducerTest extends BeanstalkMockTestSupport {
@Produce(uri = "direct:start")
protected ProducerTemplate direct;
+ private String testMessage = "hello, world";
+
@Test
public void testPut() throws Exception {
final long priority = BeanstalkComponent.DEFAULT_PRIORITY;
@@ -58,7 +67,7 @@ public class ProducerTest extends BeanstalkMockTestSupport {
final Producer producer = endpoint.createProducer();
assertNotNull("Producer", producer);
assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
- assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(PutCommand.class));
+ assertThat("Processor class", ((BeanstalkProducer) producer).getCommand(), instanceOf(PutCommand.class));
final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() { // TODO: SetBodyProcessor(?)
public void process(Exchange exchange) {
@@ -83,7 +92,7 @@ public class ProducerTest extends BeanstalkMockTestSupport {
Producer producer = endpoint.createProducer();
assertNotNull("Producer", producer);
assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
- assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(PutCommand.class));
+ assertThat("Processor class", ((BeanstalkProducer) producer).getCommand(), instanceOf(PutCommand.class));
final Exchange exchange = template.send(endpoint, ExchangePattern.InOut, new Processor() { // TODO: SetBodyProcessor(?)
public void process(Exchange exchange) {
@@ -108,7 +117,7 @@ public class ProducerTest extends BeanstalkMockTestSupport {
Producer producer = endpoint.createProducer();
assertNotNull("Producer", producer);
assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
- assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(PutCommand.class));
+ assertThat("Processor class", ((BeanstalkProducer) producer).getCommand(), instanceOf(PutCommand.class));
final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() { // TODO: SetBodyProcessor(?)
public void process(Exchange exchange) {
@@ -132,7 +141,7 @@ public class ProducerTest extends BeanstalkMockTestSupport {
Producer producer = endpoint.createProducer();
assertNotNull("Producer", producer);
assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
- assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(BuryCommand.class));
+ assertThat("Processor class", ((BeanstalkProducer) producer).getCommand(), instanceOf(BuryCommand.class));
when(client.bury(jobId, priority)).thenReturn(true);
@@ -153,10 +162,11 @@ public class ProducerTest extends BeanstalkMockTestSupport {
Producer producer = endpoint.createProducer();
assertNotNull("Producer", producer);
assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
- assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(BuryCommand.class));
+ assertThat("Processor class", ((BeanstalkProducer) producer).getCommand(), instanceOf(BuryCommand.class));
final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() {
- public void process(Exchange exchange) {}
+ public void process(Exchange exchange) {
+ }
});
assertTrue("Exchange failed", exchange.isFailed());
@@ -173,7 +183,7 @@ public class ProducerTest extends BeanstalkMockTestSupport {
Producer producer = endpoint.createProducer();
assertNotNull("Producer", producer);
assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
- assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(BuryCommand.class));
+ assertThat("Processor class", ((BeanstalkProducer) producer).getCommand(), instanceOf(BuryCommand.class));
when(client.bury(jobId, priority)).thenReturn(true);
@@ -197,7 +207,7 @@ public class ProducerTest extends BeanstalkMockTestSupport {
Producer producer = endpoint.createProducer();
assertNotNull("Producer", producer);
assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
- assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(DeleteCommand.class));
+ assertThat("Processor class", ((BeanstalkProducer) producer).getCommand(), instanceOf(DeleteCommand.class));
when(client.delete(jobId)).thenReturn(true);
@@ -218,10 +228,11 @@ public class ProducerTest extends BeanstalkMockTestSupport {
Producer producer = endpoint.createProducer();
assertNotNull("Producer", producer);
assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
- assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(DeleteCommand.class));
+ assertThat("Processor class", ((BeanstalkProducer) producer).getCommand(), instanceOf(DeleteCommand.class));
final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() {
- public void process(Exchange exchange) {}
+ public void process(Exchange exchange) {
+ }
});
assertTrue("Exchange failed", exchange.isFailed());
@@ -239,7 +250,7 @@ public class ProducerTest extends BeanstalkMockTestSupport {
Producer producer = endpoint.createProducer();
assertNotNull("Producer", producer);
assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
- assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(ReleaseCommand.class));
+ assertThat("Processor class", ((BeanstalkProducer) producer).getCommand(), instanceOf(ReleaseCommand.class));
when(client.release(jobId, priority, delay)).thenReturn(true);
@@ -260,10 +271,11 @@ public class ProducerTest extends BeanstalkMockTestSupport {
Producer producer = endpoint.createProducer();
assertNotNull("Producer", producer);
assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
- assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(ReleaseCommand.class));
+ assertThat("Processor class", ((BeanstalkProducer) producer).getCommand(), instanceOf(ReleaseCommand.class));
final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() {
- public void process(Exchange exchange) {}
+ public void process(Exchange exchange) {
+ }
});
assertTrue("Exchange failed", exchange.isFailed());
@@ -281,7 +293,7 @@ public class ProducerTest extends BeanstalkMockTestSupport {
Producer producer = endpoint.createProducer();
assertNotNull("Producer", producer);
assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
- assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(ReleaseCommand.class));
+ assertThat("Processor class", ((BeanstalkProducer) producer).getCommand(), instanceOf(ReleaseCommand.class));
when(client.release(jobId, priority, delay)).thenReturn(true);
@@ -306,7 +318,7 @@ public class ProducerTest extends BeanstalkMockTestSupport {
Producer producer = endpoint.createProducer();
assertNotNull("Producer", producer);
assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
- assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(TouchCommand.class));
+ assertThat("Processor class", ((BeanstalkProducer) producer).getCommand(), instanceOf(TouchCommand.class));
when(client.touch(jobId)).thenReturn(true);
@@ -327,10 +339,11 @@ public class ProducerTest extends BeanstalkMockTestSupport {
Producer producer = endpoint.createProducer();
assertNotNull("Producer", producer);
assertThat("Producer class", producer, instanceOf(BeanstalkProducer.class));
- assertThat("Processor class", ((BeanstalkProducer)producer).command, instanceOf(TouchCommand.class));
+ assertThat("Processor class", ((BeanstalkProducer) producer).getCommand(), instanceOf(TouchCommand.class));
final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() {
- public void process(Exchange exchange) {}
+ public void process(Exchange exchange) {
+ }
});
assertTrue("Exchange failed", exchange.isFailed());
@@ -350,7 +363,7 @@ public class ProducerTest extends BeanstalkMockTestSupport {
resultEndpoint.expectedMessageCount(1);
resultEndpoint.allMessages().body().isEqualTo(testMessage);
- resultEndpoint.allMessages().header(Headers.JOB_ID).isEqualTo(Long.valueOf(jobId));
+ resultEndpoint.allMessages().header(Headers.JOB_ID).isEqualTo(jobId);
direct.sendBodyAndHeader(testMessage, Headers.TIME_TO_RUN, timeToRun);
resultEndpoint.assertIsSatisfied();
@@ -370,12 +383,12 @@ public class ProducerTest extends BeanstalkMockTestSupport {
final long jobId = 113;
when(client.put(priority, delay, timeToRun, payload))
- .thenThrow(new BeanstalkException("test"))
- .thenReturn(jobId);
+ .thenThrow(new BeanstalkException("test"))
+ .thenReturn(jobId);
resultEndpoint.expectedMessageCount(1);
resultEndpoint.allMessages().body().isEqualTo(testMessage);
- resultEndpoint.allMessages().header(Headers.JOB_ID).isEqualTo(Long.valueOf(jobId));
+ resultEndpoint.allMessages().header(Headers.JOB_ID).isEqualTo(jobId);
direct.sendBodyAndHeader(testMessage, Headers.TIME_TO_RUN, timeToRun);
resultEndpoint.assertIsSatisfied();
@@ -392,7 +405,7 @@ public class ProducerTest extends BeanstalkMockTestSupport {
final long jobId = 111;
when(client.touch(jobId))
- .thenThrow(new BeanstalkException("test"));
+ .thenThrow(new BeanstalkException("test"));
endpoint.setCommand(BeanstalkComponent.COMMAND_TOUCH);
final Exchange exchange = template.send(endpoint, ExchangePattern.InOnly, new Processor() {
[2/8] git commit: Donating camel-beanstalk component to Apache Camel
Posted by da...@apache.org.
Donating camel-beanstalk component to Apache Camel
The "camel-beanstalk" component was developed and maintained by me (Alexander Azarov) for Osinka (https://github.com/osinka). Per @davsclaus's consideration (https://github.com/osinka/camel-beanstalk/issues/8), we are donating the code to Apache Camel.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a4ff6b62
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a4ff6b62
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a4ff6b62
Branch: refs/heads/master
Commit: a4ff6b626eceea4271bdf60a5d0983bc0b282b04
Parents: 42fbf67
Author: Alexander Azarov <az...@osinka.ru>
Authored: Sat Sep 20 12:21:23 2014 +0300
Committer: Alexander Azarov <az...@osinka.ru>
Committed: Sat Sep 20 12:21:25 2014 +0300
----------------------------------------------------------------------
components/camel-beanstalk/pom.xml | 88 ++++
components/camel-beanstalk/src/etc/header.txt | 13 +
.../component/beanstalk/BeanstalkComponent.java | 88 ++++
.../component/beanstalk/BeanstalkConsumer.java | 257 ++++++++++++
.../component/beanstalk/BeanstalkEndpoint.java | 125 ++++++
.../beanstalk/BeanstalkExchangeHelper.java | 47 +++
.../component/beanstalk/BeanstalkProducer.java | 130 ++++++
.../component/beanstalk/ConnectionSettings.java | 132 ++++++
.../beanstalk/ConnectionSettingsFactory.java | 43 ++
.../camel/component/beanstalk/Headers.java | 46 ++
.../beanstalk/processors/BuryCommand.java | 49 +++
.../component/beanstalk/processors/Command.java | 24 ++
.../beanstalk/processors/DefaultCommand.java | 45 ++
.../beanstalk/processors/DeleteCommand.java | 46 ++
.../beanstalk/processors/KickCommand.java | 45 ++
.../beanstalk/processors/PutCommand.java | 50 +++
.../beanstalk/processors/ReleaseCommand.java | 52 +++
.../beanstalk/processors/TouchCommand.java | 46 ++
.../src/main/resources/META-INF/LICENSE.txt | 203 +++++++++
.../src/main/resources/META-INF/NOTICE.txt | 11 +
.../org/apache/camel/component/beanstalk | 18 +
.../beanstalk/AwaitingConsumerTest.java | 88 ++++
.../beanstalk/BeanstalkMockTestSupport.java | 45 ++
.../beanstalk/ConnectionSettingsTest.java | 53 +++
.../beanstalk/ConsumerCompletionTest.java | 118 ++++++
.../camel/component/beanstalk/EndpointTest.java | 90 ++++
.../camel/component/beanstalk/Helper.java | 81 ++++
.../beanstalk/ImmediateConsumerTest.java | 93 ++++
.../camel/component/beanstalk/ProducerTest.java | 419 +++++++++++++++++++
.../integration/BeanstalkCamelTestSupport.java | 41 ++
.../BuryProducerIntegrationTest.java | 82 ++++
.../integration/ConsumerIntegrationTest.java | 66 +++
.../DeleteProducerIntegrationTest.java | 76 ++++
.../integration/PutProducerIntegrationTest.java | 112 +++++
.../ReleaseProducerIntegrationTest.java | 82 ++++
.../TouchProducerIntegrationTest.java | 82 ++++
components/pom.xml | 1 +
parent/pom.xml | 1 +
38 files changed, 3088 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/pom.xml b/components/camel-beanstalk/pom.xml
new file mode 100644
index 0000000..1716fc0
--- /dev/null
+++ b/components/camel-beanstalk/pom.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>components</artifactId>
+ <version>2.15-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>camel-beanstalk</artifactId>
+ <packaging>bundle</packaging>
+ <name>Camel :: Beanstalk</name>
+
+ <description>Camel Beanstalk component</description>
+
+ <properties>
+ <camel.osgi.export.pkg>org.apache.camel.component.beanstalk.*</camel.osgi.export.pkg>
+ <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=beanstalk</camel.osgi.export.service>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core</artifactId>
+ </dependency>
+
+ <!-- Beanstalkd Java library -->
+ <dependency>
+ <groupId>com.surftools</groupId>
+ <artifactId>BeanstalkClient</artifactId>
+ <version>${beanstalkd-client-version}</version>
+ </dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>integration</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.14</version>
+ <configuration>
+ <excludes />
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/etc/header.txt
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/etc/header.txt b/components/camel-beanstalk/src/etc/header.txt
new file mode 100644
index 0000000..4091cb9
--- /dev/null
+++ b/components/camel-beanstalk/src/etc/header.txt
@@ -0,0 +1,13 @@
+Copyright (C) ${year} ${author} <${email}>
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkComponent.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkComponent.java
new file mode 100644
index 0000000..0040697
--- /dev/null
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkComponent.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import java.util.Map;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+
+/**
+ * Beanstalk Camel component; creates a {@link BeanstalkEndpoint} for each endpoint URI.
+ *
+ * URI is <code>beanstalk://[host[:port]][/tube[+tube...]]?query</code> (URL-encode special characters in tube names)
+ * <p>
+ * Parameters:<ul>
+ * <li><code>command</code> - one of "put", "release", "bury", "touch", "delete", "kick".
+ * "put" is the default for Producers.</li>
+ * <li><code>jobPriority</code></li>
+ * <li><code>jobDelay</code></li>
+ * <li><code>jobTimeToRun</code></li>
+ * <li><code>consumer.onFailure</code></li>
+ * <li><code>consumer.awaitJob</code></li>
+ * </ul>
+ *
+ * @author <a href="mailto:azarov@osinka.com">Alexander Azarov</a>
+ * @see BeanstalkEndpoint
+ * @see ConnectionSettingsFactory
+ */
+public class BeanstalkComponent extends DefaultComponent {
+ public static final String DEFAULT_TUBE = "default";
+
+ public final static String COMMAND_BURY = "bury";
+ public final static String COMMAND_RELEASE = "release";
+ public final static String COMMAND_PUT = "put";
+ public final static String COMMAND_TOUCH = "touch";
+ public final static String COMMAND_DELETE = "delete";
+ public final static String COMMAND_KICK = "kick";
+
+ public final static long DEFAULT_PRIORITY = 1000; // 0 is the highest priority
+ public final static int DEFAULT_DELAY = 0;
+ public final static int DEFAULT_TIME_TO_RUN = 60; // if 0, the daemon sets 1 instead.
+
+ static ConnectionSettingsFactory connFactory = ConnectionSettingsFactory.DEFAULT;
+
+ public BeanstalkComponent() {
+ }
+
+ public BeanstalkComponent(final CamelContext context) {
+ super(context);
+ }
+
+ @Override
+ public boolean useRawUri() {
+ return true;
+ }
+
+ @Override
+ protected Endpoint createEndpoint(final String uri, final String remaining, final Map<String,Object> parameters) throws Exception {
+ return new BeanstalkEndpoint(uri, this, connFactory.parseUri(remaining));
+ }
+
+ /**
+ * Sets the custom ConnectionSettingsFactory held in a static field of this class.
+ * <p>
+ * Specify which {@link ConnectionSettingsFactory} to use to make connections to Beanstalkd. Especially
+ * useful for unit testing without a beanstalkd daemon (you can mock {@link ConnectionSettings}).
+ *
+ * @param connFactory the factory whose <code>parseUri</code> result is passed to every endpoint created afterwards
+ * @see ConnectionSettingsFactory
+ */
+ public static void setConnectionSettingsFactory(ConnectionSettingsFactory connFactory) {
+ BeanstalkComponent.connFactory = connFactory;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java
new file mode 100644
index 0000000..9c16f7d
--- /dev/null
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import org.apache.camel.component.beanstalk.processors.*;
+import com.surftools.BeanstalkClient.BeanstalkException;
+import com.surftools.BeanstalkClient.Client;
+import com.surftools.BeanstalkClient.Job;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.impl.ScheduledPollConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PollingConsumer to read Beanstalk jobs.
+ *
+ * The consumer may delete the job immediately or based on successful {@link Exchange}
+ * completion. The behavior is configurable by <code>consumer.awaitJob</code>
+ * flag (by default <code>true</code>)
+ *
+ * This consumer will add a {@link Synchronization} object to every {@link Exchange}
+ * object it creates in order to react on successful exchange completion or failure.
+ *
+ * In the case of successful completion, Beanstalk's <code>delete</code> method is
+ * called upon the job. In the case of failure the default reaction is to call
+ * <code>bury</code>.
+ *
+ * The reaction on failures is configurable: possible variants are "bury", "release" or "delete"
+ *
+ * @author <a href="mailto:azarov@osinka.com">Alexander Azarov</a>
+ */
+public class BeanstalkConsumer extends ScheduledPollConsumer {
+ private final transient Logger log = LoggerFactory.getLogger(getClass());
+
+ String onFailure = BeanstalkComponent.COMMAND_BURY;
+ boolean useBlockIO = true;
+ boolean deleteImmediately = false;
+
+ private Client client = null;
+ private ExecutorService executor = null;
+ private Synchronization sync = null;
+
+ private static String[] statsKeysStr = new String[] {"tube", "state"};
+ private static String[] statsKeysInt = new String[] {"age", "time-left", "timeouts", "releases", "buries", "kicks"};
+
+ private final Runnable initTask = new Runnable() {
+ @Override
+ public void run() {
+ client = getEndpoint().getConnection().newReadingClient(useBlockIO);
+ }
+ };
+ private final Callable<Exchange> pollTask = new Callable<Exchange>() {
+ final Integer NO_WAIT = Integer.valueOf(0);
+
+ @Override
+ public Exchange call() throws Exception {
+ if (client == null)
+ throw new RuntimeCamelException("Beanstalk client not initialized");
+
+ try {
+ final Job job = client.reserve(NO_WAIT);
+ if (job == null)
+ return null;
+
+ if (log.isDebugEnabled())
+ log.debug(String.format("Received job ID %d (data length %d)", job.getJobId(), job.getData().length));
+
+ final Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOnly);
+ exchange.setProperty(Headers.JOB_ID, job.getJobId());
+ exchange.getIn().setBody(job.getData(), byte[].class);
+
+ Map<String,String> jobStats = client.statsJob(job.getJobId());
+ if (jobStats != null) {
+ for (String key : statsKeysStr) {
+ if (jobStats.containsKey(key))
+ exchange.setProperty(Headers.PREFIX+key, jobStats.get(key).trim());
+ }
+
+ if (jobStats.containsKey("pri"))
+ exchange.setProperty(Headers.PRIORITY, Long.parseLong(jobStats.get("pri").trim()));
+
+ for (String key : statsKeysInt) {
+ if (jobStats.containsKey(key))
+ exchange.setProperty(Headers.PREFIX+key, Integer.parseInt(jobStats.get(key).trim()));
+ }
+ }
+
+ if (deleteImmediately)
+ client.delete(job.getJobId());
+ else
+ exchange.addOnCompletion(sync);
+
+ return exchange;
+ } catch (BeanstalkException e) {
+ log.error("Beanstalk client error", e);
+ resetClient();
+ return null;
+ }
+ }
+
+ };
+
+ public BeanstalkConsumer(final BeanstalkEndpoint endpoint, final Processor processor) {
+ super(endpoint, processor);
+ }
+
+ @Override
+ protected int poll() throws Exception {
+ int messagesPolled = 0;
+ while (isPollAllowed()) {
+ final Exchange exchange = executor.submit(pollTask).get();
+ if (exchange == null)
+ break;
+
+ ++messagesPolled;
+ getProcessor().process(exchange);
+ }
+ return messagesPolled;
+ }
+
+ public String getOnFailure() {
+ return onFailure;
+ }
+
+ public void setOnFailure(String onFailure) {
+ this.onFailure = onFailure;
+ }
+
+ public boolean getUseBlockIO() {
+ return useBlockIO;
+ }
+
+ public void setUseBlockIO(boolean useBlockIO) {
+ this.useBlockIO = useBlockIO;
+ }
+
+ public boolean getAwaitJob() {
+ return !deleteImmediately;
+ }
+
+ public void setAwaitJob(boolean awaitingCompletion) {
+ this.deleteImmediately = !awaitingCompletion;
+ }
+
+ @Override
+ public BeanstalkEndpoint getEndpoint() {
+ return (BeanstalkEndpoint) super.getEndpoint();
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ executor = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "Beanstalk-Consumer");
+ executor.execute(initTask);
+ sync = new Sync();
+ super.doStart();
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+ if (executor != null)
+ executor.shutdown();
+ }
+
+ protected void resetClient() {
+ if (client != null)
+ client.close();
+ initTask.run();
+ }
+
+ class Sync implements Synchronization {
+ protected final Command successCommand;
+ protected final Command failureCommand;
+
+ public Sync() {
+ successCommand = new DeleteCommand(getEndpoint());
+
+ if (BeanstalkComponent.COMMAND_BURY.equals(onFailure))
+ failureCommand = new BuryCommand(getEndpoint());
+ else if (BeanstalkComponent.COMMAND_RELEASE.equals(onFailure))
+ failureCommand = new ReleaseCommand(getEndpoint());
+ else if (BeanstalkComponent.COMMAND_DELETE.equals(onFailure))
+ failureCommand = new DeleteCommand(getEndpoint());
+ else
+ throw new IllegalArgumentException(String.format("Unknown failure command: %s", onFailure));
+ }
+
+ @Override
+ public void onComplete(final Exchange exchange) {
+ try {
+ executor.submit(new RunCommand(successCommand, exchange)).get();
+ } catch (Exception e) {
+ if (log.isErrorEnabled())
+ log.error(String.format("Could not run completion of exchange %s", exchange), e);
+ }
+ }
+
+ @Override
+ public void onFailure(final Exchange exchange) {
+ try {
+ executor.submit(new RunCommand(failureCommand, exchange)).get();
+ } catch (Exception e) {
+ if (log.isErrorEnabled())
+ log.error(String.format("%s could not run failure of exchange %s", failureCommand.getClass().getName(), exchange), e);
+ }
+ }
+
+ class RunCommand implements Runnable {
+ private final Command command;
+ private final Exchange exchange;
+
+ public RunCommand(final Command command, final Exchange exchange) {
+ this.command = command;
+ this.exchange = exchange;
+ }
+
+ @Override
+ public void run() {
+ try {
+ try {
+ command.act(client, exchange);
+ } catch (BeanstalkException e) {
+ if (log.isWarnEnabled())
+ log.warn(String.format("Post-processing %s of exchange %s failed, retrying.", command.getClass().getName(), exchange), e);
+ resetClient();
+ command.act(client, exchange);
+ }
+ } catch (final Exception e) {
+ if (log.isErrorEnabled())
+ log.error(String.format("%s could not post-process exchange %s", command.getClass().getName(), exchange), e);
+ exchange.setException(e);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java
new file mode 100644
index 0000000..62c6809
--- /dev/null
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkEndpoint.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import com.surftools.BeanstalkClient.Client;
+import org.apache.camel.Component;
+import org.apache.camel.Producer;
+import org.apache.camel.component.beanstalk.processors.*;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ScheduledPollEndpoint;
+
+/**
+ * Endpoint for a Beanstalk connection: holds the {@link ConnectionSettings} plus the
+ * producer command name and the default job priority, delay and time-to-run.
+ *
+ * @author <a href="mailto:azarov@osinka.com">Alexander Azarov</a>
+ * @see BeanstalkConsumer
+ * @see org.apache.camel.component.beanstalk.processors.PutCommand
+ */
+public class BeanstalkEndpoint extends ScheduledPollEndpoint {
+    final ConnectionSettings conn;
+
+    String command = BeanstalkComponent.COMMAND_PUT;
+    long priority = BeanstalkComponent.DEFAULT_PRIORITY;
+    int delay = BeanstalkComponent.DEFAULT_DELAY;
+    int timeToRun = BeanstalkComponent.DEFAULT_TIME_TO_RUN;
+
+    BeanstalkEndpoint(final String uri, final Component component, final ConnectionSettings conn) {
+        super(uri, component);
+
+        this.conn = conn;
+    }
+
+    /**
+     * @return the connection settings this endpoint was created with
+     */
+    public ConnectionSettings getConnection() {
+        return conn;
+    }
+
+    /**
+     * Sets the name of the command the {@link Producer} must execute.
+     * The name is only checked when the producer is created.
+     *
+     * @param command one of the <code>COMMAND_*</code> names of {@link BeanstalkComponent}
+     */
+    public void setCommand(final String command) {
+        this.command = command;
+    }
+
+    public void setJobPriority(final long priority) {
+        this.priority = priority;
+    }
+
+    public long getJobPriority() {
+        return priority;
+    }
+
+    public void setJobDelay(final int delay) {
+        this.delay = delay;
+    }
+
+    public int getJobDelay() {
+        return delay;
+    }
+
+    public void setJobTimeToRun(final int timeToRun) {
+        this.timeToRun = timeToRun;
+    }
+
+    public int getJobTimeToRun() {
+        return timeToRun;
+    }
+
+    /**
+     * Creates the Camel producer.
+     * <p>
+     * The configured command name (see {@link BeanstalkComponent}) selects which
+     * command implementation the producer will run.
+     *
+     * @return {@link Producer} instance
+     * @throws IllegalArgumentException when the command name is not recognised
+     */
+    @Override
+    public Producer createProducer() throws Exception {
+        return new BeanstalkProducer(this, newCommand(command));
+    }
+
+    /** Maps a command name onto a new command object bound to this endpoint. */
+    private Command newCommand(final String name) {
+        if (BeanstalkComponent.COMMAND_PUT.equals(name)) {
+            return new PutCommand(this);
+        }
+        if (BeanstalkComponent.COMMAND_RELEASE.equals(name)) {
+            return new ReleaseCommand(this);
+        }
+        if (BeanstalkComponent.COMMAND_BURY.equals(name)) {
+            return new BuryCommand(this);
+        }
+        if (BeanstalkComponent.COMMAND_TOUCH.equals(name)) {
+            return new TouchCommand(this);
+        }
+        if (BeanstalkComponent.COMMAND_DELETE.equals(name)) {
+            return new DeleteCommand(this);
+        }
+        if (BeanstalkComponent.COMMAND_KICK.equals(name)) {
+            return new KickCommand(this);
+        }
+        throw new IllegalArgumentException(String.format("Unknown command for Beanstalk endpoint: %s", name));
+    }
+
+    /**
+     * Creates a {@link BeanstalkConsumer} for the processor and applies the consumer
+     * configuration via <code>configureConsumer</code>.
+     */
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        final BeanstalkConsumer created = new BeanstalkConsumer(this, processor);
+        configureConsumer(created);
+        return created;
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkExchangeHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkExchangeHelper.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkExchangeHelper.java
new file mode 100644
index 0000000..19a298f
--- /dev/null
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkExchangeHelper.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.NoSuchHeaderException;
+import org.apache.camel.util.ExchangeHelper;
+
+/**
+ * Static helpers that resolve Beanstalk job parameters for an exchange.
+ * <p>
+ * Priority, delay and time-to-run are read from the message headers named in
+ * {@link Headers}, with the endpoint's configured value as the default.
+ *
+ * @author <a href="mailto:azarov@osinka.com">Alexander Azarov</a>
+ */
+public final class BeanstalkExchangeHelper {
+    /** Static helpers only; not meant to be instantiated. */
+    private BeanstalkExchangeHelper() {
+    }
+
+    /**
+     * @return the {@link Headers#PRIORITY} header of the message, or the endpoint's
+     *         job priority when the header is absent
+     */
+    public static long getPriority(final BeanstalkEndpoint endpoint, final Message in) {
+        return in.getHeader(Headers.PRIORITY, Long.valueOf(endpoint.getJobPriority()), Long.class).longValue();
+    }
+
+    /**
+     * @return the {@link Headers#DELAY} header of the message, or the endpoint's
+     *         job delay when the header is absent
+     */
+    public static int getDelay(final BeanstalkEndpoint endpoint, final Message in) {
+        return in.getHeader(Headers.DELAY, Integer.valueOf(endpoint.getJobDelay()), Integer.class).intValue();
+    }
+
+    /**
+     * @return the {@link Headers#TIME_TO_RUN} header of the message, or the endpoint's
+     *         job time-to-run when the header is absent
+     */
+    public static int getTimeToRun(final BeanstalkEndpoint endpoint, final Message in) {
+        return in.getHeader(Headers.TIME_TO_RUN, Integer.valueOf(endpoint.getJobTimeToRun()), Integer.class).intValue();
+    }
+
+    /**
+     * Looks up the job ID, preferring the exchange property {@link Headers#JOB_ID}
+     * and falling back to the mandatory header of the same name.
+     *
+     * @return the job ID
+     * @throws NoSuchHeaderException if neither the property nor the header is present
+     */
+    public static long getJobID(final Exchange exchange) throws NoSuchHeaderException {
+        final Long jobId = exchange.getProperty(Headers.JOB_ID, Long.class);
+        if (jobId != null) {
+            return jobId;
+        }
+        return ExchangeHelper.getMandatoryHeader(exchange, Headers.JOB_ID, Long.class);
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java
new file mode 100644
index 0000000..83cfa98
--- /dev/null
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkProducer.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.apache.camel.component.beanstalk.processors.Command;
+import com.surftools.BeanstalkClient.BeanstalkException;
+import com.surftools.BeanstalkClient.Client;
+import org.apache.camel.Exchange;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.impl.DefaultProducer;
+
+/**
+ * Producer that runs one Beanstalk {@link Command} per exchange.
+ * <p>
+ * Commands are executed on the producer's single-threaded executor, both for the
+ * synchronous and the asynchronous <code>process</code> variants. A command that
+ * fails with a {@link BeanstalkException} is retried once on a fresh client.
+ *
+ * @author <a href="mailto:azarov@osinka.com">Alexander Azarov</a>
+ */
+public class BeanstalkProducer extends DefaultProducer implements AsyncProcessor {
+    private ExecutorService executor = null;
+
+    Client client = null;
+    final Command command;
+
+    public BeanstalkProducer(BeanstalkEndpoint endpoint, final Command command) throws Exception {
+        super(endpoint);
+        this.command = command;
+    }
+
+    /**
+     * Runs the command on the executor thread and waits for it to finish.
+     * Command failures are reported on the exchange, not thrown.
+     */
+    @Override
+    public void process(final Exchange exchange) throws Exception {
+        final Future<?> done = executor.submit(new RunCommand(exchange));
+        done.get();
+    }
+
+    /**
+     * Queues the command on the executor thread.
+     *
+     * @return <code>false</code> when the command was queued (the callback is invoked
+     *         later), <code>true</code> when queuing failed and the callback was
+     *         already invoked with the exception set on the exchange
+     */
+    @Override
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+        try {
+            executor.submit(new RunCommand(exchange, callback));
+        } catch (Throwable t) {
+            exchange.setException(t);
+            callback.done(true);
+            return true;
+        }
+        return false;
+    }
+
+    protected void resetClient() {
+        closeClient();
+        initClient();
+    }
+
+    protected void closeClient() {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    protected void initClient() {
+        this.client = getEndpoint().getConnection().newWritingClient();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        executor = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "Beanstalk-Producer");
+        executor.execute(new Runnable() {
+            public void run() {
+                initClient();
+            }
+        });
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // executor is only set by doStart(); guard against a stop without a start
+        if (executor != null) {
+            executor.shutdown();
+        }
+        closeClient();
+        super.doStop();
+    }
+
+    @Override
+    public BeanstalkEndpoint getEndpoint() {
+        return (BeanstalkEndpoint) super.getEndpoint();
+    }
+
+    /** Executes the producer's command for one exchange and signals the optional callback. */
+    class RunCommand implements Runnable {
+        private final Exchange exchange;
+        private final AsyncCallback callback;
+
+        public RunCommand(final Exchange exchange) {
+            this(exchange, null);
+        }
+
+        public RunCommand(final Exchange exchange, final AsyncCallback callback) {
+            this.exchange = exchange;
+            this.callback = callback;
+        }
+
+        @Override
+        public void run() {
+            try {
+                if (client == null) {
+                    // The connect attempt scheduled by doStart() may have failed (its
+                    // exception is lost on the executor thread); connect now rather than
+                    // failing every exchange with a NullPointerException.
+                    initClient();
+                }
+                try {
+                    command.act(client, exchange);
+                } catch (BeanstalkException e) {
+                    /* Retry one time */
+                    resetClient();
+                    command.act(client, exchange);
+                }
+            } catch (Throwable t) {
+                exchange.setException(t);
+            } finally {
+                if (callback != null) {
+                    callback.done(false);
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettings.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettings.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettings.java
new file mode 100644
index 0000000..35359b1
--- /dev/null
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettings.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Scanner;
+import java.net.URLDecoder;
+import com.surftools.BeanstalkClient.Client;
+import com.surftools.BeanstalkClientImpl.ClientImpl;
+import java.io.UnsupportedEncodingException;
+
+/**
+ * Represents the connection to Beanstalk.
+ * <p>
+ * Along with the list of tubes it may watch.
+ *
+ * @author <a href="mailto:azarov@osinka.com">Alexander Azarov</a>
+ */
+public class ConnectionSettings {
+ final String host;
+ final int port;
+ final String[] tubes;
+
+ public ConnectionSettings(final String tube) {
+ this(Client.DEFAULT_HOST, Client.DEFAULT_PORT, tube);
+ }
+
+ public ConnectionSettings(final String host, final String tube) {
+ this(host, Client.DEFAULT_PORT, tube);
+ }
+
+ public ConnectionSettings(final String host, final int port, final String tube) {
+ this.host = host;
+ this.port = port;
+
+ final Scanner scanner = new Scanner(tube);
+ scanner.useDelimiter("\\+");
+ final ArrayList<String> buffer = new ArrayList<String>();
+ while (scanner.hasNext()) {
+ final String tubeRaw = scanner.next();
+ try {
+ buffer.add( URLDecoder.decode(tubeRaw, "UTF-8") );
+ } catch (UnsupportedEncodingException e) {
+ buffer.add(tubeRaw);
+ }
+ }
+ this.tubes = buffer.toArray(new String[0]);
+ scanner.close();
+ }
+
+ /**
+ * Returns the {@link Client} instance ready for writing
+ * operations, e.g. "put".
+ * <p>
+ * <code>use(tube)</code> is applied during this call.
+ *
+ * @return {@link Client} instance
+ * @throws IllegalArgumentException the exception is raised when this ConnectionSettings
+ * has more than one tube.
+ */
+ public Client newWritingClient() throws IllegalArgumentException {
+ if (tubes.length > 1) {
+ throw new IllegalArgumentException("There must be only one tube specified for Beanstalk producer");
+ }
+
+ final String tube = tubes.length > 0 ? tubes[0] : BeanstalkComponent.DEFAULT_TUBE;
+
+ final ClientImpl client = new ClientImpl(host, port);
+
+ /* FIXME: There is a problem in JavaBeanstalkClient 1.4.4 (at least in 1.4.4),
+ when using uniqueConnectionPerThread=false. The symptom is that ProtocolHandler
+ breaks the protocol, reading incomplete messages. To be investigated. */
+ //client.setUniqueConnectionPerThread(false);
+ client.useTube(tube);
+ return client;
+ }
+
+ /**
+ * Returns the {@link Client} instance for reading operations with all
+ * the tubes aleady watched
+ * <p>
+ * <code>watch(tube)</code> is applied for every tube during this call.
+ *
+ * @param useBlockIO configuration param to {@link Client}
+ * @return {@link Client} instance
+ */
+ public Client newReadingClient(boolean useBlockIO) {
+ final ClientImpl client = new ClientImpl(host, port, useBlockIO);
+
+ /* FIXME: There is a problem in JavaBeanstalkClient 1.4.4 (at least in 1.4.4),
+ when using uniqueConnectionPerThread=false. The symptom is that ProtocolHandler
+ breaks the protocol, reading incomplete messages. To be investigated. */
+ //client.setUniqueConnectionPerThread(false);
+ for (String tube : tubes)
+ client.watch(tube);
+ return client;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj instanceof ConnectionSettings) {
+ final ConnectionSettings other = (ConnectionSettings) obj;
+ return other.host.equals(host) && other.port == port && Arrays.equals(other.tubes, tubes);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return 41*(41*(41+host.hashCode())+port)+Arrays.hashCode(tubes);
+ }
+
+ @Override
+ public String toString() {
+ return "beanstalk://"+host+":"+port+"/"+Arrays.toString(tubes);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettingsFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettingsFactory.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettingsFactory.java
new file mode 100644
index 0000000..7949cf2
--- /dev/null
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/ConnectionSettingsFactory.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import com.surftools.BeanstalkClient.Client;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Parses the path part of a Beanstalk endpoint URI, written as
+ * <code>[&lt;hostName&gt;[:&lt;port&gt;]/][&lt;tubes&gt;]</code>, into {@link ConnectionSettings}.
+ *
+ * @author <a href="mailto:azarov@osinka.com">Alexander Azarov</a>
+ */
+public class ConnectionSettingsFactory {
+    public static final ConnectionSettingsFactory DEFAULT = new ConnectionSettingsFactory();
+
+    /* group 2 = host, group 4 = port, group 5 = tube list */
+    final Pattern HostPortTubeRE = Pattern.compile("^(([\\w.-]+)(:([\\d]+))?/)?([\\w%+]*)$");
+
+    /**
+     * Splits the URI remainder into host, port and tubes; a missing host or port
+     * falls back to the {@link Client} defaults.
+     *
+     * @param remaining the URI remainder to parse
+     * @return the parsed settings
+     * @throws IllegalArgumentException if the remainder does not match the expected format
+     */
+    public ConnectionSettings parseUri(final String remaining) throws IllegalArgumentException {
+        final Matcher parts = HostPortTubeRE.matcher(remaining);
+        if (!parts.matches()) {
+            throw new IllegalArgumentException(String.format("Invalid path format: %s - should be [<hostName>[:<port>]/][<tubes>]", remaining));
+        }
+
+        final String hostPart = parts.group(2);
+        final String portPart = parts.group(4);
+        final String tubePart = parts.group(5);
+
+        final String host = hostPart == null ? Client.DEFAULT_HOST : hostPart;
+        final int port;
+        if (portPart == null) {
+            port = Client.DEFAULT_PORT;
+        } else {
+            port = Integer.parseInt(portPart);
+        }
+        final String tubes = tubePart == null ? "" : tubePart;
+
+        return new ConnectionSettings(host, port, tubes);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/Headers.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/Headers.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/Headers.java
new file mode 100644
index 0000000..b944872
--- /dev/null
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/Headers.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+/**
+ *
+ * @author <a href="mailto:azarov@osinka.com">Alexander Azarov</a>
+ */
+public final class Headers {
+ public static final String PREFIX = "beanstalk.";
+
+ // in
+ public static final String PRIORITY = PREFIX+"priority";
+ public static final String DELAY = PREFIX+"delay";
+ public static final String TIME_TO_RUN = PREFIX+"timeToRun";
+
+ // in/out
+ public static final String JOB_ID = PREFIX+"jobId";
+
+ // out
+ public static final String RESULT = PREFIX+"result";
+
+ // other info
+ public static final String TUBE = PREFIX+"tube";
+ public static final String STATE = PREFIX+"state";
+ public static final String AGE = PREFIX+"age";
+ public static final String TIME_LEFT = PREFIX+"time-left";
+ public static final String TIMEOUTS = PREFIX+"timeouts";
+ public static final String RELEASES = PREFIX+"releases";
+ public static final String BURIES = PREFIX+"buries";
+ public static final String KICKS = PREFIX+"kicks";
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/BuryCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/BuryCommand.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/BuryCommand.java
new file mode 100644
index 0000000..2df0e60
--- /dev/null
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/BuryCommand.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.processors;
+
+import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
+import org.apache.camel.component.beanstalk.BeanstalkExchangeHelper;
+import org.apache.camel.component.beanstalk.Headers;
+import com.surftools.BeanstalkClient.Client;
+import org.apache.camel.Exchange;
+import org.apache.camel.NoSuchHeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command that buries the job an exchange refers to and reports the outcome
+ * under {@link Headers#RESULT}.
+ */
+public class BuryCommand extends DefaultCommand {
+    private final transient Logger log = LoggerFactory.getLogger(getClass());
+
+    public BuryCommand(BeanstalkEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    /**
+     * Buries the job with the priority resolved for the exchange's in message.
+     *
+     * @param client   the Beanstalk client to issue <code>bury</code> on
+     * @param exchange the exchange carrying the job ID
+     * @throws NoSuchHeaderException if the exchange carries no job ID
+     */
+    @Override
+    public void act(final Client client, final Exchange exchange) throws NoSuchHeaderException {
+        final long jobId = BeanstalkExchangeHelper.getJobID(exchange);
+        final long priority = BeanstalkExchangeHelper.getPriority(endpoint, exchange.getIn());
+        final boolean result = client.bury(jobId, priority);
+
+        // Report a failure only as a warning; it must never fall through to the
+        // "buried" debug message when the WARN level is switched off.
+        if (!result) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Failed to bury job %d (with priority %d)", jobId, priority));
+            }
+        } else if (log.isDebugEnabled()) {
+            log.debug(String.format("Job %d buried with priority %d. Result is %b", jobId, priority, result));
+        }
+
+        answerWith(exchange, Headers.RESULT, result);
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/Command.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/Command.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/Command.java
new file mode 100644
index 0000000..7088279
--- /dev/null
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/Command.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.processors;
+
+import com.surftools.BeanstalkClient.Client;
+import org.apache.camel.Exchange;
+
+/**
+ * A single Beanstalk operation that can be executed on behalf of an exchange.
+ */
+public interface Command {
+    /**
+     * Executes this command.
+     *
+     * @param client   Beanstalk client the command is issued through
+     * @param exchange exchange being processed
+     * @throws Exception if the command cannot be carried out
+     */
+    void act(Client client, Exchange exchange) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DefaultCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DefaultCommand.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DefaultCommand.java
new file mode 100644
index 0000000..e8e9a45
--- /dev/null
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DefaultCommand.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.processors;
+
+import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.util.ExchangeHelper;
+
+/**
+ * Base class for commands: keeps the owning endpoint and provides helpers for
+ * writing a command's outcome onto the exchange.
+ */
+abstract class DefaultCommand implements Command {
+    protected final BeanstalkEndpoint endpoint;
+
+    public DefaultCommand(BeanstalkEndpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    /**
+     * Selects the message a command should write its outcome to.
+     *
+     * @param exchange exchange being processed
+     * @return the IN message when the exchange is not out-capable; otherwise the OUT
+     *         message, after all IN headers have been copied onto it
+     */
+    protected Message getAnswerMessage(final Exchange exchange) {
+        final Message request = exchange.getIn();
+        if (!ExchangeHelper.isOutCapable(exchange)) {
+            return request;
+        }
+
+        final Message reply = exchange.getOut();
+        // preserve headers
+        reply.getHeaders().putAll(request.getHeaders());
+        return reply;
+    }
+
+    /**
+     * Sets a single header on the answer message of the exchange.
+     *
+     * @param exchange exchange being processed
+     * @param header   name of the header to set
+     * @param value    value to store under that header
+     */
+    protected void answerWith(final Exchange exchange, final String header, final Object value) {
+        getAnswerMessage(exchange).setHeader(header, value);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DeleteCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DeleteCommand.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DeleteCommand.java
new file mode 100644
index 0000000..f738556
--- /dev/null
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/DeleteCommand.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.processors;
+
+import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
+import org.apache.camel.component.beanstalk.BeanstalkExchangeHelper;
+import org.apache.camel.component.beanstalk.Headers;
+import com.surftools.BeanstalkClient.Client;
+import org.apache.camel.Exchange;
+import org.apache.camel.NoSuchHeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command that deletes a Beanstalk job.
+ * <p>
+ * The job id is resolved through {@code BeanstalkExchangeHelper.getJobID}; the boolean
+ * outcome returned by the client is stored in the {@link Headers#RESULT} header of the
+ * answer message.
+ */
+public class DeleteCommand extends DefaultCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(DeleteCommand.class);
+
+    public DeleteCommand(BeanstalkEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    /**
+     * Deletes the job referenced by the exchange and publishes the outcome.
+     *
+     * @param client   Beanstalk client used to issue the delete request
+     * @param exchange exchange that identifies the job and receives the result header
+     * @throws NoSuchHeaderException declared for the job id lookup; presumably raised
+     *                               when the exchange carries no job id (confirm against
+     *                               {@code BeanstalkExchangeHelper})
+     */
+    @Override
+    public void act(final Client client, final Exchange exchange) throws NoSuchHeaderException {
+        final Long jobId = BeanstalkExchangeHelper.getJobID(exchange);
+        final boolean result = client.delete(jobId.longValue());
+
+        // Branch on the outcome only: parameterized SLF4J calls are no-ops when the
+        // level is disabled, so a failure can never be reported as "deleted".
+        if (!result) {
+            LOG.warn("Failed to delete job {}", jobId);
+        } else {
+            LOG.debug("Job {} deleted. Result is {}", jobId, result);
+        }
+
+        answerWith(exchange, Headers.RESULT, result);
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/KickCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/KickCommand.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/KickCommand.java
new file mode 100644
index 0000000..7bad253
--- /dev/null
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/KickCommand.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.processors;
+
+import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
+import com.surftools.BeanstalkClient.Client;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Message;
+import org.apache.camel.NoSuchHeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command that kicks Beanstalk jobs.
+ * <p>
+ * The number of jobs to kick is taken from the mandatory {@link Integer} body of the
+ * IN message; the number reported back by the client becomes the body of the answer
+ * message.
+ */
+public class KickCommand extends DefaultCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(KickCommand.class);
+
+    public KickCommand(BeanstalkEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    /**
+     * Kicks the requested number of jobs and answers with the client's count.
+     *
+     * @param client   Beanstalk client used to issue the kick request
+     * @param exchange exchange whose IN body holds the number of jobs to kick
+     * @throws NoSuchHeaderException   declared by the original signature; no header lookup
+     *                                 is performed in this method
+     * @throws InvalidPayloadException if the IN body cannot be obtained as an {@link Integer}
+     */
+    @Override
+    public void act(final Client client, final Exchange exchange) throws NoSuchHeaderException, InvalidPayloadException {
+        final Integer jobs = exchange.getIn().getMandatoryBody(Integer.class);
+        final int result = client.kick(jobs);
+
+        // Parameterized logging defers formatting until the level is known to be enabled.
+        LOG.debug("Kick {} jobs. Kicked {} actually.", jobs, result);
+
+        final Message answer = getAnswerMessage(exchange);
+        answer.setBody(result, Integer.class);
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/PutCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/PutCommand.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/PutCommand.java
new file mode 100644
index 0000000..c6fa32e
--- /dev/null
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/PutCommand.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.processors;
+
+import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
+import org.apache.camel.component.beanstalk.BeanstalkExchangeHelper;
+import org.apache.camel.component.beanstalk.Headers;
+import com.surftools.BeanstalkClient.Client;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.NoSuchHeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command that puts a new job into Beanstalk.
+ * <p>
+ * Priority, delay and time-to-run are resolved through {@code BeanstalkExchangeHelper}
+ * from the endpoint and the IN message; the IN body, converted to {@code byte[]}, is
+ * the job payload. The id returned by the client is stored in the
+ * {@link Headers#JOB_ID} header of the answer message.
+ */
+public class PutCommand extends DefaultCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(PutCommand.class);
+
+    public PutCommand(BeanstalkEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    /**
+     * Creates a job from the IN message and publishes its id.
+     *
+     * @param client   Beanstalk client used to issue the put request
+     * @param exchange exchange providing the payload and receiving the job id header
+     * @throws NoSuchHeaderException declared by the original signature; whether the
+     *                               helper lookups can raise it is not visible here
+     */
+    @Override
+    public void act(final Client client, final Exchange exchange) throws NoSuchHeaderException {
+        final Message in = exchange.getIn();
+
+        final long priority = BeanstalkExchangeHelper.getPriority(endpoint, in);
+        final int delay = BeanstalkExchangeHelper.getDelay(endpoint, in);
+        final int timeToRun = BeanstalkExchangeHelper.getTimeToRun(endpoint, in);
+
+        final long jobId = client.put(priority, delay, timeToRun, in.getBody(byte[].class));
+
+        // Parameterized logging defers formatting until the level is known to be enabled.
+        LOG.debug("Created job {} with priority {}, delay {} seconds and time to run {}", jobId, priority, delay, timeToRun);
+
+        answerWith(exchange, Headers.JOB_ID, jobId);
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/ReleaseCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/ReleaseCommand.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/ReleaseCommand.java
new file mode 100644
index 0000000..f41e080
--- /dev/null
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/ReleaseCommand.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.processors;
+
+import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
+import org.apache.camel.component.beanstalk.BeanstalkExchangeHelper;
+import org.apache.camel.component.beanstalk.Headers;
+import com.surftools.BeanstalkClient.Client;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.NoSuchHeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command that releases a Beanstalk job.
+ * <p>
+ * The job id, priority and delay are resolved through {@code BeanstalkExchangeHelper};
+ * the boolean outcome returned by the client is stored in the {@link Headers#RESULT}
+ * header of the answer message.
+ */
+public class ReleaseCommand extends DefaultCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(ReleaseCommand.class);
+
+    public ReleaseCommand(BeanstalkEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    /**
+     * Releases the job referenced by the exchange and publishes the outcome.
+     *
+     * @param client   Beanstalk client used to issue the release request
+     * @param exchange exchange that identifies the job and receives the result header
+     * @throws NoSuchHeaderException declared for the job id lookup; presumably raised
+     *                               when the exchange carries no job id (confirm against
+     *                               {@code BeanstalkExchangeHelper})
+     */
+    @Override
+    public void act(final Client client, final Exchange exchange) throws NoSuchHeaderException {
+        final Message in = exchange.getIn();
+
+        final Long jobId = BeanstalkExchangeHelper.getJobID(exchange);
+        final long priority = BeanstalkExchangeHelper.getPriority(endpoint, in);
+        final int delay = BeanstalkExchangeHelper.getDelay(endpoint, in);
+
+        final boolean result = client.release(jobId.longValue(), priority, delay);
+
+        // Branch on the outcome only: parameterized SLF4J calls are no-ops when the
+        // level is disabled, so a failure can never be reported as "released".
+        if (!result) {
+            LOG.warn("Failed to release job {} (priority {}, delay {})", jobId, priority, delay);
+        } else {
+            LOG.debug("Job {} released with priority {}, delay {} seconds. Result is {}", jobId, priority, delay, result);
+        }
+
+        answerWith(exchange, Headers.RESULT, result);
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/TouchCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/TouchCommand.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/TouchCommand.java
new file mode 100644
index 0000000..c43f32c
--- /dev/null
+++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/processors/TouchCommand.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk.processors;
+
+import com.surftools.BeanstalkClient.Client;
+import org.apache.camel.Exchange;
+import org.apache.camel.NoSuchHeaderException;
+import org.apache.camel.component.beanstalk.BeanstalkEndpoint;
+import org.apache.camel.component.beanstalk.BeanstalkExchangeHelper;
+import org.apache.camel.component.beanstalk.Headers;
+import org.apache.camel.util.ExchangeHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command that touches a Beanstalk job.
+ * <p>
+ * The job id is resolved through {@code BeanstalkExchangeHelper.getJobID}, the same
+ * lookup the bury, delete and release commands use; the boolean outcome returned by
+ * the client is stored in the {@link Headers#RESULT} header of the answer message.
+ */
+public class TouchCommand extends DefaultCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(TouchCommand.class);
+
+    public TouchCommand(BeanstalkEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    /**
+     * Touches the job referenced by the exchange and publishes the outcome.
+     *
+     * @param client   Beanstalk client used to issue the touch request
+     * @param exchange exchange that identifies the job and receives the result header
+     * @throws NoSuchHeaderException declared for the job id lookup; presumably raised
+     *                               when the exchange carries no job id (confirm against
+     *                               {@code BeanstalkExchangeHelper})
+     */
+    @Override
+    public void act(final Client client, final Exchange exchange) throws NoSuchHeaderException {
+        // Use the shared job id lookup so every job-level command resolves the id the same way.
+        final Long jobId = BeanstalkExchangeHelper.getJobID(exchange);
+        final boolean result = client.touch(jobId.longValue());
+
+        // Branch on the outcome only: parameterized SLF4J calls are no-ops when the
+        // level is disabled, so a failure can never be reported as "touched".
+        if (!result) {
+            LOG.warn("Failed to touch job {}", jobId);
+        } else {
+            LOG.debug("Job {} touched. Result is {}", jobId, result);
+        }
+
+        answerWith(exchange, Headers.RESULT, result);
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/resources/META-INF/LICENSE.txt
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/resources/META-INF/LICENSE.txt b/components/camel-beanstalk/src/main/resources/META-INF/LICENSE.txt
new file mode 100644
index 0000000..6b0b127
--- /dev/null
+++ b/components/camel-beanstalk/src/main/resources/META-INF/LICENSE.txt
@@ -0,0 +1,203 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/resources/META-INF/NOTICE.txt
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/resources/META-INF/NOTICE.txt b/components/camel-beanstalk/src/main/resources/META-INF/NOTICE.txt
new file mode 100644
index 0000000..2e215bf
--- /dev/null
+++ b/components/camel-beanstalk/src/main/resources/META-INF/NOTICE.txt
@@ -0,0 +1,11 @@
+ =========================================================================
+ == NOTICE file corresponding to the section 4 d of ==
+ == the Apache License, Version 2.0, ==
+ == in this case for the Apache Camel distribution. ==
+ =========================================================================
+
+ This product includes software developed by
+ The Apache Software Foundation (http://www.apache.org/).
+
+ Please read the different LICENSE files present in the licenses directory of
+ this distribution.
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/main/resources/META-INF/services/org/apache/camel/component/beanstalk
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/main/resources/META-INF/services/org/apache/camel/component/beanstalk b/components/camel-beanstalk/src/main/resources/META-INF/services/org/apache/camel/component/beanstalk
new file mode 100644
index 0000000..c290f54
--- /dev/null
+++ b/components/camel-beanstalk/src/main/resources/META-INF/services/org/apache/camel/component/beanstalk
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.beanstalk.BeanstalkComponent
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/AwaitingConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/AwaitingConsumerTest.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/AwaitingConsumerTest.java
new file mode 100644
index 0000000..164dff2
--- /dev/null
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/AwaitingConsumerTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import com.surftools.BeanstalkClient.Job;
+import com.surftools.BeanstalkClient.BeanstalkException;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+
+/**
+ * Exercises the {@code beanstalk:tube} consumer route against the mocked
+ * Beanstalk {@code client} supplied by {@link BeanstalkMockTestSupport}.
+ */
+public class AwaitingConsumerTest extends BeanstalkMockTestSupport {
+    final String testMessage = "hello, world";
+
+    @EndpointInject(uri = "beanstalk:tube")
+    protected BeanstalkEndpoint endpoint;
+
+    /**
+     * A job handed out by {@code client.reserve} must reach {@code mock:result}
+     * with the expected body and job id, and the client must be asked to
+     * delete that job.
+     */
+    @Test
+    public void testReceive() throws Exception {
+        final long reservedId = 111;
+        final Job reserved = stubJob(reservedId);
+
+        // First reserve yields the job, every later reserve yields nothing.
+        when(client.reserve(anyInt()))
+            .thenReturn(reserved)
+            .thenReturn(null);
+
+        expectSingleJobDelivered(reservedId);
+
+        verify(client, atLeast(1)).reserve(0);
+        verify(client, atLeast(1)).delete(reservedId);
+    }
+
+    /**
+     * When the first {@code client.reserve} call throws a
+     * {@link BeanstalkException}, the job returned by the following call must
+     * still be delivered, and the client must have been closed exactly once.
+     */
+    @Test
+    public void testBeanstalkException() throws Exception {
+        final long reservedId = 111;
+        final Job reserved = stubJob(reservedId);
+
+        // First reserve fails, the next one hands out the job.
+        when(client.reserve(anyInt()))
+            .thenThrow(new BeanstalkException("test"))
+            .thenReturn(reserved);
+
+        expectSingleJobDelivered(reservedId);
+
+        verify(client, atLeast(1)).reserve(anyInt());
+        verify(client, times(1)).close();
+    }
+
+    /** Routes everything consumed from {@code beanstalk:tube} to {@code mock:result}. */
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("beanstalk:tube").to("mock:result");
+            }
+        };
+    }
+
+    /**
+     * Creates a mocked {@link Job} that reports the given id and carries
+     * {@link #testMessage} (converted via {@code Helper.stringToBytes}) as data.
+     */
+    private Job stubJob(long id) {
+        final Job job = mock(Job.class);
+        final byte[] body = Helper.stringToBytes(testMessage);
+
+        when(job.getJobId()).thenReturn(id);
+        when(job.getData()).thenReturn(body);
+        return job;
+    }
+
+    /**
+     * Registers the expectations for exactly one message on {@code mock:result}
+     * (body, job id property and job id header) and asserts them.
+     */
+    private void expectSingleJobDelivered(long id) throws Exception {
+        final MockEndpoint sink = getMockEndpoint("mock:result");
+        sink.expectedMessageCount(1);
+        sink.expectedBodiesReceived(testMessage);
+        sink.expectedPropertyReceived(Headers.JOB_ID, id);
+        sink.message(0).header(Headers.JOB_ID).isEqualTo(id);
+        sink.assertIsSatisfied(100);
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/a4ff6b62/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/BeanstalkMockTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/BeanstalkMockTestSupport.java b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/BeanstalkMockTestSupport.java
new file mode 100644
index 0000000..42b3114
--- /dev/null
+++ b/components/camel-beanstalk/src/test/java/org/apache/camel/component/beanstalk/BeanstalkMockTestSupport.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.beanstalk;
+
+import com.surftools.BeanstalkClient.Client;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import static org.mockito.Mockito.*;
+
+/**
+ * Base class for Beanstalk component tests that run against a Mockito mock of
+ * the Beanstalk {@link Client} instead of a real server.
+ */
+public class BeanstalkMockTestSupport extends CamelTestSupport {
+    /** Mocked Beanstalk client; re-created by {@link #setUp()}. */
+    @Mock Client client;
+
+    /**
+     * Initialises the {@code @Mock} fields, hands the mocked client to
+     * {@code Helper.mockComponent} and then runs the regular Camel test setup.
+     *
+     * @throws Exception if the Camel test setup fails
+     */
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        reset(client);
+        // NOTE(review): the mock is registered before super.setUp(), presumably so the
+        // Camel context started there already resolves the mocked component - confirm.
+        Helper.mockComponent(client);
+        super.setUp();
+    }
+
+    /**
+     * Runs the regular Camel test teardown and then calls
+     * {@code Helper.revertComponent()}.
+     *
+     * @throws Exception if the Camel test teardown fails
+     */
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            // Always revert, even when the Camel teardown throws; otherwise the
+            // mocked component would stay installed for subsequent tests.
+            Helper.revertComponent();
+        }
+    }
+}