You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/27 06:41:58 UTC

[camel] branch master updated: CAMEL-15751: Base improvement to provide access to stream observer in the camel exchange

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f2fbcb  CAMEL-15751: Base improvement to provide access to stream observer in the camel exchange
6f2fbcb is described below

commit 6f2fbcb00eb43c2a48ee89d30a8ad26a35ce5aaf
Author: Rajasekhar <ra...@live.com>
AuthorDate: Sat Oct 24 01:33:34 2020 +0530

    CAMEL-15751: Base improvement to provide access to stream observer in the camel exchange
---
 .../org/apache/camel/catalog/components/grpc.json  |   1 +
 .../apache/camel/catalog/docs/grpc-component.adoc  |   3 +-
 .../component/grpc/GrpcEndpointConfigurer.java     |   5 +
 .../component/grpc/GrpcEndpointUriFactory.java     |   3 +-
 .../org/apache/camel/component/grpc/grpc.json      |   1 +
 .../camel-grpc/src/main/docs/grpc-component.adoc   |   3 +-
 .../camel/component/grpc/GrpcConfiguration.java    |  69 +++---
 .../apache/camel/component/grpc/GrpcConstants.java |   1 +
 .../apache/camel/component/grpc/GrpcConsumer.java  |   6 +
 .../component/grpc/server/GrpcMethodHandler.java   |  38 ++-
 .../grpc/RouteControlledStreamObserverTest.java    | 258 +++++++++++++++++++++
 .../endpoint/dsl/GrpcEndpointBuilderFactory.java   |  34 +++
 .../modules/ROOT/pages/grpc-component.adoc         |   3 +-
 13 files changed, 388 insertions(+), 37 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/grpc.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/grpc.json
index 8171d38..b789938 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/grpc.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/grpc.json
@@ -37,6 +37,7 @@
     "forwardOnCompleted": { "kind": "parameter", "displayName": "Forward On Completed", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "Determines if onCompleted events should be pushed to the Camel route." },
     "forwardOnError": { "kind": "parameter", "displayName": "Forward On Error", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "Determines if onError events should be pushed to the Camel route. Exceptions will be set as message body." },
     "maxConcurrentCallsPerConnection": { "kind": "parameter", "displayName": "Max Concurrent Calls Per Connection", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": 2147483647, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "The maximum number of concurrent calls permitted for each incoming server connection" },
+    "routeControlledStreamObserver": { "kind": "parameter", "displayName": "Route Controlled Stream Observer", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "Lets the route to take control over stream observer. If this value is set to true, then [...]
     "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with [...]
     "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
     "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the  [...]
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/grpc-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/grpc-component.adoc
index 7342e50..d90267b 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/grpc-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/grpc-component.adoc
@@ -83,7 +83,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (27 parameters):
+=== Query Parameters (28 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -96,6 +96,7 @@ with the following path and query parameters:
 | *forwardOnCompleted* (consumer) | Determines if onCompleted events should be pushed to the Camel route. | false | boolean
 | *forwardOnError* (consumer) | Determines if onError events should be pushed to the Camel route. Exceptions will be set as message body. | false | boolean
 | *maxConcurrentCallsPer{zwsp}Connection* (consumer) | The maximum number of concurrent calls permitted for each incoming server connection | 2147483647 | int
+| *routeControlledStreamObserver* (consumer) | Lets the route take control over the stream observer. If this value is set to true, then the response observer of the gRPC call is stored in the Exchange as a property named GrpcConstants.GRPC_RESPONSE_OBSERVER. Please note that the stream observer's onNext(), onError() and onCompleted() methods should then be called in the route. | false | boolean
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut |  | ExchangePattern
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
diff --git a/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointConfigurer.java b/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointConfigurer.java
index 5db521e..8213c9d 100644
--- a/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointConfigurer.java
+++ b/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointConfigurer.java
@@ -28,6 +28,7 @@ public class GrpcEndpointConfigurer extends PropertyConfigurerSupport implements
         map.put("forwardOnCompleted", boolean.class);
         map.put("forwardOnError", boolean.class);
         map.put("maxConcurrentCallsPerConnection", int.class);
+        map.put("routeControlledStreamObserver", boolean.class);
         map.put("exceptionHandler", org.apache.camel.spi.ExceptionHandler.class);
         map.put("exchangePattern", org.apache.camel.ExchangePattern.class);
         map.put("lazyStartProducer", boolean.class);
@@ -98,6 +99,8 @@ public class GrpcEndpointConfigurer extends PropertyConfigurerSupport implements
         case "negotiationType": target.getConfiguration().setNegotiationType(property(camelContext, io.grpc.netty.NegotiationType.class, value)); return true;
         case "producerstrategy":
         case "producerStrategy": target.getConfiguration().setProducerStrategy(property(camelContext, org.apache.camel.component.grpc.GrpcProducerStrategy.class, value)); return true;
+        case "routecontrolledstreamobserver":
+        case "routeControlledStreamObserver": target.getConfiguration().setRouteControlledStreamObserver(property(camelContext, boolean.class, value)); return true;
         case "serviceaccountresource":
         case "serviceAccountResource": target.getConfiguration().setServiceAccountResource(property(camelContext, java.lang.String.class, value)); return true;
         case "streamrepliesto":
@@ -163,6 +166,8 @@ public class GrpcEndpointConfigurer extends PropertyConfigurerSupport implements
         case "negotiationType": return target.getConfiguration().getNegotiationType();
         case "producerstrategy":
         case "producerStrategy": return target.getConfiguration().getProducerStrategy();
+        case "routecontrolledstreamobserver":
+        case "routeControlledStreamObserver": return target.getConfiguration().isRouteControlledStreamObserver();
         case "serviceaccountresource":
         case "serviceAccountResource": return target.getConfiguration().getServiceAccountResource();
         case "streamrepliesto":
diff --git a/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointUriFactory.java b/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointUriFactory.java
index cf340cb..174d9af 100644
--- a/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointUriFactory.java
+++ b/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointUriFactory.java
@@ -20,7 +20,7 @@ public class GrpcEndpointUriFactory extends org.apache.camel.support.component.E
     private static final Set<String> PROPERTY_NAMES;
     private static final Set<String> SECRET_PROPERTY_NAMES;
     static {
-        Set<String> props = new HashSet<>(30);
+        Set<String> props = new HashSet<>(31);
         props.add("basicPropertyBinding");
         props.add("serviceAccountResource");
         props.add("synchronous");
@@ -29,6 +29,7 @@ public class GrpcEndpointUriFactory extends org.apache.camel.support.component.E
         props.add("forwardOnCompleted");
         props.add("jwtIssuer");
         props.add("bridgeErrorHandler");
+        props.add("routeControlledStreamObserver");
         props.add("keyPassword");
         props.add("host");
         props.add("maxMessageSize");
diff --git a/components/camel-grpc/src/generated/resources/org/apache/camel/component/grpc/grpc.json b/components/camel-grpc/src/generated/resources/org/apache/camel/component/grpc/grpc.json
index 8171d38..b789938 100644
--- a/components/camel-grpc/src/generated/resources/org/apache/camel/component/grpc/grpc.json
+++ b/components/camel-grpc/src/generated/resources/org/apache/camel/component/grpc/grpc.json
@@ -37,6 +37,7 @@
     "forwardOnCompleted": { "kind": "parameter", "displayName": "Forward On Completed", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "Determines if onCompleted events should be pushed to the Camel route." },
     "forwardOnError": { "kind": "parameter", "displayName": "Forward On Error", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "Determines if onError events should be pushed to the Camel route. Exceptions will be set as message body." },
     "maxConcurrentCallsPerConnection": { "kind": "parameter", "displayName": "Max Concurrent Calls Per Connection", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "secret": false, "defaultValue": 2147483647, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "The maximum number of concurrent calls permitted for each incoming server connection" },
+    "routeControlledStreamObserver": { "kind": "parameter", "displayName": "Route Controlled Stream Observer", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration", "configurationField": "configuration", "description": "Lets the route to take control over stream observer. If this value is set to true, then [...]
     "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with [...]
     "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
     "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the  [...]
diff --git a/components/camel-grpc/src/main/docs/grpc-component.adoc b/components/camel-grpc/src/main/docs/grpc-component.adoc
index 7342e50..d90267b 100644
--- a/components/camel-grpc/src/main/docs/grpc-component.adoc
+++ b/components/camel-grpc/src/main/docs/grpc-component.adoc
@@ -83,7 +83,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (27 parameters):
+=== Query Parameters (28 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -96,6 +96,7 @@ with the following path and query parameters:
 | *forwardOnCompleted* (consumer) | Determines if onCompleted events should be pushed to the Camel route. | false | boolean
 | *forwardOnError* (consumer) | Determines if onError events should be pushed to the Camel route. Exceptions will be set as message body. | false | boolean
 | *maxConcurrentCallsPer{zwsp}Connection* (consumer) | The maximum number of concurrent calls permitted for each incoming server connection | 2147483647 | int
+| *routeControlledStreamObserver* (consumer) | Lets the route take control over the stream observer. If this value is set to true, then the response observer of the gRPC call is stored in the Exchange as a property named GrpcConstants.GRPC_RESPONSE_OBSERVER. Please note that the stream observer's onNext(), onError() and onCompleted() methods should then be called in the route. | false | boolean
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut |  | ExchangePattern
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
index 86a5b25..a8c4e9a 100644
--- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
+++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
@@ -105,6 +105,9 @@ public class GrpcConfiguration {
     @UriParam(label = "consumer", defaultValue = "" + Integer.MAX_VALUE)
     private int maxConcurrentCallsPerConnection = Integer.MAX_VALUE;
 
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean routeControlledStreamObserver;
+
     /**
      * Fully qualified service name from the protocol buffer descriptor file (package dot service definition name)
      */
@@ -150,6 +153,10 @@ public class GrpcConfiguration {
         this.port = port;
     }
 
+    public NegotiationType getNegotiationType() {
+        return negotiationType;
+    }
+
     /**
      * Identifies the security negotiation type used for HTTP/2 communication
      */
@@ -157,10 +164,6 @@ public class GrpcConfiguration {
         this.negotiationType = negotiationType;
     }
 
-    public NegotiationType getNegotiationType() {
-        return negotiationType;
-    }
-
     /**
      * Authentication method type in advance to the SSL/TLS negotiation
      */
@@ -227,6 +230,10 @@ public class GrpcConfiguration {
         this.serviceAccountResource = serviceAccountResource;
     }
 
+    public String getKeyCertChainResource() {
+        return keyCertChainResource;
+    }
+
     /**
      * The X.509 certificate chain file resource in PEM format link
      */
@@ -234,8 +241,8 @@ public class GrpcConfiguration {
         this.keyCertChainResource = keyCertChainResource;
     }
 
-    public String getKeyCertChainResource() {
-        return keyCertChainResource;
+    public String getKeyResource() {
+        return keyResource;
     }
 
     /**
@@ -245,10 +252,6 @@ public class GrpcConfiguration {
         this.keyResource = keyResource;
     }
 
-    public String getKeyResource() {
-        return keyResource;
-    }
-
     /**
      * The PKCS#8 private key file password
      */
@@ -260,6 +263,10 @@ public class GrpcConfiguration {
         this.keyPassword = keyPassword;
     }
 
+    public String getTrustCertCollectionResource() {
+        return trustCertCollectionResource;
+    }
+
     /**
      * The trusted certificates collection file resource in PEM format for verifying the remote endpoint's certificate
      */
@@ -267,10 +274,6 @@ public class GrpcConfiguration {
         this.trustCertCollectionResource = trustCertCollectionResource;
     }
 
-    public String getTrustCertCollectionResource() {
-        return trustCertCollectionResource;
-    }
-
     /**
      * This option specifies the top-level strategy for processing service requests and responses in streaming mode. If
      * an aggregation strategy is selected, all requests will be accumulated in the list, then transferred to the flow,
@@ -285,6 +288,10 @@ public class GrpcConfiguration {
         this.consumerStrategy = consumerStrategy;
     }
 
+    public boolean isForwardOnCompleted() {
+        return forwardOnCompleted;
+    }
+
     /**
      * Determines if onCompleted events should be pushed to the Camel route.
      */
@@ -292,8 +299,8 @@ public class GrpcConfiguration {
         this.forwardOnCompleted = forwardOnCompleted;
     }
 
-    public boolean isForwardOnCompleted() {
-        return forwardOnCompleted;
+    public boolean isForwardOnError() {
+        return forwardOnError;
     }
 
     /**
@@ -303,10 +310,6 @@ public class GrpcConfiguration {
         this.forwardOnError = forwardOnError;
     }
 
-    public boolean isForwardOnError() {
-        return forwardOnError;
-    }
-
     public GrpcProducerStrategy getProducerStrategy() {
         return producerStrategy;
     }
@@ -353,6 +356,10 @@ public class GrpcConfiguration {
         this.flowControlWindow = flowControlWindow;
     }
 
+    public int getMaxMessageSize() {
+        return maxMessageSize;
+    }
+
     /**
      * The maximum message size allowed to be received/sent (MiB)
      */
@@ -360,8 +367,22 @@ public class GrpcConfiguration {
         this.maxMessageSize = maxMessageSize;
     }
 
-    public int getMaxMessageSize() {
-        return maxMessageSize;
+    /**
+     * Lets the route take control over the stream observer. If this value is set to true, then the response observer of
+     * the gRPC call is stored in the Exchange as a property named {@link GrpcConstants#GRPC_RESPONSE_OBSERVER}.
+     * <p>
+     * Please note that the stream observer's onNext(), onError(), onCompleted() methods should be called in the route.
+     */
+    public boolean isRouteControlledStreamObserver() {
+        return routeControlledStreamObserver;
+    }
+
+    public void setRouteControlledStreamObserver(boolean routeControlledStreamObserver) {
+        this.routeControlledStreamObserver = routeControlledStreamObserver;
+    }
+
+    public int getMaxConcurrentCallsPerConnection() {
+        return maxConcurrentCallsPerConnection;
     }
 
     /**
@@ -371,10 +392,6 @@ public class GrpcConfiguration {
         this.maxConcurrentCallsPerConnection = maxConcurrentCallsPerConnection;
     }
 
-    public int getMaxConcurrentCallsPerConnection() {
-        return maxConcurrentCallsPerConnection;
-    }
-
     public void parseURI(URI uri) {
         setHost(uri.getHost());
 
diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConstants.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConstants.java
index f34e2c8..106813b 100644
--- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConstants.java
+++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConstants.java
@@ -55,4 +55,5 @@ public interface GrpcConstants {
      * The registry key to lookup a custom BindableServiceFactory
      */
     String GRPC_BINDABLE_SERVICE_FACTORY_NAME = "grpcBindableServiceFactory";
+    String GRPC_RESPONSE_OBSERVER = "grpcResponseObserver";
 }
diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
index f0b4a05..ecfe785 100644
--- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
+++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
@@ -103,6 +103,12 @@ public class GrpcConsumer extends DefaultConsumer {
             throw new IllegalArgumentException("No server start properties (host, port) specified");
         }
 
+        if (configuration.isRouteControlledStreamObserver()
+                && configuration.getConsumerStrategy() == GrpcConsumerStrategy.AGGREGATION) {
+            throw new IllegalArgumentException(
+                    "Consumer strategy AGGREGATION and routeControlledStreamObserver are not compatible. Set the consumer strategy to PROPAGATION");
+        }
+
         if (configuration.getNegotiationType() == NegotiationType.TLS) {
             ObjectHelper.notNull(configuration.getKeyCertChainResource(), "keyCertChainResource");
             ObjectHelper.notNull(configuration.getKeyResource(), "keyResource");
diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java
index abd74d9..68695ef 100644
--- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java
+++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java
@@ -39,19 +39,30 @@ public class GrpcMethodHandler {
         this.consumer = consumer;
     }
 
+    /**
+     * This method deals with the unary and server streaming gRPC calls
+     *
+     * @param  body             The request object sent by the gRPC client to the server
+     * @param  responseObserver The response stream observer
+     * @param  methodName       The name of the method invoked using the stub.
+     * @throws Exception        if processing the exchange in the route throws
+     */
     public void handle(Object body, StreamObserver<Object> responseObserver, String methodName) throws Exception {
         Map<String, Object> grcpHeaders = populateGrpcHeaders(methodName);
         GrpcEndpoint endpoint = (GrpcEndpoint) consumer.getEndpoint();
+
         Exchange exchange = endpoint.createExchange();
         exchange.getIn().setBody(body);
         exchange.getIn().setHeaders(grcpHeaders);
 
-        if (endpoint.isSynchronous()) {
-            consumer.getProcessor().process(exchange);
-        } else {
-            consumer.getAsyncProcessor().process(exchange);
+        if (endpoint.getConfiguration().isRouteControlledStreamObserver()) {
+            exchange.setProperty(GrpcConstants.GRPC_RESPONSE_OBSERVER, responseObserver);
+            invokeRoute(endpoint, exchange);
+            return;
         }
 
+        invokeRoute(endpoint, exchange);
+
         if (exchange.isFailed()) {
             responseObserver.onError(Status.INTERNAL
                     .withDescription(exchange.getException().getMessage())
@@ -62,9 +73,7 @@ public class GrpcMethodHandler {
             Object responseBody = exchange.getIn().getBody();
             if (responseBody instanceof List) {
                 List<Object> responseList = (List<Object>) responseBody;
-                responseList.forEach(responseItem -> {
-                    responseObserver.onNext(responseItem);
-                });
+                responseList.forEach(responseObserver::onNext);
             } else {
                 responseObserver.onNext(responseBody);
             }
@@ -72,6 +81,21 @@ public class GrpcMethodHandler {
         }
     }
 
+    private void invokeRoute(GrpcEndpoint endpoint, Exchange exchange) throws Exception {
+        if (endpoint.isSynchronous()) {
+            consumer.getProcessor().process(exchange);
+        } else {
+            consumer.getAsyncProcessor().process(exchange);
+        }
+    }
+
+    /**
+     * This method deals with the client streaming and bi-directional streaming gRPC calls
+     *
+     * @param  responseObserver The response stream observer
+     * @param  methodName       The name of the method invoked using the stub.
+     * @return                  Request stream observer
+     */
     public StreamObserver<Object> handleForConsumerStrategy(StreamObserver<Object> responseObserver, String methodName) {
         Map<String, Object> grcpHeaders = populateGrpcHeaders(methodName);
         GrpcEndpoint endpoint = (GrpcEndpoint) consumer.getEndpoint();
diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/RouteControlledStreamObserverTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/RouteControlledStreamObserverTest.java
new file mode 100644
index 0000000..238f151
--- /dev/null
+++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/RouteControlledStreamObserverTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.grpc;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class RouteControlledStreamObserverTest extends CamelTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RouteControlledStreamObserverTest.class);
+
+    private static final int GRPC_SYNC_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable();
+    private static final int GRPC_ASYNC_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable();
+    private static final int GRPC_TEST_PING_ID = 1;
+    private static final String GRPC_TEST_PING_VALUE = "PING";
+    private static final String GRPC_TEST_PONG_VALUE = "PONG";
+
+    private ManagedChannel syncRequestChannel;
+    private ManagedChannel asyncRequestChannel;
+    private PingPongGrpc.PingPongBlockingStub blockingStub;
+    private PingPongGrpc.PingPongStub nonBlockingStub;
+    private PingPongGrpc.PingPongStub asyncNonBlockingStub;
+
+    @BeforeEach
+    public void startGrpcChannels() {
+        syncRequestChannel = ManagedChannelBuilder.forAddress("localhost", GRPC_SYNC_REQUEST_TEST_PORT).usePlaintext().build();
+        asyncRequestChannel
+                = ManagedChannelBuilder.forAddress("localhost", GRPC_ASYNC_REQUEST_TEST_PORT).usePlaintext().build();
+        blockingStub = PingPongGrpc.newBlockingStub(syncRequestChannel);
+        nonBlockingStub = PingPongGrpc.newStub(syncRequestChannel);
+        asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel);
+    }
+
+    @AfterEach
+    public void stopGrpcChannels() {
+        syncRequestChannel.shutdown().shutdownNow();
+        asyncRequestChannel.shutdown().shutdownNow();
+    }
+
+    @Test
+    public void testSyncSyncMethodInSync() {
+        LOG.info("gRPC pingSyncSync method blocking test start");
+        PingRequest pingRequest
+                = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
+        PongResponse pongResponse = blockingStub.pingSyncSync(pingRequest);
+
+        assertNotNull(pongResponse);
+        assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId());
+        assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName());
+    }
+
+    @Test
+    public void testSyncAsyncMethodInSync() {
+        LOG.info("gRPC pingSyncAsync method blocking test start");
+        PingRequest pingRequest
+                = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
+        Iterator<PongResponse> pongResponseIter = blockingStub.pingSyncAsync(pingRequest);
+        while (pongResponseIter.hasNext()) {
+            PongResponse pongResponse = pongResponseIter.next();
+            assertNotNull(pongResponse);
+            assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId());
+            assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName());
+        }
+    }
+
+    @Test
+    public void testSyncSyncMethodInAsync() throws Exception {
+        LOG.info("gRPC pingSyncSync method async test start");
+        final CountDownLatch latch = new CountDownLatch(1);
+        PingRequest pingRequest
+                = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
+        PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch);
+
+        nonBlockingStub.pingSyncSync(pingRequest, responseObserver);
+        assertTrue(latch.await(5, TimeUnit.SECONDS), "gRPC pingSyncSync call did not terminate within 5 seconds");
+
+        PongResponse pongResponse = responseObserver.getPongResponse();
+
+        assertNotNull(pongResponse);
+        assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId());
+        assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName());
+    }
+
+    @Test
+    public void testSyncAsyncMethodInAsync() throws Exception {
+        LOG.info("gRPC pingSyncAsync method async test start");
+        final CountDownLatch latch = new CountDownLatch(1);
+        PingRequest pingRequest
+                = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
+        PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch);
+
+        nonBlockingStub.pingSyncAsync(pingRequest, responseObserver);
+        assertTrue(latch.await(5, TimeUnit.SECONDS), "gRPC pingSyncAsync call did not terminate within 5 seconds");
+
+        PongResponse pongResponse = responseObserver.getPongResponse();
+
+        assertNotNull(pongResponse);
+        assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId());
+        assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName());
+    }
+
+    @Test
+    public void testAsyncSyncMethodInAsync() throws Exception {
+        LOG.info("gRPC pingAsyncSync method async test start");
+        final CountDownLatch latch = new CountDownLatch(1);
+        PingRequest pingRequest
+                = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
+        PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch);
+
+        StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncSync(responseObserver);
+        requestObserver.onNext(pingRequest);
+        requestObserver.onNext(pingRequest);
+        requestObserver.onCompleted();
+        assertTrue(latch.await(5, TimeUnit.SECONDS), "gRPC pingAsyncSync call did not terminate within 5 seconds");
+
+        PongResponse pongResponse = responseObserver.getPongResponse();
+
+        assertNotNull(pongResponse);
+        assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId());
+        assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName());
+    }
+
+    @Test
+    public void testAsyncAsyncMethodInAsync() throws Exception {
+        LOG.info("gRPC pingAsyncAsync method async test start");
+        final CountDownLatch latch = new CountDownLatch(1);
+        PingRequest pingRequest
+                = PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(GRPC_TEST_PING_ID).build();
+        PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch);
+
+        StreamObserver<PingRequest> requestObserver = asyncNonBlockingStub.pingAsyncAsync(responseObserver);
+        requestObserver.onNext(pingRequest);
+        requestObserver.onNext(pingRequest);
+        requestObserver.onCompleted();
+        assertTrue(latch.await(5, TimeUnit.SECONDS), "gRPC pingAsyncAsync call did not terminate within 5 seconds");
+
+        PongResponse pongResponse = responseObserver.getPongResponse();
+
+        assertNotNull(pongResponse);
+        assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId());
+        assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE, pongResponse.getPongName());
+    }
+
+    @Test
+    public void unsupportedEndpointConfigurationFailureTest() throws Exception {
+        CamelContext camelContext = new DefaultCamelContext();
+        camelContext.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("grpc://localhost:" + GRPC_SYNC_REQUEST_TEST_PORT
+                     + "/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION" +
+                     "&routeControlledStreamObserver=true").to("log:foo");
+            }
+        });
+        assertThrows(IllegalArgumentException.class, camelContext::start);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            private void process(Exchange exchange) {
+                Message message = exchange.getIn();
+                PingRequest pingRequest = message.getBody(PingRequest.class);
+                StreamObserver<Object> responseObserver
+                        = (StreamObserver<Object>) exchange.getProperty(GrpcConstants.GRPC_RESPONSE_OBSERVER);
+                PongResponse pongResponse
+                        = PongResponse.newBuilder().setPongName(pingRequest.getPingName() + GRPC_TEST_PONG_VALUE)
+                                .setPongId(pingRequest.getPingId()).build();
+                message.setBody(pongResponse, PongResponse.class);
+                exchange.setMessage(message);
+                responseObserver.onNext(pongResponse);
+                responseObserver.onCompleted();
+            }
+
+            @Override
+            public void configure() {
+                from("grpc://localhost:" + GRPC_SYNC_REQUEST_TEST_PORT
+                     + "/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=PROPAGATION&routeControlledStreamObserver=true")
+                             .process(this::process);
+
+                from("grpc://localhost:" + GRPC_ASYNC_REQUEST_TEST_PORT
+                     + "/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION")
+                             .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");
+            }
+        };
+    }
+
+    public static class PongResponseStreamObserver implements StreamObserver<PongResponse> {
+        private final CountDownLatch latch;
+        private PongResponse pongResponse;
+
+        public PongResponseStreamObserver(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        public PongResponse getPongResponse() {
+            return pongResponse;
+        }
+
+        @Override
+        public void onNext(PongResponse value) {
+            pongResponse = value;
+        }
+
+        @Override
+        public void onError(Throwable t) {
+            LOG.info("Exception", t);
+            latch.countDown();
+        }
+
+        @Override
+        public void onCompleted() {
+            latch.countDown();
+        }
+    }
+
+    public static class GrpcMessageBuilder {
+        @SuppressWarnings("unused")
+        public PongResponse buildAsyncPongResponse(List<PingRequest> pingRequests) {
+            return PongResponse.newBuilder().setPongName(pingRequests.get(0).getPingName() + GRPC_TEST_PONG_VALUE)
+                    .setPongId(pingRequests.get(0).getPingId()).build();
+        }
+    }
+}
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GrpcEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GrpcEndpointBuilderFactory.java
index 9c80c4d..ea44876 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GrpcEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GrpcEndpointBuilderFactory.java
@@ -253,6 +253,40 @@ public interface GrpcEndpointBuilderFactory {
             return this;
         }
         /**
+         * Lets the route take control of the stream observer. If this value is
+         * set to true, the response observer of the gRPC call is set on the
+         * Exchange under the name GrpcConstants.GRPC_RESPONSE_OBSERVER. Please
+         * note that the stream observer's onNext(), onError() and
+         * onCompleted() methods should then be called in the route.
+         * 
+         * The option is a: <code>boolean</code> type.
+         * 
+         * Default: false
+         * Group: consumer
+         */
+        default GrpcEndpointConsumerBuilder routeControlledStreamObserver(
+                boolean routeControlledStreamObserver) {
+            doSetProperty("routeControlledStreamObserver", routeControlledStreamObserver);
+            return this;
+        }
+        /**
+         * Lets the route take control of the stream observer. If this value is
+         * set to true, the response observer of the gRPC call is set on the
+         * Exchange under the name GrpcConstants.GRPC_RESPONSE_OBSERVER. Please
+         * note that the stream observer's onNext(), onError() and
+         * onCompleted() methods should then be called in the route.
+         * 
+         * The option will be converted to a <code>boolean</code> type.
+         * 
+         * Default: false
+         * Group: consumer
+         */
+        default GrpcEndpointConsumerBuilder routeControlledStreamObserver(
+                String routeControlledStreamObserver) {
+            doSetProperty("routeControlledStreamObserver", routeControlledStreamObserver);
+            return this;
+        }
+        /**
          * Authentication method type in advance to the SSL/TLS negotiation.
          * 
          * The option is a:
diff --git a/docs/components/modules/ROOT/pages/grpc-component.adoc b/docs/components/modules/ROOT/pages/grpc-component.adoc
index 7b9ac70..1d465fd 100644
--- a/docs/components/modules/ROOT/pages/grpc-component.adoc
+++ b/docs/components/modules/ROOT/pages/grpc-component.adoc
@@ -85,7 +85,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (27 parameters):
+=== Query Parameters (28 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -98,6 +98,7 @@ with the following path and query parameters:
 | *forwardOnCompleted* (consumer) | Determines if onCompleted events should be pushed to the Camel route. | false | boolean
 | *forwardOnError* (consumer) | Determines if onError events should be pushed to the Camel route. Exceptions will be set as message body. | false | boolean
 | *maxConcurrentCallsPer{zwsp}Connection* (consumer) | The maximum number of concurrent calls permitted for each incoming server connection | 2147483647 | int
+| *routeControlledStreamObserver* (consumer) | Lets the route take control of the stream observer. If this value is set to true, the response observer of the gRPC call is set on the Exchange under the name GrpcConstants.GRPC_RESPONSE_OBSERVER. Please note that the stream observer's onNext(), onError() and onCompleted() methods should then be called in the route. | false | boolean
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut |  | ExchangePattern
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]