You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2013/07/29 07:54:25 UTC
[2/3] git commit: CAMEL-6575 Enhancements for camel-avro with thanks
to Vitalii
CAMEL-6575 Enhancements for camel-avro with thanks to Vitalii
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6783ceab
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6783ceab
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6783ceab
Branch: refs/heads/master
Commit: 6783ceab5a24c030e5bcac8f70d5994b4b6d40bb
Parents: 86077e4
Author: Willem Jiang <ni...@apache.org>
Authored: Mon Jul 29 11:27:20 2013 +0800
Committer: Willem Jiang <ni...@apache.org>
Committed: Mon Jul 29 11:27:20 2013 +0800
----------------------------------------------------------------------
.../camel/component/avro/AvroComponent.java | 69 ++++++-
.../component/avro/AvroComponentException.java | 39 ++++
.../camel/component/avro/AvroConfiguration.java | 50 ++++-
.../camel/component/avro/AvroConstants.java | 1 +
.../camel/component/avro/AvroConsumer.java | 14 +-
.../camel/component/avro/AvroEndpoint.java | 20 +-
.../camel/component/avro/AvroHttpConsumer.java | 46 -----
.../camel/component/avro/AvroHttpEndpoint.java | 17 --
.../camel/component/avro/AvroListener.java | 186 +++++++++++++++++++
.../camel/component/avro/AvroNettyConsumer.java | 48 -----
.../camel/component/avro/AvroNettyEndpoint.java | 19 --
.../camel/component/avro/AvroProducer.java | 28 ++-
.../component/avro/AvroReflectRequestor.java | 34 ++++
.../component/avro/AvroReflectResponder.java | 37 ++++
.../camel/component/avro/AvroRequestor.java | 30 ---
.../camel/component/avro/AvroResponder.java | 75 --------
.../component/avro/AvroSpecificRequestor.java | 30 +++
.../component/avro/AvroSpecificResponder.java | 37 ++++
.../org/apache/camel/avro/test/TestPojo.java | 30 +++
.../apache/camel/avro/test/TestReflection.java | 35 ++++
.../camel/avro/test/TestReflectionImpl.java | 61 ++++++
.../component/avro/AvroConsumerTestSupport.java | 130 +++++++++++--
.../component/avro/AvroHttpConsumerTest.java | 45 ++++-
.../component/avro/AvroHttpProducerTest.java | 34 +++-
.../component/avro/AvroNettyConsumerTest.java | 43 ++++-
.../component/avro/AvroNettyProducerTest.java | 50 ++++-
.../avro/AvroNettySpringConsumerTest.java | 2 +
.../component/avro/AvroProducerTestSupport.java | 80 ++++++--
.../camel/component/avro/AvroSettingsTest.java | 54 ++++++
.../camel/component/avro/AvroTestSupport.java | 25 ++-
.../component/avro/processors/GetProcessor.java | 4 +
.../processors/ReflectionInOnlyProcessor.java | 51 +++++
.../processors/ReflectionInOutProcessor.java | 41 ++++
.../camel/component/avro/avro-http-consumer.xml | 11 ++
.../camel/component/avro/avro-http-producer.xml | 30 +++
.../component/avro/avro-netty-consumer.xml | 50 +++++
.../component/avro/avro-netty-producer.xml | 30 +++
37 files changed, 1285 insertions(+), 301 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponent.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponent.java
index 48cd68f..db10421 100644
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponent.java
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponent.java
@@ -18,9 +18,13 @@ package org.apache.camel.component.avro;
import java.lang.reflect.Field;
import java.net.URI;
+import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.avro.Protocol;
+import org.apache.avro.reflect.ReflectData;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
@@ -30,6 +34,7 @@ import org.apache.camel.util.URISupport;
public class AvroComponent extends DefaultComponent {
private AvroConfiguration configuration;
+ private ConcurrentMap<String, AvroListener> listenerRegistry = new ConcurrentHashMap<String, AvroListener>();
public AvroComponent() {
}
@@ -81,17 +86,71 @@ public class AvroComponent extends DefaultComponent {
if (config.getProtocol() == null && config.getProtocolClassName() != null) {
Class<?> protocolClass = getCamelContext().getClassResolver().resolveClass(config.getProtocolClassName());
if (protocolClass != null) {
- Field f = protocolClass.getField("PROTOCOL");
- if (f != null) {
- Protocol protocol = (Protocol) f.get(null);
- config.setProtocol(protocol);
- }
+ try {
+ Field f = protocolClass.getField("PROTOCOL");
+ if (f != null) {
+ Protocol protocol = (Protocol) f.get(null);
+ config.setProtocol(protocol);
+ }
+ } catch(NoSuchFieldException e) {
+ ReflectData reflectData = ReflectData.get();
+ config.setProtocol(reflectData.getProtocol(protocolClass));
+ config.setReflectionProtocol(true);
+ }
}
}
if (config.getProtocol() == null) {
throw new IllegalArgumentException("Avro configuration does not contain protocol");
}
+
+ if (config.getMessageName() != null && !config.getProtocol().getMessages().containsKey(config.getMessageName())) {
+ throw new IllegalArgumentException("Message " + config.getMessageName() + " is not defined in protocol");
+ }
+
+ if (config.isSingleParameter()) {
+ Map<String, Protocol.Message> messageMap = config.getProtocol().getMessages();
+ Iterable<Protocol.Message> messagesToCheck = config.getMessageName() == null ?
+ messageMap.values() :
+ Collections.singleton(messageMap.get(config.getMessageName()));
+ for (Protocol.Message message: messagesToCheck) {
+ if (message.getRequest().getFields().size() != 1) {
+ throw new IllegalArgumentException("Single parameter option can't be used with message "
+ + message.getName() + " because it has " + message.getRequest().getFields().size() +
+ " parameters defined"
+ );
+ }
+ }
+ }
+ }
+
+ /**
+ * Registers a consumer with the listener that serves the given uri. When no listener is
+ * registered for the uri yet, one is created (which starts its server) and stored first.
+ *
+ * @param uri URI of the endpoint without message name; key of the listener registry
+ * @param messageName message name the consumer handles
+ * @param consumer consumer that will be registered in the listener
+ * @throws Exception if the listener cannot be created or rejects the consumer
+ */
+ public void register(String uri, String messageName, AvroConsumer consumer) throws Exception {
+ // Lookup and insert must form one atomic step: AvroListener starts a server in its
+ // constructor, so two racing registrations must never both create a listener for one uri.
+ synchronized (listenerRegistry) {
+ AvroListener listener = listenerRegistry.get(uri);
+ if (listener == null) {
+ listener = new AvroListener(consumer.getEndpoint());
+ listenerRegistry.put(uri, listener);
+ }
+ listener.register(messageName, consumer);
+ }
+ }
+
+ /**
+ * Unregisters the consumer for the given message name from the listener serving the uri.
+ * When the listener reports that no consumers remain, it is removed from the registry.
+ * Does nothing if no listener is registered for the uri.
+ *
+ * @param uri URI of the endpoint without message name
+ * @param messageName message name
+ */
+ public void unregister(String uri, String messageName) {
+ synchronized (listenerRegistry) {
+ AvroListener listener = listenerRegistry.get(uri);
+ // There is no listener when the matching register call never succeeded,
+ // e.g. because the server could not be started; that must not end in a NullPointerException.
+ if (listener != null && listener.unregister(messageName)) {
+ listenerRegistry.remove(uri);
+ }
+ }
}
public AvroConfiguration getConfiguration() {
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponentException.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponentException.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponentException.java
new file mode 100644
index 0000000..677259a
--- /dev/null
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroComponentException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.avro;
+
+/**
+ * Checked exception of the Avro component. AvroListener throws it when a consumer is
+ * already registered, when no consumer is defined for a message, and when an exchange
+ * failed without carrying an exception.
+ */
+public class AvroComponentException extends Exception {
+
+ private static final long serialVersionUID = 8915917806189741165L;
+
+ /** Creates an exception without detail message or cause. */
+ public AvroComponentException() {
+ super();
+ }
+
+ /**
+ * @param message detail message
+ * @param cause underlying cause
+ */
+ public AvroComponentException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * @param message detail message
+ */
+ public AvroComponentException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param cause underlying cause
+ */
+ public AvroComponentException(Throwable cause) {
+ super(cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConfiguration.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConfiguration.java
index 43c68bd..ee138b3 100644
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConfiguration.java
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConfiguration.java
@@ -22,6 +22,8 @@ import java.util.Map;
import org.apache.avro.Protocol;
import org.apache.camel.RuntimeCamelException;
+import org.apache.commons.lang.StringUtils;
+import static org.apache.camel.component.avro.AvroConstants.*;
public class AvroConfiguration implements Cloneable {
@@ -31,8 +33,12 @@ public class AvroConfiguration implements Cloneable {
private String protocolLocation;
private String protocolClassName;
private String transport;
+ private String messageName;
+ private String uriAuthority;
+ private boolean reflectionProtocol;
+ private boolean singleParameter;
- public AvroConfiguration copy() {
+ public AvroConfiguration copy() {
try {
AvroConfiguration answer = (AvroConfiguration) clone();
return answer;
@@ -44,12 +50,20 @@ public class AvroConfiguration implements Cloneable {
+ /**
+ * Populates transport, host, port, message name and URI authority from the endpoint URI.
+ * The message name is whatever follows the first "/" of the URI path; it is left untouched
+ * when the path is absent or contains no separator.
+ *
+ * @param uri endpoint URI; its scheme selects the transport (http or netty)
+ * @param parameters endpoint parameters; not read by this method
+ * @param component owning component; not read by this method
+ * @throws IllegalArgumentException if the transport is not recognized or the message name
+ * itself contains a "/" separator
+ */
public void parseURI(URI uri, Map<String, Object> parameters, AvroComponent component) throws Exception {
transport = uri.getScheme();
- if ((!transport.equalsIgnoreCase("http")) && (!transport.equalsIgnoreCase("netty"))) {
+ if ((!AVRO_HTTP_TRANSPORT.equalsIgnoreCase(transport)) && (!AVRO_NETTY_TRANSPORT.equalsIgnoreCase(transport))) {
- throw new IllegalArgumentException("Unrecognized Avro IPC transport: " + protocol + " for uri: " + uri);
+ // Report the rejected transport (the scheme just read), not the protocol field.
+ throw new IllegalArgumentException("Unrecognized Avro IPC transport: " + transport + " for uri: " + uri);
}
setHost(uri.getHost());
setPort(uri.getPort());
+
+ if ((uri.getPath() != null) && (StringUtils.indexOf(uri.getPath(), AVRO_MESSAGE_NAME_SEPARATOR) != -1)) {
+ String path = StringUtils.substringAfter(uri.getPath(), AVRO_MESSAGE_NAME_SEPARATOR);
+ // Only a single path segment is accepted as message name.
+ if (path.contains(AVRO_MESSAGE_NAME_SEPARATOR)) {
+ throw new IllegalArgumentException("Unrecognized Avro message name: " + path + " for uri: " + uri);
+ }
+ setMessageName(path);
+ }
+
+ setUriAuthority(uri.getAuthority());
}
public String getHost() {
@@ -99,4 +113,36 @@ public class AvroConfiguration implements Cloneable {
public void setProtocolClassName(String protocolClassName) {
this.protocolClassName = protocolClassName;
}
+
+ /**
+ * @return the Avro message name; parseURI sets it from the part of the URI path after the separator
+ */
+ public String getMessageName() {
+ return messageName;
+ }
+
+ /**
+ * @param messageName the Avro message name
+ */
+ public void setMessageName(String messageName) {
+ this.messageName = messageName;
+ }
+
+ /**
+ * @return the authority part of the endpoint URI, as stored by parseURI
+ */
+ public String getUriAuthority() {
+ return uriAuthority;
+ }
+
+ /**
+ * @param uriAuthority the authority part of the endpoint URI
+ */
+ public void setUriAuthority(String uriAuthority) {
+ this.uriAuthority = uriAuthority;
+ }
+
+ /**
+ * @return true when the protocol is a reflection protocol; AvroComponent sets this when the
+ * protocol class has no PROTOCOL field and the protocol is obtained from ReflectData instead
+ */
+ public boolean isReflectionProtocol() {
+ return reflectionProtocol;
+ }
+
+ /**
+ * @param isReflectionProtocol whether the protocol is a reflection protocol
+ */
+ public void setReflectionProtocol(boolean isReflectionProtocol) {
+ this.reflectionProtocol = isReflectionProtocol;
+ }
+
+ /**
+ * @return true when the single request parameter of a message is handed over as the value
+ * itself instead of an Object[]; AvroComponent rejects this option for messages
+ * that do not have exactly one parameter
+ */
+ public boolean isSingleParameter() {
+ return singleParameter;
+ }
+
+ /**
+ * @param singleParameter whether the single-parameter option is enabled
+ */
+ public void setSingleParameter(boolean singleParameter) {
+ this.singleParameter = singleParameter;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConstants.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConstants.java
index f5d87fa..948af34 100644
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConstants.java
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConstants.java
@@ -20,6 +20,7 @@ public final class AvroConstants {
public static final transient String AVRO_NETTY_TRANSPORT = "netty";
public static final transient String AVRO_HTTP_TRANSPORT = "http";
+ public static final transient String AVRO_MESSAGE_NAME_SEPARATOR = "/";
public static final transient String AVRO_MESSAGE_NAME = "CamelAvroMessageName";
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConsumer.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConsumer.java
index a4f9514..5375c2d 100644
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConsumer.java
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroConsumer.java
@@ -20,7 +20,7 @@ import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
-public abstract class AvroConsumer extends DefaultConsumer {
+public class AvroConsumer extends DefaultConsumer {
public AvroConsumer(Endpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -30,4 +30,16 @@ public abstract class AvroConsumer extends DefaultConsumer {
public AvroEndpoint getEndpoint() {
return (AvroEndpoint) super.getEndpoint();
}
+
+ /**
+ * Starts the consumer, then registers it with the owning component under the
+ * endpoint's URI authority and message name.
+ */
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ AvroEndpoint avroEndpoint = getEndpoint();
+ AvroConfiguration settings = avroEndpoint.getConfiguration();
+ AvroComponent owner = (AvroComponent) avroEndpoint.getComponent();
+ owner.register(settings.getUriAuthority(), settings.getMessageName(), this);
+ }
+
+ /**
+ * Stops the consumer, then unregisters it from the owning component using the
+ * endpoint's URI authority and message name.
+ */
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+ AvroEndpoint avroEndpoint = getEndpoint();
+ AvroConfiguration settings = avroEndpoint.getConfiguration();
+ AvroComponent owner = (AvroComponent) avroEndpoint.getComponent();
+ owner.unregister(settings.getUriAuthority(), settings.getMessageName());
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
index 9b28666..0da6796 100644
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
@@ -20,8 +20,10 @@ import org.apache.avro.Protocol;
import org.apache.avro.Schema;
import org.apache.camel.Component;
+import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultEndpoint;
public abstract class AvroEndpoint extends DefaultEndpoint {
@@ -56,12 +58,24 @@ public abstract class AvroEndpoint extends DefaultEndpoint {
public boolean isSingleton() {
return true;
}
+
+ /**
+ * Creates a new <a
+ * href="http://camel.apache.org/event-driven-consumer.html">Event
+ * Driven Consumer</a> which consumes messages from the endpoint using the
+ * given processor. The consumer is configured through configureConsumer
+ * before it is returned.
+ *
+ * @param processor the given processor
+ * @return a newly created consumer
+ * @throws Exception can be thrown
+ */
+ @Override
+ public Consumer createConsumer(Processor processor) throws Exception {
+ AvroConsumer answer = new AvroConsumer(this, processor);
+ // The transport specific createConsumer methods this one replaces configured the
+ // consumer as well; keep doing so, so consumer options are still applied.
+ configureConsumer(answer);
+ return answer;
+ }
public AvroConfiguration getConfiguration() {
return configuration;
}
- public Protocol getProtocol() {
- return configuration.getProtocol();
- }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpConsumer.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpConsumer.java
deleted file mode 100644
index 7f2a230..0000000
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpConsumer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.avro;
-
-import org.apache.avro.ipc.HttpServer;
-
-import org.apache.camel.Endpoint;
-import org.apache.camel.Processor;
-
-public class AvroHttpConsumer extends AvroConsumer {
-
- HttpServer server;
-
- public AvroHttpConsumer(Endpoint endpoint, Processor processor) {
- super(endpoint, processor);
- }
-
- @Override
- protected void doStart() throws Exception {
- AvroConfiguration configuration = getEndpoint().getConfiguration();
- server = new HttpServer(new AvroResponder(this), configuration.getPort());
- server.start();
- }
-
- @Override
- protected void doStop() throws Exception {
- super.doStop();
- if (server != null) {
- server.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpEndpoint.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpEndpoint.java
index 6db3c7f..ee81796 100644
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpEndpoint.java
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroHttpEndpoint.java
@@ -45,21 +45,4 @@ public class AvroHttpEndpoint extends AvroEndpoint {
public Producer createProducer() throws Exception {
return new AvroHttpProducer(this);
}
-
- /**
- * Creates a new <a
- * href="http://camel.apache.org/event-driven-consumer.html">Event
- * Driven Consumer</a> which consumes messages from the endpoint using the
- * given processor
- *
- * @param processor the given processor
- * @return a newly created consumer
- * @throws Exception can be thrown
- */
- @Override
- public Consumer createConsumer(Processor processor) throws Exception {
- AvroHttpConsumer answer = new AvroHttpConsumer(this, processor);
- configureConsumer(answer);
- return answer;
- }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroListener.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroListener.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroListener.java
new file mode 100644
index 0000000..546e490
--- /dev/null
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroListener.java
@@ -0,0 +1,186 @@
+package org.apache.camel.component.avro;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.avro.specific.SpecificData;
+import org.apache.camel.Exchange;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.commons.lang.StringUtils;
+import org.mortbay.log.Log;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.camel.component.avro.AvroConstants.AVRO_HTTP_TRANSPORT;
+import static org.apache.camel.component.avro.AvroConstants.AVRO_NETTY_TRANSPORT;
+
+/**
+ * This class holds server that listen to given protocol:host:port combination and dispatches messages to
+ * different routes mapped.
+ */
+public class AvroListener {
+
+ private ConcurrentMap<String, AvroConsumer> consumerRegistry = new ConcurrentHashMap<String, AvroConsumer>();
+ private AvroConsumer defaultConsumer;
+ private final Server server;
+ private final AvroConfiguration configuration;
+
+ /**
+ * Creates the listener for the given endpoint and immediately initializes and starts
+ * the server described by the endpoint's configuration.
+ *
+ * @param endpoint endpoint whose configuration is used
+ * @throws Exception if the server cannot be initialized or started
+ */
+ public AvroListener(AvroEndpoint endpoint) throws Exception {
+ AvroConfiguration endpointConfiguration = endpoint.getConfiguration();
+ this.configuration = endpointConfiguration;
+ this.server = initAndStartServer(endpointConfiguration);
+ }
+
+ /**
+ * Builds the responder for the configured protocol (reflect or specific), creates the
+ * http or netty server selected by the configured transport, and starts it.
+ *
+ * @param configuration supplies protocol, transport, host and port
+ * @return Initialized and started server
+ * @throws IllegalArgumentException if the transport is neither http nor netty
+ * @throws Exception if the server cannot be created or started
+ */
+ private Server initAndStartServer(AvroConfiguration configuration) throws Exception {
+ final SpecificResponder responder;
+ if (configuration.isReflectionProtocol()) {
+ responder = new AvroReflectResponder(configuration.getProtocol(), this);
+ } else {
+ responder = new AvroSpecificResponder(configuration.getProtocol(), this);
+ }
+
+ final Server startedServer;
+ if (AVRO_HTTP_TRANSPORT.equalsIgnoreCase(configuration.getTransport())) {
+ startedServer = new HttpServer(responder, configuration.getPort());
+ } else if (AVRO_NETTY_TRANSPORT.equalsIgnoreCase(configuration.getTransport())) {
+ startedServer = new NettyServer(responder, new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+ } else {
+ throw new IllegalArgumentException("Unknown transport " + configuration.getTransport());
+ }
+
+ startedServer.start();
+ return startedServer;
+ }
+
+ /**
+ * Registers the consumer under the given message name. A null or empty message name
+ * registers the consumer as the default consumer, which respond falls back to for
+ * messages that have no consumer of their own.
+ *
+ * @param messageName message name; null or empty for the default consumer
+ * @param consumer avro consumer
+ * @throws AvroComponentException if a default consumer, or a consumer for the message name,
+ * is already registered
+ */
+ public void register(String messageName, AvroConsumer consumer) throws AvroComponentException {
+ // Use the same "no message name" test as unregister; otherwise a consumer registered
+ // under "" could never be removed again and the server would never be closed.
+ if (StringUtils.isEmpty(messageName)) {
+ if (this.defaultConsumer != null) {
+ throw new AvroComponentException("Default consumer already registered for uri: " + consumer.getEndpoint().getEndpointUri());
+ }
+ this.defaultConsumer = consumer;
+ } else if (consumerRegistry.putIfAbsent(messageName, consumer) != null) {
+ throw new AvroComponentException("Consumer already registered for message: " + messageName + " and uri: " + consumer.getEndpoint().getEndpointUri());
+ }
+ }
+
+ /**
+ * Unregisters the consumer for the given message name; a null or empty name clears the
+ * default consumer instead. Closes the server once neither a default consumer nor any
+ * named consumer is left.
+ *
+ * @param messageName message name
+ * @return true if no default consumer and no named consumers remain, which means that
+ * this listener can be removed by its owner
+ */
+ public boolean unregister(String messageName) {
+ if (StringUtils.isEmpty(messageName)) {
+ defaultConsumer = null;
+ } else if (consumerRegistry.remove(messageName) == null) {
+ Log.warn("Consumer with message name " + messageName + " was already unregistered.");
+ }
+
+ boolean nothingLeft = (defaultConsumer == null) && consumerRegistry.isEmpty();
+ if (nothingLeft && server != null) {
+ server.close();
+ }
+ return nothingLeft;
+ }
+
+ /**
+ * Dispatches an incoming call to the consumer registered for the message name, falling
+ * back to the default consumer, and returns the result of processing the exchange.
+ *
+ * @param message Avro message being invoked
+ * @param request Avro request holding the call parameters
+ * @param data used to read the parameters out of the request
+ * @return response of the exchange processing
+ * @throws Exception an AvroComponentException if no consumer is defined for the message,
+ * otherwise whatever the exchange processing fails with
+ */
+ public Object respond(Protocol.Message message, Object request, SpecificData data) throws Exception {
+ // One lookup instead of containsKey + get: with two calls a concurrent unregister in
+ // between would yield null and skip the default consumer although one is available.
+ AvroConsumer consumer = this.consumerRegistry.get(message.getName());
+ if (consumer == null) {
+ consumer = this.defaultConsumer;
+ }
+
+ if (consumer == null) {
+ throw new AvroComponentException("No consumer defined for message: " + message.getName());
+ }
+
+ Object params = extractParams(message, request, consumer.getEndpoint().getConfiguration().isSingleParameter(), data);
+
+ return processExchange(consumer, message, params);
+ }
+
+ /**
+ * Reads the parameters of an RPC call out of the request. With the single parameter
+ * option the value of the first request field is returned as is; otherwise the values
+ * of all request fields are returned as an Object[] in field order.
+ *
+ * @param message Avro message
+ * @param request Avro request
+ * @param singleParameter Indicates that called method has single parameter
+ * @param dataResolver Reads field values from the request
+ * @return Parameters of RPC method invocation
+ */
+ private static Object extractParams(Protocol.Message message, Object request, boolean singleParameter, SpecificData dataResolver) {
+ if (singleParameter) {
+ Schema.Field only = message.getRequest().getFields().get(0);
+ return dataResolver.getField(request, only.name(), only.pos());
+ }
+
+ Object[] values = new Object[message.getRequest().getFields().size()];
+ int slot = 0;
+ for (Schema.Field requestField : message.getRequest().getFields()) {
+ values[slot++] = dataResolver.getField(request, requestField.name(), requestField.pos());
+ }
+ return values;
+ }
+
+ /**
+ * Creates an exchange for the message and its parameters, runs it through the consumer's
+ * processor and returns the out body when the exchange is out capable (null otherwise).
+ * Anything thrown by the processor is passed to the consumer's exception handler.
+ *
+ * @param consumer Holds processor and exception handler
+ * @param message Message on which exchange is created
+ * @param params Params of exchange
+ * @return Response of exchange processing
+ * @throws Exception the exchange's exception if it failed with one, or an
+ * AvroComponentException if it failed without one
+ */
+ private static Object processExchange(AvroConsumer consumer, Protocol.Message message, Object params) throws Exception {
+ Exchange exchange = consumer.getEndpoint().createExchange(message, params);
+
+ try {
+ consumer.getProcessor().process(exchange);
+ } catch (Throwable thrown) {
+ consumer.getExceptionHandler().handleException(thrown);
+ }
+
+ // Read the response before looking at the failure state, in the same order as before.
+ Object response = ExchangeHelper.isOutCapable(exchange) ? exchange.getOut().getBody() : null;
+
+ if (!exchange.isFailed()) {
+ return response;
+ }
+ if (exchange.getException() == null) {
+ // failed and no exception, must be a fault
+ throw new AvroComponentException("Camel processing error.");
+ }
+ throw exchange.getException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyConsumer.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyConsumer.java
deleted file mode 100644
index 72cb30d..0000000
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyConsumer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.avro;
-
-import java.net.InetSocketAddress;
-
-import org.apache.avro.ipc.NettyServer;
-
-import org.apache.camel.Endpoint;
-import org.apache.camel.Processor;
-
-public class AvroNettyConsumer extends AvroConsumer {
-
- NettyServer server;
-
- public AvroNettyConsumer(Endpoint endpoint, Processor processor) {
- super(endpoint, processor);
- }
-
- @Override
- protected void doStart() throws Exception {
- AvroConfiguration configuration = getEndpoint().getConfiguration();
- server = new NettyServer(new AvroResponder(this), new InetSocketAddress(configuration.getHost(), configuration.getPort()));
- server.start();
- }
-
- @Override
- protected void doStop() throws Exception {
- super.doStop();
- if (server != null) {
- server.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyEndpoint.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyEndpoint.java
index cb27955..261ef11 100644
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyEndpoint.java
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroNettyEndpoint.java
@@ -17,8 +17,6 @@
package org.apache.camel.component.avro;
import org.apache.camel.Component;
-import org.apache.camel.Consumer;
-import org.apache.camel.Processor;
import org.apache.camel.Producer;
public class AvroNettyEndpoint extends AvroEndpoint {
@@ -45,21 +43,4 @@ public class AvroNettyEndpoint extends AvroEndpoint {
public Producer createProducer() throws Exception {
return new AvroNettyProducer(this);
}
-
- /**
- * Creates a new <a
- * href="http://camel.apache.org/event-driven-consumer.html">Event
- * Driven Consumer</a> which consumes messages from the endpoint using the
- * given processor
- *
- * @param processor the given processor
- * @return a newly created consumer
- * @throws Exception can be thrown
- */
- @Override
- public Consumer createConsumer(Processor processor) throws Exception {
- AvroNettyConsumer answer = new AvroNettyConsumer(this, processor);
- configureConsumer(answer);
- return answer;
- }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroProducer.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroProducer.java
index b7718bc..b3ba04c 100644
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroProducer.java
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroProducer.java
@@ -19,12 +19,12 @@ package org.apache.camel.component.avro;
import org.apache.avro.ipc.Callback;
import org.apache.avro.ipc.Requestor;
import org.apache.avro.ipc.Transceiver;
-
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ServicePoolAware;
import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.commons.lang.StringUtils;
public abstract class AvroProducer extends DefaultAsyncProducer implements ServicePoolAware {
@@ -41,8 +41,29 @@ public abstract class AvroProducer extends DefaultAsyncProducer implements Servi
public boolean process(final Exchange exchange, final AsyncCallback callback) {
Object request = exchange.getIn().getBody();
+ AvroConfiguration configuration = getEndpoint().getConfiguration();
+ if (transceiver == null) {
+ try {
+ transceiver = createTransceiver();
+ if(configuration.isReflectionProtocol())
+ requestor = new AvroReflectRequestor(configuration.getProtocol(), transceiver);
+ else
+ requestor = new AvroSpecificRequestor(configuration.getProtocol(), transceiver);
+ } catch (Exception e) {
+ exchange.setException(e);
+ callback.done(true);
+ return true;
+ }
+ }
+
try {
- requestor.request(exchange.getIn().getHeader(AvroConstants.AVRO_MESSAGE_NAME, String.class), wrapObjectToArray(request), new Callback<Object>() {
+ String messageName;
+ if(!StringUtils.isEmpty(exchange.getIn().getHeader(AvroConstants.AVRO_MESSAGE_NAME, String.class)))
+ messageName = exchange.getIn().getHeader(AvroConstants.AVRO_MESSAGE_NAME, String.class);
+ else
+ messageName = configuration.getMessageName();
+
+ requestor.request(messageName, wrapObjectToArray(request), new Callback<Object>() {
@Override
public void handleResult(Object result) {
// got result from avro, so set it on the exchange and invoke the callback
@@ -88,8 +109,6 @@ public abstract class AvroProducer extends DefaultAsyncProducer implements Servi
@Override
protected void doStart() throws Exception {
super.doStart();
- transceiver = createTransceiver();
- requestor = new AvroRequestor(getEndpoint().getProtocol(), transceiver);
}
@Override
@@ -97,6 +116,7 @@ public abstract class AvroProducer extends DefaultAsyncProducer implements Servi
super.doStop();
if (transceiver != null) {
transceiver.close();
+ transceiver = null;
}
requestor = null;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectRequestor.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectRequestor.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectRequestor.java
new file mode 100644
index 0000000..1d6fc2a
--- /dev/null
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectRequestor.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.ipc.reflect.ReflectRequestor;
+
+public class AvroReflectRequestor extends ReflectRequestor {
+
+ public AvroReflectRequestor(Class<?> iface, Transceiver transceiver) throws IOException {
+ super(iface, transceiver);
+ }
+
+ public AvroReflectRequestor(Protocol protocol, Transceiver transceiver) throws IOException {
+ super(protocol, transceiver);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectResponder.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectResponder.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectResponder.java
new file mode 100644
index 0000000..c841021
--- /dev/null
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroReflectResponder.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.avro;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.ipc.reflect.ReflectResponder;
+import org.apache.avro.reflect.ReflectData;
+
+public class AvroReflectResponder extends ReflectResponder {
+ private AvroListener listener;
+
+
+ public AvroReflectResponder(Protocol protocol, AvroListener listener) throws Exception {
+ super(protocol, null);
+ this.listener = listener;
+ }
+
+ @Override
+ public Object respond(Protocol.Message message, Object request) throws Exception {
+ return listener.respond(message, request, ReflectData.get());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroRequestor.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroRequestor.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroRequestor.java
deleted file mode 100644
index dd54c20..0000000
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroRequestor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.Protocol;
-import org.apache.avro.ipc.Transceiver;
-import org.apache.avro.ipc.specific.SpecificRequestor;
-
-public class AvroRequestor extends SpecificRequestor {
-
- public AvroRequestor(Protocol protocol, Transceiver transceiver) throws IOException {
- super(protocol, transceiver);
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroResponder.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroResponder.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroResponder.java
deleted file mode 100644
index 4d0cd40..0000000
--- a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroResponder.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.avro;
-
-import org.apache.avro.Protocol;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.ipc.specific.SpecificResponder;
-import org.apache.avro.specific.SpecificData;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.util.ExchangeHelper;
-
-public class AvroResponder extends SpecificResponder {
-
- private AvroConsumer consumer;
-
- public AvroResponder(AvroConsumer consumer) {
- super(consumer.getEndpoint().getProtocol(), null);
- this.consumer = consumer;
- }
-
- @Override
- public Object respond(Protocol.Message message, Object request) throws Exception {
- Object response;
- int numParams = message.getRequest().getFields().size();
- Object[] params = new Object[numParams];
- Class<?>[] paramTypes = new Class[numParams];
- int i = 0;
- for (Schema.Field param : message.getRequest().getFields()) {
- params[i] = ((GenericRecord) request).get(param.name());
- paramTypes[i] = SpecificData.get().getClass(param.schema());
- i++;
- }
- Exchange exchange = consumer.getEndpoint().createExchange(message, params);
-
- try {
- consumer.getProcessor().process(exchange);
- } catch (Throwable e) {
- consumer.getExceptionHandler().handleException(e);
- }
-
- if (ExchangeHelper.isOutCapable(exchange)) {
- response = exchange.getOut().getBody();
- } else {
- response = null;
- }
-
- boolean failed = exchange.isFailed();
- if (failed) {
- if (exchange.getException() != null) {
- response = exchange.getException();
- } else {
- // failed and no exception, must be a fault
- response = exchange.getOut().getBody();
- }
- }
- return response;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificRequestor.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificRequestor.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificRequestor.java
new file mode 100644
index 0000000..1d6b5f6
--- /dev/null
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificRequestor.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+
+public class AvroSpecificRequestor extends SpecificRequestor {
+
+ public AvroSpecificRequestor(Protocol protocol, Transceiver transceiver) throws IOException {
+ super(protocol, transceiver);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificResponder.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificResponder.java b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificResponder.java
new file mode 100644
index 0000000..98a5d82
--- /dev/null
+++ b/components/camel-avro/src/main/java/org/apache/camel/component/avro/AvroSpecificResponder.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.avro;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.avro.specific.SpecificData;
+
+public class AvroSpecificResponder extends SpecificResponder {
+ private AvroListener listener;
+
+
+ public AvroSpecificResponder(Protocol protocol, AvroListener listener) throws Exception {
+ super(protocol, null);
+ this.listener = listener;
+ }
+
+ @Override
+ public Object respond(Protocol.Message message, Object request) throws Exception {
+ return listener.respond(message, request, SpecificData.get());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestPojo.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestPojo.java b/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestPojo.java
new file mode 100644
index 0000000..d7290d8
--- /dev/null
+++ b/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestPojo.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.avro.test;
+
+public class TestPojo {
+
+ private String pojoName;
+
+ public String getPojoName() {
+ return pojoName;
+ }
+
+ public void setPojoName(String pojoName) {
+ this.pojoName = pojoName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflection.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflection.java b/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflection.java
new file mode 100644
index 0000000..07eb08b
--- /dev/null
+++ b/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflection.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.avro.test;
+
+public interface TestReflection {
+
+ public String getName();
+
+ public void setName(String name);
+
+ public int getAge();
+
+ public void setAge(int age);
+
+ public int increaseAge(int age);
+
+ public void setTestPojo(TestPojo testPojo);
+
+ public TestPojo getTestPojo();
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflectionImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflectionImpl.java b/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflectionImpl.java
new file mode 100644
index 0000000..98b9eea
--- /dev/null
+++ b/components/camel-avro/src/test/java/org/apache/camel/avro/test/TestReflectionImpl.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.avro.test;
+
+public class TestReflectionImpl implements TestReflection {
+
+ String name = "";
+ int age = 0;
+ TestPojo testPojo;
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public int getAge() {
+ return this.age;
+ }
+
+ @Override
+ public void setAge(int age) {
+ this.age = age;
+ }
+
+ @Override
+ public int increaseAge(int age) {
+ this.age = ++age;
+ return this.age;
+ }
+
+ @Override
+ public void setTestPojo(TestPojo testPojo) {
+ this.testPojo = testPojo;
+ }
+
+ @Override
+ public TestPojo getTestPojo() {
+ return testPojo;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroConsumerTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroConsumerTestSupport.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroConsumerTestSupport.java
index 094da2e..4ee9cd9 100644
--- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroConsumerTestSupport.java
+++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroConsumerTestSupport.java
@@ -19,23 +19,42 @@ package org.apache.camel.component.avro;
import java.io.IOException;
-import org.apache.avro.Protocol;
+
+import org.apache.avro.AvroRuntimeException;
import org.apache.avro.ipc.Requestor;
import org.apache.avro.ipc.Transceiver;
import org.apache.camel.CamelContext;
import org.apache.camel.avro.generated.Key;
-import org.apache.camel.avro.generated.KeyValueProtocol;
import org.apache.camel.avro.generated.Value;
import org.apache.camel.avro.impl.KeyValueProtocolImpl;
-
+import org.apache.camel.avro.test.TestPojo;
+import org.apache.camel.avro.test.TestReflection;
+import org.apache.camel.avro.test.TestReflectionImpl;
import org.junit.After;
import org.junit.Test;
public abstract class AvroConsumerTestSupport extends AvroTestSupport {
+ protected int avroPortMessageInRoute = setupFreePort("avroPortMessageInRoute");
+ protected int avroPortForWrongMessages = setupFreePort("avroPortForWrongMessages");
+
Transceiver transceiver;
Requestor requestor;
+
+ Transceiver transceiverMessageInRoute;
+ Requestor requestorMessageInRoute;
+
+ Transceiver transceiverForWrongMessages;
+ Requestor requestorForWrongMessages;
+
+ Transceiver reflectTransceiver;
+ Requestor reflectRequestor;
+
KeyValueProtocolImpl keyValue = new KeyValueProtocolImpl();
+ TestReflection testReflection = new TestReflectionImpl();
+
+ public static final String REFLECTION_TEST_NAME = "Chucky";
+ public static final int REFLECTION_TEST_AGE = 100;
protected abstract void initializeTranceiver() throws IOException;
@@ -47,6 +66,18 @@ public abstract class AvroConsumerTestSupport extends AvroTestSupport {
if (transceiver != null) {
transceiver.close();
}
+
+ if (transceiverMessageInRoute != null) {
+ transceiverMessageInRoute.close();
+ }
+
+ if (transceiverForWrongMessages != null) {
+ transceiverForWrongMessages.close();
+ }
+
+ if (reflectTransceiver != null) {
+ reflectTransceiver.close();
+ }
}
@Test
@@ -59,6 +90,59 @@ public abstract class AvroConsumerTestSupport extends AvroTestSupport {
}
@Test
+ public void testInOnlyMessageInRoute() throws Exception {
+ initializeTranceiver();
+ Key key = Key.newBuilder().setKey("1").build();
+ Value value = Value.newBuilder().setValue("test value").build();
+ Object[] request = {key, value};
+ requestorMessageInRoute.request("put", request);
+ }
+
+ @Test
+ public void testInOnlyReflectRequestor() throws Exception {
+ initializeTranceiver();
+ Object[] request = {REFLECTION_TEST_NAME};
+ reflectRequestor.request("setName", request);
+ assertEquals(REFLECTION_TEST_NAME, testReflection.getName());
+ }
+
+ @Test(expected=AvroRuntimeException.class)
+ public void testInOnlyWrongMessageName() throws Exception {
+ initializeTranceiver();
+ Key key = Key.newBuilder().setKey("1").build();
+ Value value = Value.newBuilder().setValue("test value").build();
+ Object[] request = {key, value};
+ requestorMessageInRoute.request("throwException", request);
+ }
+
+ @Test(expected=AvroRuntimeException.class)
+ public void testInOnlyToNotExistingRoute() throws Exception {
+ initializeTranceiver();
+ Key key = Key.newBuilder().setKey("1").build();
+ Value value = Value.newBuilder().setValue("test value").build();
+ Object[] request = {key, value};
+ requestorForWrongMessages.request("get", request);
+ }
+
+ @Test
+ public void testInOnlyReflectSingleParameterNotSet() throws Exception {
+ initializeTranceiver();
+ Object[] request = {100};
+ reflectRequestor.request("setAge", request);
+ assertEquals(0, testReflection.getAge());
+ }
+
+ @Test
+ public void testInOnlyReflectionPojoTest() throws Exception {
+ initializeTranceiver();
+ TestPojo testPojo = new TestPojo();
+ testPojo.setPojoName("pojo1");
+ Object[] request = {testPojo};
+ reflectRequestor.request("setTestPojo", request);
+ assertEquals(testPojo.getPojoName(), testReflection.getTestPojo().getPojoName());
+ }
+
+ @Test
public void testInOut() throws Exception {
initializeTranceiver();
keyValue.getStore().clear();
@@ -70,15 +154,35 @@ public abstract class AvroConsumerTestSupport extends AvroTestSupport {
assertEquals(value, response);
}
- @Override
- protected CamelContext createCamelContext() throws Exception {
- CamelContext context = super.createCamelContext();
- Protocol protocol = KeyValueProtocol.PROTOCOL;
- AvroConfiguration configuration = new AvroConfiguration();
- configuration.setProtocol(protocol);
- AvroComponent component = new AvroComponent(context);
- component.setConfiguration(configuration);
- context.addComponent("avro", component);
- return context;
+ @Test
+ public void testInOutMessageInRoute() throws Exception {
+ initializeTranceiver();
+ keyValue.getStore().clear();
+ Key key = Key.newBuilder().setKey("2").build();
+ Value value = Value.newBuilder().setValue("test value").build();
+ keyValue.getStore().put(key, value);
+ Object[] request = {key};
+ Object response = requestorMessageInRoute.request("get", request);
+ assertEquals(value, response);
+ }
+
+ @Test
+ public void testInOutReflectRequestor() throws Exception {
+ initializeTranceiver();
+ Object[] request = {REFLECTION_TEST_AGE};
+ Object response = reflectRequestor.request("increaseAge", request);
+ assertEquals(testReflection.getAge(), response);
+ }
+
+ @Test
+ public void testInOutReflectionPojoTest() throws Exception {
+ initializeTranceiver();
+ TestPojo testPojo = new TestPojo();
+ testPojo.setPojoName("pojo2");
+ Object[] request = {testPojo};
+ reflectRequestor.request("setTestPojo", request);
+ request = new Object[0];
+ Object response = reflectRequestor.request("getTestPojo", request);
+ assertEquals(testPojo.getPojoName(), ((TestPojo) response).getPojoName());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpConsumerTest.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpConsumerTest.java
index 148055d..8bec47d 100644
--- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpConsumerTest.java
+++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpConsumerTest.java
@@ -21,11 +21,15 @@ import java.io.IOException;
import java.net.URL;
import org.apache.avro.ipc.HttpTransceiver;
+import org.apache.avro.ipc.reflect.ReflectRequestor;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.camel.avro.generated.KeyValueProtocol;
+import org.apache.camel.avro.test.TestReflection;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.avro.processors.GetProcessor;
import org.apache.camel.component.avro.processors.PutProcessor;
+import org.apache.camel.component.avro.processors.ReflectionInOnlyProcessor;
+import org.apache.camel.component.avro.processors.ReflectionInOutProcessor;
public class AvroHttpConsumerTest extends AvroConsumerTestSupport {
@@ -33,16 +37,51 @@ public class AvroHttpConsumerTest extends AvroConsumerTestSupport {
protected void initializeTranceiver() throws IOException {
transceiver = new HttpTransceiver(new URL("http://localhost:" + avroPort));
requestor = new SpecificRequestor(KeyValueProtocol.class, transceiver);
+
+ transceiverMessageInRoute = new HttpTransceiver(new URL("http://localhost:" + avroPortMessageInRoute));
+ requestorMessageInRoute = new SpecificRequestor(KeyValueProtocol.class, transceiverMessageInRoute);
+
+ transceiverForWrongMessages = new HttpTransceiver(new URL("http://localhost:" + avroPortForWrongMessages));
+ requestorForWrongMessages = new SpecificRequestor(KeyValueProtocol.class, transceiverForWrongMessages);
+
+ reflectTransceiver = new HttpTransceiver(new URL("http://localhost:" + avroPortReflection));
+ reflectRequestor = new ReflectRequestor(TestReflection.class, reflectTransceiver);
}
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
+ errorHandler(deadLetterChannel("mock:exception-handler"));
+
//In Only
- from("avro:http:localhost:" + avroPort).choice()
- .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'put'}").process(new PutProcessor(keyValue))
- .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'get'}").process(new GetProcessor(keyValue));
+ from("avro:http:localhost:" + avroPort + "?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol").choice()
+ .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'put'}").process(new PutProcessor(keyValue))
+ .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'get'}").process(new GetProcessor(keyValue));
+
+ from("avro:http:localhost:" + avroPortMessageInRoute + "/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+ .process(new PutProcessor(keyValue));
+
+ from("avro:http:localhost:" + avroPortMessageInRoute + "/get?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+ .process(new GetProcessor(keyValue));
+
+ from("avro:http:localhost:" + avroPortForWrongMessages + "/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+ .process(new PutProcessor(keyValue));
+
+ from("avro:http:localhost:" + avroPortReflection + "/setName?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true")
+ .process(new ReflectionInOnlyProcessor(testReflection));
+
+ from("avro:http:localhost:" + avroPortReflection + "/setAge?protocolClassName=org.apache.camel.avro.test.TestReflection")
+ .process(new ReflectionInOnlyProcessor(testReflection));
+
+ from("avro:http:localhost:" + avroPortReflection + "/setTestPojo?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true")
+ .process(new ReflectionInOnlyProcessor(testReflection));
+
+ from("avro:http:localhost:" + avroPortReflection + "/increaseAge?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true")
+ .process(new ReflectionInOutProcessor(testReflection));
+
+ from("avro:http:localhost:" + avroPortReflection + "/getTestPojo?protocolClassName=org.apache.camel.avro.test.TestReflection")
+ .process(new ReflectionInOutProcessor(testReflection));
}
};
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpProducerTest.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpProducerTest.java
index 237839a..177eae2 100644
--- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpProducerTest.java
+++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroHttpProducerTest.java
@@ -20,8 +20,10 @@ package org.apache.camel.component.avro;
import java.io.IOException;
import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.reflect.ReflectResponder;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.camel.avro.generated.KeyValueProtocol;
+import org.apache.camel.avro.test.TestReflection;
import org.apache.camel.builder.RouteBuilder;
public class AvroHttpProducerTest extends AvroProducerTestSupport {
@@ -32,6 +34,11 @@ public class AvroHttpProducerTest extends AvroProducerTestSupport {
server = new HttpServer(new SpecificResponder(KeyValueProtocol.PROTOCOL, keyValue), avroPort);
server.start();
}
+
+ if (serverReflection == null) {
+ serverReflection = new HttpServer(new ReflectResponder(TestReflection.class, testReflection), avroPortReflection);
+ serverReflection.start();
+ }
}
protected RouteBuilder createRouteBuilder() throws Exception {
@@ -39,10 +46,33 @@ public class AvroHttpProducerTest extends AvroProducerTestSupport {
@Override
public void configure() throws Exception {
//In Only
- from("direct:in").to("avro:http:localhost:" + avroPort);
+ from("direct:in")
+ .to("avro:http:localhost:" + avroPort + "?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol");
+
+ //In Only with message in route
+ from("direct:in-message-name")
+ .errorHandler(deadLetterChannel("mock:in-message-name-error"))
+ .to("avro:http:localhost:" + avroPort + "/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+ .to("mock:result-in-message-name");
+
+ //In Only with existing interface
+ from("direct:in-reflection")
+ .to("avro:http:localhost:" + avroPortReflection + "/setName?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true");
//InOut
- from("direct:inout").to("avro:http:localhost:" + avroPort).to("mock:result-inout");
+ from("direct:inout")
+ .to("avro:http:localhost:" + avroPort + "?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+ .to("mock:result-inout");
+
+ //InOut with message in route
+ from("direct:inout-message-name")
+ .to("avro:http:localhost:" + avroPort + "/get?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+ .to("mock:result-inout-message-name");
+
+ //InOut with existing interface
+ from("direct:inout-reflection")
+ .to("avro:http:localhost:" + avroPortReflection + "/increaseAge?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true")
+ .to("mock:result-inout-reflection");
}
};
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyConsumerTest.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyConsumerTest.java
index b427217..2156241 100644
--- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyConsumerTest.java
+++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyConsumerTest.java
@@ -21,11 +21,15 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.reflect.ReflectRequestor;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.camel.avro.generated.KeyValueProtocol;
+import org.apache.camel.avro.test.TestReflection;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.avro.processors.GetProcessor;
import org.apache.camel.component.avro.processors.PutProcessor;
+import org.apache.camel.component.avro.processors.ReflectionInOnlyProcessor;
+import org.apache.camel.component.avro.processors.ReflectionInOutProcessor;
public class AvroNettyConsumerTest extends AvroConsumerTestSupport {
@@ -33,6 +37,15 @@ public class AvroNettyConsumerTest extends AvroConsumerTestSupport {
protected void initializeTranceiver() throws IOException {
transceiver = new NettyTransceiver(new InetSocketAddress("localhost", avroPort));
requestor = new SpecificRequestor(KeyValueProtocol.class, transceiver);
+
+ transceiverMessageInRoute = new NettyTransceiver(new InetSocketAddress("localhost", avroPortMessageInRoute));
+ requestorMessageInRoute = new SpecificRequestor(KeyValueProtocol.class, transceiverMessageInRoute);
+
+ transceiverForWrongMessages = new NettyTransceiver(new InetSocketAddress("localhost", avroPortForWrongMessages));
+ requestorForWrongMessages = new SpecificRequestor(KeyValueProtocol.class, transceiverForWrongMessages);
+
+ reflectTransceiver = new NettyTransceiver(new InetSocketAddress("localhost", avroPortReflection));
+ reflectRequestor = new ReflectRequestor(TestReflection.class, reflectTransceiver);
}
protected RouteBuilder createRouteBuilder() throws Exception {
@@ -40,9 +53,33 @@ public class AvroNettyConsumerTest extends AvroConsumerTestSupport {
@Override
public void configure() throws Exception {
//In Only
- from("avro:netty:localhost:" + avroPort).choice()
- .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'put'}").process(new PutProcessor(keyValue))
- .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'get'}").process(new GetProcessor(keyValue));
+ from("avro:netty:localhost:" + avroPort + "?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol").choice()
+ .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'put'}").process(new PutProcessor(keyValue))
+ .when().el("${in.headers." + AvroConstants.AVRO_MESSAGE_NAME + " == 'get'}").process(new GetProcessor(keyValue));
+
+ from("avro:netty:localhost:" + avroPortMessageInRoute + "/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+ .process(new PutProcessor(keyValue));
+
+ from("avro:netty:localhost:" + avroPortMessageInRoute + "/get?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+ .process(new GetProcessor(keyValue));
+
+ from("avro:netty:localhost:" + avroPortForWrongMessages + "/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+ .process(new PutProcessor(keyValue));
+
+ from("avro:netty:localhost:" + avroPortReflection + "/setName?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true")
+ .process(new ReflectionInOnlyProcessor(testReflection));
+
+ from("avro:netty:localhost:" + avroPortReflection + "/setAge?protocolClassName=org.apache.camel.avro.test.TestReflection")
+ .process(new ReflectionInOnlyProcessor(testReflection));
+
+ from("avro:netty:localhost:" + avroPortReflection + "/setTestPojo?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true")
+ .process(new ReflectionInOnlyProcessor(testReflection));
+
+ from("avro:netty:localhost:" + avroPortReflection + "/increaseAge?protocolClassName=org.apache.camel.avro.test.TestReflection&singleParameter=true")
+ .process(new ReflectionInOutProcessor(testReflection));
+
+ from("avro:netty:localhost:" + avroPortReflection + "/getTestPojo?protocolClassName=org.apache.camel.avro.test.TestReflection")
+ .process(new ReflectionInOutProcessor(testReflection));
}
};
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyProducerTest.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyProducerTest.java
index 4b890af..5816e39 100644
--- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyProducerTest.java
+++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettyProducerTest.java
@@ -20,30 +20,60 @@ package org.apache.camel.component.avro;
import java.net.InetSocketAddress;
import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.reflect.ReflectResponder;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.camel.avro.generated.KeyValueProtocol;
+import org.apache.camel.avro.test.TestReflection;
import org.apache.camel.builder.RouteBuilder;
public class AvroNettyProducerTest extends AvroProducerTestSupport {
+ @Override
+ protected void initializeServer() {
+ if (server == null) {
+ server = new NettyServer(new SpecificResponder(KeyValueProtocol.PROTOCOL, keyValue), new InetSocketAddress("localhost", avroPort));
+ server.start();
+ }
+
+ if (serverReflection == null) {
+ serverReflection = new NettyServer(new ReflectResponder(TestReflection.class, testReflection), new InetSocketAddress("localhost", avroPortReflection));
+ serverReflection.start();
+ }
+ }
+
public RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
//In Only
- from("direct:in").to("avro:netty:localhost:" + avroPort);
+ from("direct:in")
+ .to("avro:netty:localhost:" + avroPort + "?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol");
+
+ //In Only with message in route
+ from("direct:in-message-name")
+ .errorHandler(deadLetterChannel("mock:in-message-name-error"))
+ .to("avro:netty:localhost:" + avroPort + "/put?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+ .to("mock:result-in-message-name");
+
+ //In Only with existing interface
+ from("direct:in-reflection")
+ .to("avro:netty:localhost:" + avroPortReflection + "/setName?protocolClassName=org.apache.camel.avro.test.TestReflection");
//InOut
- from("direct:inout").to("avro:netty:localhost:" + avroPort).to("mock:result-inout");
+ from("direct:inout")
+ .to("avro:netty:localhost:" + avroPort + "?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+ .to("mock:result-inout");
+
+ //InOut with message in route
+ from("direct:inout-message-name")
+ .to("avro:netty:localhost:" + avroPort + "/get?protocolClassName=org.apache.camel.avro.generated.KeyValueProtocol")
+ .to("mock:result-inout-message-name");
+
+ //InOut with existing interface
+ from("direct:inout-reflection")
+ .to("avro:netty:localhost:" + avroPortReflection + "/increaseAge?protocolClassName=org.apache.camel.avro.test.TestReflection")
+ .to("mock:result-inout-reflection");
}
};
}
-
- @Override
- protected void initializeServer() {
- if (server == null) {
- server = new NettyServer(new SpecificResponder(KeyValueProtocol.PROTOCOL, keyValue), new InetSocketAddress("localhost", avroPort));
- server.start();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettySpringConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettySpringConsumerTest.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettySpringConsumerTest.java
index 3121934..a0fe869 100644
--- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettySpringConsumerTest.java
+++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroNettySpringConsumerTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.avro;
import org.apache.camel.CamelContext;
import org.apache.camel.avro.impl.KeyValueProtocolImpl;
+import org.apache.camel.avro.test.TestReflectionImpl;
import org.apache.camel.spring.SpringCamelContext;
import org.junit.After;
@@ -37,6 +38,7 @@ public class AvroNettySpringConsumerTest extends AvroNettyConsumerTest {
super.setUp();
keyValue = (KeyValueProtocolImpl) applicationContext.getBean("keyValue");
+ testReflection = (TestReflectionImpl) applicationContext.getBean("testReflection");
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/6783ceab/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroProducerTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroProducerTestSupport.java b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroProducerTestSupport.java
index c029a56..39c215d 100644
--- a/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroProducerTestSupport.java
+++ b/components/camel-avro/src/test/java/org/apache/camel/component/avro/AvroProducerTestSupport.java
@@ -19,13 +19,12 @@ package org.apache.camel.component.avro;
import java.io.IOException;
-import org.apache.avro.Protocol;
import org.apache.avro.ipc.Server;
import org.apache.camel.CamelContext;
import org.apache.camel.avro.generated.Key;
-import org.apache.camel.avro.generated.KeyValueProtocol;
import org.apache.camel.avro.generated.Value;
import org.apache.camel.avro.impl.KeyValueProtocolImpl;
+import org.apache.camel.avro.test.TestReflectionImpl;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.After;
import org.junit.Test;
@@ -33,7 +32,9 @@ import org.junit.Test;
public abstract class AvroProducerTestSupport extends AvroTestSupport {
Server server;
+ Server serverReflection;
KeyValueProtocolImpl keyValue = new KeyValueProtocolImpl();
+ TestReflectionImpl testReflection = new TestReflectionImpl();
protected abstract void initializeServer() throws IOException;
@@ -52,6 +53,10 @@ public abstract class AvroProducerTestSupport extends AvroTestSupport {
if (server != null) {
server.close();
}
+
+ if (serverReflection != null) {
+ serverReflection.close();
+ }
}
@Test
@@ -64,6 +69,40 @@ public abstract class AvroProducerTestSupport extends AvroTestSupport {
}
@Test
+ public void testInOnlyWithMessageNameInRoute() throws InterruptedException {
+ MockEndpoint mock = getMockEndpoint("mock:result-in-message-name");
+ mock.expectedMessageCount(1);
+ Key key = Key.newBuilder().setKey("1").build();
+ Value value = Value.newBuilder().setValue("test value").build();
+ Object[] request = {key, value};
+ template.sendBody("direct:in-message-name", request);
+ assertEquals(value, keyValue.getStore().get(key));
+ mock.assertIsSatisfied(5000);
+ }
+
+ @Test
+ public void testInOnlyReflection() throws InterruptedException {
+ String name = "Chuck";
+ Object[] request = {name};
+ template.sendBody("direct:in-reflection", request);
+ assertEquals(name, testReflection.getName());
+ }
+
+ @Test
+ public void testInOnlyWithWrongMessageNameInMessage() throws InterruptedException {
+ MockEndpoint mockInMessageEnd = getMockEndpoint("mock:result-in-message-name");
+ mockInMessageEnd.expectedMessageCount(0);
+ MockEndpoint mockErrorChannel = getMockEndpoint("mock:in-message-name-error");
+ mockErrorChannel.expectedMessageCount(1);
+ Key key = Key.newBuilder().setKey("1").build();
+ Value value = Value.newBuilder().setValue("test value").build();
+ Object[] request = {key, value};
+ template.sendBodyAndHeader("direct:in-message-name", request, AvroConstants.AVRO_MESSAGE_NAME, "/get");
+ mockErrorChannel.assertIsSatisfied(5000);
+ mockInMessageEnd.assertIsSatisfied();
+ }
+
+ @Test
public void testInOut() throws InterruptedException {
keyValue.getStore().clear();
Key key = Key.newBuilder().setKey("2").build();
@@ -74,18 +113,33 @@ public abstract class AvroProducerTestSupport extends AvroTestSupport {
mock.expectedMessageCount(1);
mock.expectedBodiesReceived(value);
template.sendBodyAndHeader("direct:inout", key, AvroConstants.AVRO_MESSAGE_NAME, "get");
- mock.assertIsSatisfied(10000);
+ mock.assertIsSatisfied(5000);
}
- @Override
- protected CamelContext createCamelContext() throws Exception {
- CamelContext context = super.createCamelContext();
- Protocol protocol = KeyValueProtocol.PROTOCOL;
- AvroConfiguration configuration = new AvroConfiguration();
- configuration.setProtocol(protocol);
- AvroComponent component = new AvroComponent(context);
- component.setConfiguration(configuration);
- context.addComponent("avro", component);
- return context;
+ @Test
+ public void testInOutMessageNameInRoute() throws InterruptedException {
+ keyValue.getStore().clear();
+ Key key = Key.newBuilder().setKey("2").build();
+ Value value = Value.newBuilder().setValue("test value").build();
+ keyValue.getStore().put(key, value);
+
+ MockEndpoint mock = getMockEndpoint("mock:result-inout-message-name");
+ mock.expectedMessageCount(1);
+ mock.expectedBodiesReceived(value);
+ template.sendBody("direct:inout-message-name", key);
+ mock.assertIsSatisfied(5000);
+ }
+
+ @Test
+ public void testInOutReflection() throws InterruptedException {
+ int age = 100;
+ Object[] request = {age};
+
+ MockEndpoint mock = getMockEndpoint("mock:result-inout-reflection");
+ mock.expectedMessageCount(1);
+ mock.expectedBodiesReceived(++age);
+ template.sendBody("direct:inout-reflection", request);
+ mock.assertIsSatisfied(5000);
}
+
}
\ No newline at end of file