You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2010/03/06 19:20:59 UTC
svn commit: r919810 - in
/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel:
ActiveMQPollingComponent.java ActiveMQPollingConsumer.java
ActiveMQPollingEndpoint.java JamesCamelConstants.java
Author: norman
Date: Sat Mar 6 18:20:59 2010
New Revision: 919810
URL: http://svn.apache.org/viewvc?rev=919810&view=rev
Log:
Add Polling stuff for RemoteDelivery rewrite. See JAMES-977
Added:
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQPollingComponent.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQPollingConsumer.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQPollingEndpoint.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JamesCamelConstants.java
Added: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQPollingComponent.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQPollingComponent.java?rev=919810&view=auto
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQPollingComponent.java (added)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQPollingComponent.java Sat Mar 6 18:20:59 2010
@@ -0,0 +1,37 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.transport.camel;
+
+import java.util.Map;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+
+/**
+ * Camel component which hands out an {@link ActiveMQPollingEndpoint} for each requested uri.
+ */
+public class ActiveMQPollingComponent extends DefaultComponent {
+
+    /** Builds the endpoint from the full uri; the remaining path and the parameters are not consulted. */
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        return new ActiveMQPollingEndpoint(uri, this);
+    }
+}
Added: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQPollingConsumer.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQPollingConsumer.java?rev=919810&view=auto
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQPollingConsumer.java (added)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQPollingConsumer.java Sat Mar 6 18:20:59 2010
@@ -0,0 +1,75 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.transport.camel;
+
+import org.apache.camel.ConsumerTemplate;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.ScheduledPollConsumer;
+
+/**
+ * Consumer which polls an activemq endpoint using a selector that only matches messages whose
+ * {@link JamesCamelConstants#JAMES_NEXT_DELIVERY} header value is smaller than the current time
+ * in milliseconds. Every matching exchange is handed to the configured processor.
+ */
+public class ActiveMQPollingConsumer extends ScheduledPollConsumer {
+
+    /** Template used to receive the exchanges from the activemq endpoint. */
+    private final ConsumerTemplate consumerTemplate;
+    /** Uri to receive from: this endpoint's uri with the endpoint key replaced by "activemq". */
+    private final String receiveEndpointUri;
+
+    /**
+     * @param endpoint the endpoint this consumer belongs to
+     * @param processor the processor which handles every received exchange
+     * @param consumerTemplate the template used to receive the exchanges
+     */
+    public ActiveMQPollingConsumer(DefaultEndpoint endpoint, Processor processor, ConsumerTemplate consumerTemplate) {
+        super(endpoint, processor);
+        this.consumerTemplate = consumerTemplate;
+        // NOTE(review): this assumes getEndpointKey() returns only the scheme part of the uri - confirm
+        this.receiveEndpointUri = getEndpoint().getEndpointUri().replace(getEndpoint().getEndpointKey(), "activemq");
+    }
+
+    /**
+     * Receives, without waiting, every exchange which is due for delivery and passes it to the
+     * processor. Returns as soon as no matching exchange is left.
+     *
+     * @throws Exception propagated from the processor when handling an exchange fails
+     */
+    @Override
+    protected void poll() throws Exception {
+        // build the uri once per poll so every receive of this run uses the same timestamp
+        final String consumerUri = new StringBuilder(receiveEndpointUri)
+                .append(receiveEndpointUri.indexOf('?') > -1 ? '&' : '?')
+                .append("selector=")
+                .append(JamesCamelConstants.JAMES_NEXT_DELIVERY)
+                .append('<')
+                .append(System.currentTimeMillis())
+                .toString();
+
+        // process every exchange which is ready; stop once nothing is left
+        Exchange exchange;
+        while ((exchange = consumerTemplate.receiveNoWait(consumerUri)) != null) {
+            getProcessor().process(exchange);
+        }
+    }
+}
Added: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQPollingEndpoint.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQPollingEndpoint.java?rev=919810&view=auto
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQPollingEndpoint.java (added)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQPollingEndpoint.java Sat Mar 6 18:20:59 2010
@@ -0,0 +1,56 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.transport.camel;
+
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.ScheduledPollEndpoint;
+
+/**
+ * Consume-only endpoint whose consumer is an {@link ActiveMQPollingConsumer}.
+ */
+public class ActiveMQPollingEndpoint extends ScheduledPollEndpoint {
+
+    public ActiveMQPollingEndpoint(String uri, Component component) {
+        super(uri, component);
+    }
+
+    /**
+     * Producing is not supported by this endpoint.
+     * @throws UnsupportedOperationException always, instead of handing a null producer to the caller
+     */
+    public Producer createProducer() throws Exception {
+        throw new UnsupportedOperationException("Producer not supported for endpoint " + getEndpointUri());
+    }
+
+    /** @see org.apache.camel.IsSingleton#isSingleton() */
+    public boolean isSingleton() {
+        return true;
+    }
+
+    /** @see org.apache.camel.Endpoint#createConsumer(org.apache.camel.Processor) */
+    public Consumer createConsumer(Processor processor) throws Exception {
+        ActiveMQPollingConsumer consumer = new ActiveMQPollingConsumer(this, processor, getCamelContext().createConsumerTemplate());
+        configureConsumer(consumer);
+        return consumer;
+    }
+}
Added: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JamesCamelConstants.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JamesCamelConstants.java?rev=919810&view=auto
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JamesCamelConstants.java (added)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JamesCamelConstants.java Sat Mar 6 18:20:59 2010
@@ -0,0 +1,8 @@
+package org.apache.james.transport.camel;
+
+/** Constants shared by the classes of the James camel transport package. */
+public interface JamesCamelConstants {
+
+    /** Name of the message header which the ActiveMQPollingConsumer selector compares with the current time. */
+    String JAMES_NEXT_DELIVERY = "JAMES_NEXT_DELIVERY";
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org