You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/03/01 15:20:41 UTC
svn commit: r1075828 - in
/camel/trunk/camel-core/src/main/java/org/apache/camel:
component/log/LogEndpoint.java component/log/LogProducer.java
component/mock/MockEndpoint.java processor/LogProcessor.java
util/AsyncProcessorHelper.java
Author: davsclaus
Date: Tue Mar 1 14:20:41 2011
New Revision: 1075828
URL: http://svn.apache.org/viewvc?rev=1075828&view=rev
Log:
CAMEL-3738: Prefer to use async processor for log and mock components/eips.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogProducer.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java?rev=1075828&r1=1075827&r2=1075828&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java Tue Mar 1 14:20:41 2011
@@ -17,6 +17,7 @@
package org.apache.camel.component.log;
import org.apache.camel.Component;
+import org.apache.camel.Producer;
import org.apache.camel.impl.ProcessorEndpoint;
import org.apache.camel.processor.CamelLogger;
import org.apache.camel.util.ServiceHelper;
@@ -61,6 +62,11 @@ public class LogEndpoint extends Process
}
@Override
+ public Producer createProducer() throws Exception {
+ return new LogProducer(this, getLogger());
+ }
+
+ @Override
protected String createEndpointUri() {
return "log:" + logger.toString();
}
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogProducer.java?rev=1075828&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogProducer.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/log/LogProducer.java Tue Mar 1 14:20:41 2011
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.log;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.processor.CamelLogger;
+
+/**
+ * Log producer.
+ */
+public class LogProducer extends DefaultAsyncProducer {
+
+ private final CamelLogger logger;
+
+ public LogProducer(Endpoint endpoint, CamelLogger logger) {
+ super(endpoint);
+ this.logger = logger;
+ }
+
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ logger.process(exchange);
+ callback.done(true);
+ return true;
+ }
+}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?rev=1075828&r1=1075827&r2=1075828&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java Tue Mar 1 14:20:41 2011
@@ -30,6 +30,7 @@ import java.util.concurrent.CopyOnWriteA
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
@@ -43,8 +44,8 @@ import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.builder.ExpressionClause;
import org.apache.camel.builder.ProcessorBuilder;
+import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.impl.DefaultEndpoint;
-import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.impl.InterceptSendToEndpoint;
import org.apache.camel.spi.BrowsableEndpoint;
import org.apache.camel.util.CamelContextHelper;
@@ -256,9 +257,11 @@ public class MockEndpoint extends Defaul
}
public Producer createProducer() throws Exception {
- return new DefaultProducer(this) {
- public void process(Exchange exchange) {
+ return new DefaultAsyncProducer(this) {
+ public boolean process(Exchange exchange, AsyncCallback callback) {
onExchange(exchange);
+ callback.done(true);
+ return true;
}
};
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java?rev=1075828&r1=1075827&r2=1075828&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java Tue Mar 1 14:20:41 2011
@@ -16,16 +16,18 @@
*/
package org.apache.camel.processor;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
-import org.apache.camel.Processor;
+import org.apache.camel.util.AsyncProcessorHelper;
/**
* A processor which evaluates an Expression and logs it.
*
* @version
*/
-public class LogProcessor implements Processor, Traceable {
+public class LogProcessor implements AsyncProcessor, Traceable {
private final Expression expression;
private final CamelLogger logger;
@@ -36,8 +38,14 @@ public class LogProcessor implements Pro
}
public void process(Exchange exchange) throws Exception {
+ AsyncProcessorHelper.process(this, exchange);
+ }
+
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
String msg = expression.evaluate(exchange, String.class);
logger.log(msg);
+ return true;
}
@Override
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java?rev=1075828&r1=1075827&r2=1075828&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java Tue Mar 1 14:20:41 2011
@@ -100,7 +100,7 @@ public final class AsyncProcessorHelper
*/
public static void process(final AsyncProcessor processor, final Exchange exchange) throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
- boolean sync = processor.process(exchange, new AsyncCallback() {
+ final AsyncCallback callback = new AsyncCallback() {
public void done(boolean doneSync) {
if (!doneSync) {
if (LOG.isTraceEnabled()) {
@@ -108,13 +108,17 @@ public final class AsyncProcessorHelper
}
latch.countDown();
}
+
}
@Override
public String toString() {
return "Done " + processor;
}
- });
+ };
+
+ boolean sync = process(processor, exchange, callback);
+
if (!sync) {
if (LOG.isTraceEnabled()) {
LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: " + exchange.getExchangeId() + " -> " + exchange);