Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2022/04/07 12:55:35 UTC

[GitHub] [camel] orpiske opened a new pull request, #7387: RFC: CAMEL-17051: extend the Resume API to include support for pausable consumers

orpiske opened a new pull request, #7387:
URL: https://github.com/apache/camel/pull/7387

   This adds a new API, working similarly to the Resume API, that allows
   pausing Kafka consumption based on external factors (e.g., an
   external system being unavailable).
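
The idea of pausing consumption while an external dependency is down can be sketched roughly as follows. This is only an illustration under stated assumptions: `PausableSource`, `InMemorySource` and `PausingLoop` are hypothetical names made up for this example and are not part of Camel or kafka-client, and the loop is much simpler than the real component code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a client with native pause support.
// This is NOT the kafka-client or Camel API.
interface PausableSource {
    void pause();
    void resume();
    boolean isPaused();
    String poll(); // null when paused or when nothing is left
}

// Small in-memory source, only here to make the sketch runnable.
class InMemorySource implements PausableSource {
    private final List<String> records;
    private int position;
    private boolean paused;

    InMemorySource(List<String> records) {
        this.records = new ArrayList<>(records);
    }

    @Override public void pause() { paused = true; }
    @Override public void resume() { paused = false; }
    @Override public boolean isPaused() { return paused; }

    @Override
    public String poll() {
        if (paused || position >= records.size()) {
            return null;
        }
        return records.get(position++);
    }
}

class PausingLoop {
    // Polls a fixed number of times. The source is paused while the external
    // check fails and resumed once the check passes again.
    static List<String> run(PausableSource source, BooleanSupplier externalSystemUp, int iterations) {
        List<String> consumed = new ArrayList<>();
        for (int i = 0; i < iterations; i++) {
            if (!externalSystemUp.getAsBoolean()) {
                source.pause();
            } else if (source.isPaused()) {
                source.resume();
            }
            String record = source.poll();
            if (record != null) {
                consumed.add(record);
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        int[] tick = {0};
        // Simulated outage during the second and third iterations.
        BooleanSupplier externalSystemUp = () -> {
            int t = tick[0]++;
            return t < 1 || t > 2;
        };
        InMemorySource source = new InMemorySource(List.of("a", "b", "c"));
        System.out.println(run(source, externalSystemUp, 6));
    }
}
```

No records are lost in this sketch: polling simply yields nothing while the source is paused and continues from the same position after the resume.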
   
   It is done in a generic way so that we can extend it to other components.
   Additionally, it should provide the groundwork for implementing
   (distributed) partitioning support in the future.
   
   What do you think?
   
   Note: a few known things need to be fixed before merge:
   
   1. Documentation
   2. There's some poor code in the component hot path that can be
      optimized
   3. Any comments from review
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [camel] davsclaus commented on pull request #7387: RFC: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
davsclaus commented on PR #7387:
URL: https://github.com/apache/camel/pull/7387#issuecomment-1094712449

   Hi, great work so far.
   
   I think pausing should be possible both manually and automatically. The former is to integrate with Camel's existing lifecycle, with suspend/resume on the Camel Service.
   
   1) manual
   KafkaConsumer (from kafka-client) has its own pause/resume API, which seems not to be in use. I think Camel end users would want camel-kafka to use these APIs for pausing.
   
   Camel has suspend/resume operations on a route (callable via Java or JMX) so users can force a pause/resume. I think the Camel Kafka consumer should implement Suspendable, and its doSuspend/doResume operations should then trigger the kafka-client pause/resume methods.
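
A rough sketch of that delegation, under stated assumptions: to keep it self-contained, `PartitionedClient`, `FakeClient` and `LifecycleSketch` are hypothetical stand-ins invented for this example. In the real code the client would be the kafka-client `Consumer` (whose `pause`/`resume` take the collection returned by `assignment()`), and the hooks would come from Camel's own service lifecycle rather than from this toy base class.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the subset of a Kafka-style client used here.
interface PartitionedClient {
    Set<String> assignment();
    void pause(Set<String> partitions);
    void resume(Set<String> partitions);
    Set<String> paused();
}

// In-memory client so the sketch can run without a broker.
class FakeClient implements PartitionedClient {
    private final Set<String> assigned;
    private final Set<String> pausedPartitions = new HashSet<>();

    FakeClient(Set<String> assigned) {
        this.assigned = new HashSet<>(assigned);
    }

    @Override public Set<String> assignment() { return new HashSet<>(assigned); }
    @Override public void pause(Set<String> partitions) { pausedPartitions.addAll(partitions); }
    @Override public void resume(Set<String> partitions) { pausedPartitions.removeAll(partitions); }
    @Override public Set<String> paused() { return new HashSet<>(pausedPartitions); }
}

// Hypothetical lifecycle base class with suspend/resume hooks.
abstract class LifecycleSketch {
    private boolean suspended;

    public final void suspend() {
        if (!suspended) {
            doSuspend();
            suspended = true;
        }
    }

    public final void resume() {
        if (suspended) {
            doResume();
            suspended = false;
        }
    }

    public final boolean isSuspended() { return suspended; }

    protected abstract void doSuspend();
    protected abstract void doResume();
}

// The consumer forwards the lifecycle hooks to the client's native pause/resume.
class SuspendableConsumerSketch extends LifecycleSketch {
    private final PartitionedClient client;

    SuspendableConsumerSketch(PartitionedClient client) {
        this.client = client;
    }

    @Override protected void doSuspend() { client.pause(client.assignment()); }
    @Override protected void doResume() { client.resume(client.assignment()); }

    public static void main(String[] args) {
        FakeClient client = new FakeClient(Set.of("orders-0", "orders-1"));
        SuspendableConsumerSketch consumer = new SuspendableConsumerSketch(client);
        consumer.suspend();
        System.out.println("paused after suspend: " + client.paused().size());
        consumer.resume();
        System.out.println("paused after resume: " + client.paused().size());
    }
}
```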
   
   2) automatic
   The generic way with a "boolean" seems a bit more like a readiness check that would break out of the poll loop, which you can then "configure" or "customize" to make it automatic. I wonder if the BooleanSupplier API is a bit too simplistic? You may need access to the Camel Consumer so you have a way to use information from there to know whether you can continue.
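
To illustrate the difference being discussed, here is a small sketch that contrasts a context-free `BooleanSupplier` check with a check that receives the consumer. `ConsumerView` and `ReadinessChecks` are hypothetical names for this example only; this is not the design the PR settled on, just one way the two shapes could look.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;

// Hypothetical snapshot of consumer state that a check might want to inspect.
class ConsumerView {
    private final String routeId;
    private final int consecutiveFailures;

    ConsumerView(String routeId, int consecutiveFailures) {
        this.routeId = routeId;
        this.consecutiveFailures = consecutiveFailures;
    }

    String getRouteId() { return routeId; }
    int getConsecutiveFailures() { return consecutiveFailures; }
}

class ReadinessChecks {
    // Context-free check: it can only use state captured from the outside.
    static boolean canContinue(BooleanSupplier check) {
        return check.getAsBoolean();
    }

    // Context-aware check: it receives the consumer, so the decision can
    // depend on the consumer's own state.
    static boolean canContinue(Predicate<ConsumerView> check, ConsumerView consumer) {
        return check.test(consumer);
    }

    public static void main(String[] args) {
        boolean externalFlag = true;
        System.out.println(canContinue(() -> externalFlag));

        Predicate<ConsumerView> fewFailures = c -> c.getConsecutiveFailures() < 3;
        System.out.println(canContinue(fewFailures, new ConsumerView("kafka-route", 5)));
    }
}
```

With the predicate form, the same check instance can be reused across consumers, since the state it inspects is passed in rather than captured.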
   
   
   




[GitHub] [camel] github-actions[bot] commented on pull request #7387: RFC: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #7387:
URL: https://github.com/apache/camel/pull/7387#issuecomment-1091772624

   :heavy_check_mark: Finished component verification: 0 component(s) test failed out of **2 component(s) tested**




[GitHub] [camel] orpiske merged pull request #7387: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
orpiske merged PR #7387:
URL: https://github.com/apache/camel/pull/7387




[GitHub] [camel] github-actions[bot] commented on pull request #7387: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #7387:
URL: https://github.com/apache/camel/pull/7387#issuecomment-1096791444

   :x: Finished component verification: **1 component(s) test failed** out of 2 component(s) tested




[GitHub] [camel] orpiske commented on pull request #7387: RFC: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
orpiske commented on PR #7387:
URL: https://github.com/apache/camel/pull/7387#issuecomment-1094779843

   Thanks!
   
   > Hi, great work so far.
   > 
   > I think pausing should be possible both manually and automatically. The former is to integrate with Camel's existing lifecycle, with suspend/resume on the Camel Service.
   > 
   >     1. manual
   >        KafkaConsumer (from kafka-client) has its own pause/resume API, which seems not to be in use. I think Camel end users would want camel-kafka to use these APIs for pausing.
   
   Yes, that was the primary goal for CAMEL-17051. I think this is a common problem for other components too, even if they don't have native pause support like the Kafka client does.
   I also think that native pause support may be important in the future once we start exploring partitioning support (granted, this is just speculation at the moment, but I suppose it will be helpful).
   
   > 
   > 
   > Camel has suspend/resume operations on a route (callable via Java or JMX) so users can force a pause/resume. I think the Camel Kafka consumer should implement Suspendable, and its doSuspend/doResume operations should then trigger the kafka-client pause/resume methods.
   
   So, I think I should explore extending this API to support that.
   
   
   > 
   >     2. automatic
   >        The generic way with a "boolean" seems a bit more like a readiness check that would break out of the poll loop, which you can then "configure" or "customize" to make it automatic. I wonder if the BooleanSupplier API is a bit too simplistic? You may need access to the Camel Consumer so you have a way to use information from there to know whether you can continue.
   
   I think you raise a good point here. I will take a closer look at whether the BooleanSupplier can be enough or whether I can come up with a more flexible design for this part of the API.
   
   Thanks for the review!
   
   




[GitHub] [camel] github-actions[bot] commented on pull request #7387: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #7387:
URL: https://github.com/apache/camel/pull/7387#issuecomment-1096764895

   :heavy_check_mark: Finished component verification: 0 component(s) test failed out of **2 component(s) tested**




[GitHub] [camel] davsclaus commented on a diff in pull request #7387: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
davsclaus commented on code in PR #7387:
URL: https://github.com/apache/camel/pull/7387#discussion_r848338829


##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java:
##########
@@ -499,4 +510,19 @@ long getCurrentRecoveryInterval() {
     public BridgeExceptionHandlerToErrorHandler getBridge() {
         return bridge;
     }
+
+    /*
+     * This is for manually pausing the consumer
+     */
+    public void pause() {
+        consumer.pause(consumer.assignment());
+    }
+
+    /*
+     * This is for manually resuming the consumer (not to be confused w/ the Resume API). This is
+     * mostly used for directly calling pause from Java code or via JMX

Review Comment:
   Shouldn't this javadoc be added to the pause method above?



##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java:
##########
@@ -24,6 +24,20 @@
  */
 public interface PollExceptionStrategy {
 
+    /**
+     * Reset any error flags set by a previous error condition
+     */
+    default void reset() {
+
+    }
+
+    /**
+     * Whether to continue fetching or not
+     * 
+     * @return

Review Comment:
   polish javadoc



##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java:
##########
@@ -1612,15 +1612,15 @@ public void setValueDeserializer(String valueDeserializer) {
         this.valueDeserializer = valueDeserializer;
     }
 
-    public String getSeekTo() {
+    public SeekPolicy getSeekTo() {
         return seekTo;
     }
 
     /**
      * Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from
      * end This is replacing the earlier property seekToBeginning

Review Comment:
   Remove that outdated comment about seekToBeginning



##########
catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json:
##########
@@ -0,0 +1,20 @@
+{
+  "model": {
+    "kind": "model",
+    "name": "pausable",
+    "title": "Pausable",
+    "description": "Resume EIP to support resuming processing from last known offset.",

Review Comment:
   I think the description was copied from the Resume EIP. It should say something about pausing.



##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java:
##########
@@ -24,6 +24,20 @@
  */
 public interface PollExceptionStrategy {

Review Comment:
   Add class javadoc if missing, as it's an interface Camel end users can implement.



##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.errorhandler;
+
+import java.util.function.Predicate;
+
+import org.apache.camel.ConsumerListener;
+import org.apache.camel.component.kafka.SeekPolicy;
+import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaConsumerListener implements ConsumerListener<Object, ProcessingResult> {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerListener.class);
+    private Consumer<?, ?> consumer;
+    private SeekPolicy seekPolicy;
+
+    private Predicate<?> afterConsumeEval;
+
+    public Consumer<?, ?> getConsumer() {
+        return consumer;
+    }
+
+    public void setConsumer(Consumer<?, ?> consumer) {
+        this.consumer = consumer;
+    }
+
+    public SeekPolicy getSeekPolicy() {
+        return seekPolicy;
+    }
+
+    public void setSeekPolicy(SeekPolicy seekPolicy) {
+        this.seekPolicy = seekPolicy;
+    }
+
+    @Override
+    public void setResumableCheck(Predicate<?> afterConsumeEval) {
+        this.afterConsumeEval = afterConsumeEval;
+    }
+
+    @Override
+    public boolean afterConsume(@SuppressWarnings("unused") Object ignored) {
+        if (afterConsumeEval.test(null)) {
+            LOG.warn("State changed, therefore resuming the consumer");
+            consumer.resume(consumer.assignment());
+
+            return true;
+        }
+
+        LOG.warn("The consumer is not yet resumable");
+        return false;
+    }
+
+    @Override
+    public boolean afterProcess(ProcessingResult result) {
+        if (result.isFailed()) {
+            LOG.warn("Pausing consumer due to error on the last processing");
+            consumer.pause(consumer.assignment());
+
+            if (seekPolicy == SeekPolicy.BEGINNING) {

Review Comment:
   Is this the desired behaviour: that in case of a failure, the consumer seeks to either the beginning or the end?
   
   Also, what can trigger that failure in ProcessingResult?



##########
core/camel-api/src/main/java/org/apache/camel/ConsumerListener.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel;
+
+import java.util.function.Predicate;
+
+/**
+ * Proxies between the consumer within Camel to

Review Comment:
   Polish javadoc



##########
core/camel-api/src/main/java/org/apache/camel/ConsumerListener.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel;
+
+import java.util.function.Predicate;
+
+/**
+ * Proxies between the consumer within Camel to
+ */
+public interface ConsumerListener<C, P> {
+
+    /**
+     * Whether processing can resume or not
+     *
+     * @return true if it can, or false otherwise
+     */
+    void setResumableCheck(Predicate<?> afterConsumeEval);

Review Comment:
   This method does not return anything, so the @return tag does not apply. Can you update the javadoc on these methods, and state what returning true or false means in the methods that follow?



##########
core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.model;
+
+import java.util.function.Predicate;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.ConsumerListener;
+import org.apache.camel.spi.Metadata;
+
+/**
+ * Resume EIP to support resuming processing from last known offset.
+ */
+@Metadata(label = "eip,routing")
+@XmlRootElement(name = "pausable")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class PausableDefinition extends NoOutputDefinition<PausableDefinition> {
+
+    @XmlTransient
+    private ConsumerListener<?, ?> consumerListenerBean;
+
+    @XmlAttribute(required = true)
+    @Metadata(required = true, javaType = "org.apache.camel.ConsumerListener")
+    private String consumerListener;
+
+    @XmlTransient
+    private Predicate<?> untilCheck;
+
+    @XmlAttribute(required = true)
+    private String untilCheckRef;

Review Comment:
   Avoid using the xxxRef style; just name it xxx.
   Also set javaType = "...Predicate", as it's a known class.





[GitHub] [camel] github-actions[bot] commented on pull request #7387: RFC: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #7387:
URL: https://github.com/apache/camel/pull/7387#issuecomment-1091702099

   :warning: This PR changes Camel components and will be tested automatically.




[GitHub] [camel] github-actions[bot] commented on pull request #7387: RFC: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #7387:
URL: https://github.com/apache/camel/pull/7387#issuecomment-1096457755

   :x: Finished component verification: **1 component(s) test failed** out of 2 component(s) tested




[GitHub] [camel] orpiske commented on a diff in pull request #7387: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
orpiske commented on code in PR #7387:
URL: https://github.com/apache/camel/pull/7387#discussion_r848356354


##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java:
##########
@@ -24,6 +24,20 @@
  */
 public interface PollExceptionStrategy {

Review Comment:
   This is not meant for end users to implement; it is internal to the component.





[GitHub] [camel] orpiske commented on a diff in pull request #7387: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
orpiske commented on code in PR #7387:
URL: https://github.com/apache/camel/pull/7387#discussion_r848393822


##########
core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.model;
+
+import java.util.function.Predicate;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.ConsumerListener;
+import org.apache.camel.spi.Metadata;
+
+/**
+ * Resume EIP to support resuming processing from last known offset.
+ */
+@Metadata(label = "eip,routing")
+@XmlRootElement(name = "pausable")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class PausableDefinition extends NoOutputDefinition<PausableDefinition> {
+
+    @XmlTransient
+    private ConsumerListener<?, ?> consumerListenerBean;
+
+    @XmlAttribute(required = true)
+    @Metadata(required = true, javaType = "org.apache.camel.ConsumerListener")
+    private String consumerListener;
+
+    @XmlTransient
+    private Predicate<?> untilCheck;
+
+    @XmlAttribute(required = true)
+    private String untilCheckRef;

Review Comment:
   Ah, I'll do what the Resume API definition does and rename the predicate instead ;)





[GitHub] [camel] orpiske commented on pull request #7387: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
orpiske commented on PR #7387:
URL: https://github.com/apache/camel/pull/7387#issuecomment-1096971122

   Will merge as soon as we have a clean build on this one.




[GitHub] [camel] github-actions[bot] commented on pull request #7387: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #7387:
URL: https://github.com/apache/camel/pull/7387#issuecomment-1097040611

   :heavy_check_mark: Finished component verification: 0 component(s) test failed out of **2 component(s) tested**




[GitHub] [camel] orpiske commented on a diff in pull request #7387: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
orpiske commented on code in PR #7387:
URL: https://github.com/apache/camel/pull/7387#discussion_r848350865


##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java:
##########
@@ -499,4 +510,19 @@ long getCurrentRecoveryInterval() {
     public BridgeExceptionHandlerToErrorHandler getBridge() {
         return bridge;
     }
+
+    /*
+     * This is for manually pausing the consumer
+     */
+    public void pause() {
+        consumer.pause(consumer.assignment());
+    }
+
+    /*
+     * This is for manually resuming the consumer (not to be confused w/ the Resume API). This is
+     * mostly used for directly calling pause from Java code or via JMX

Review Comment:
   Yep. Fixing it, thanks!





[GitHub] [camel] github-actions[bot] commented on pull request #7387: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #7387:
URL: https://github.com/apache/camel/pull/7387#issuecomment-1096736066

   :heavy_check_mark: Finished component verification: 0 component(s) test failed out of **2 component(s) tested**




[GitHub] [camel] orpiske commented on a diff in pull request #7387: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
orpiske commented on code in PR #7387:
URL: https://github.com/apache/camel/pull/7387#discussion_r848366942


##########
core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.model;
+
+import java.util.function.Predicate;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.ConsumerListener;
+import org.apache.camel.spi.Metadata;
+
+/**
+ * Resume EIP to support resuming processing from last known offset.
+ */
+@Metadata(label = "eip,routing")
+@XmlRootElement(name = "pausable")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class PausableDefinition extends NoOutputDefinition<PausableDefinition> {
+
+    @XmlTransient
+    private ConsumerListener<?, ?> consumerListenerBean;
+
+    @XmlAttribute(required = true)
+    @Metadata(required = true, javaType = "org.apache.camel.ConsumerListener")
+    private String consumerListener;
+
+    @XmlTransient
+    private Predicate<?> untilCheck;
+
+    @XmlAttribute(required = true)
+    private String untilCheckRef;

Review Comment:
   In this case it would clash with the untilCheck variable above, but I will name it something else.





[GitHub] [camel] orpiske commented on pull request #7387: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
orpiske commented on PR #7387:
URL: https://github.com/apache/camel/pull/7387#issuecomment-1096602573

   Resolved TODO + comments. 
   
   Marking it as ready for review, to avoid packing too much into it. 




[GitHub] [camel] orpiske commented on a diff in pull request #7387: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
orpiske commented on code in PR #7387:
URL: https://github.com/apache/camel/pull/7387#discussion_r848359696


##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.errorhandler;
+
+import java.util.function.Predicate;
+
+import org.apache.camel.ConsumerListener;
+import org.apache.camel.component.kafka.SeekPolicy;
+import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaConsumerListener implements ConsumerListener<Object, ProcessingResult> {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerListener.class);
+    private Consumer<?, ?> consumer;
+    private SeekPolicy seekPolicy;
+
+    private Predicate<?> afterConsumeEval;
+
+    public Consumer<?, ?> getConsumer() {
+        return consumer;
+    }
+
+    public void setConsumer(Consumer<?, ?> consumer) {
+        this.consumer = consumer;
+    }
+
+    public SeekPolicy getSeekPolicy() {
+        return seekPolicy;
+    }
+
+    public void setSeekPolicy(SeekPolicy seekPolicy) {
+        this.seekPolicy = seekPolicy;
+    }
+
+    @Override
+    public void setResumableCheck(Predicate<?> afterConsumeEval) {
+        this.afterConsumeEval = afterConsumeEval;
+    }
+
+    @Override
+    public boolean afterConsume(@SuppressWarnings("unused") Object ignored) {
+        if (afterConsumeEval.test(null)) {
+            LOG.warn("State changed, therefore resuming the consumer");
+            consumer.resume(consumer.assignment());
+
+            return true;
+        }
+
+        LOG.warn("The consumer is not yet resumable");
+        return false;
+    }
+
+    @Override
+    public boolean afterProcess(ProcessingResult result) {
+        if (result.isFailed()) {
+            LOG.warn("Pausing consumer due to error on the last processing");
+            consumer.pause(consumer.assignment());
+
+            if (seekPolicy == SeekPolicy.BEGINNING) {

Review Comment:
   Yes, if processing fails in one of the subsequent steps. For example, from the integration test:
   
   ```
   from(from)
           .pausable(testConsumerListener, o -> canContinue())
           .routeId("pausable-it")
           .process(exchange -> LOG.info("Got record from Kafka: {}", exchange.getMessage().getBody()))
           .to(intermediate);

   from(intermediate)
           .process(exchange -> {
               LOG.info("Got record on the intermediate processor: {}", exchange.getMessage().getBody());

               if (getCount() <= RETRY_COUNT) {
                   throw new RuntimeCamelException("Error");
               }
           })
           .to(to);
   ```
   
   Such a failure is added to the processing result (which is internal to the component). In other words, downstream failures should end up at this point, which then (based on the seek policy) rewinds the consumer to the desired position in the partition.
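   
   To make the cycle easier to follow, here is a minimal, dependency-free sketch of the pause/rewind/resume flow described above. It is not the code from this PR: `PausableSketch`, `FakeConsumer`, `Listener` and `run` are hypothetical names made up for illustration, the consumer is a plain in-memory list instead of a Kafka consumer, and the sketch only models the rewind-to-beginning case.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.BooleanSupplier;

   // Hypothetical model of the pause/rewind/resume cycle.
   // None of these types are the Camel or Kafka classes from the PR.
   class PausableSketch {

       /** Stand-in for a consumer that can be paused, resumed and rewound. */
       static class FakeConsumer {
           private final List<String> records;
           private int position;
           private boolean paused;

           FakeConsumer(List<String> records) {
               this.records = records;
           }

           /** Returns the next record, or null when paused or exhausted. */
           String poll() {
               if (paused || position >= records.size()) {
                   return null;
               }
               return records.get(position++);
           }

           void pause() { paused = true; }
           void resume() { paused = false; }
           void seekToBeginning() { position = 0; }
       }

       /** Mirrors the two callbacks: resume after a poll, pause after a failure. */
       static class Listener {
           private final FakeConsumer consumer;
           private final BooleanSupplier resumableCheck;

           Listener(FakeConsumer consumer, BooleanSupplier resumableCheck) {
               this.consumer = consumer;
               this.resumableCheck = resumableCheck;
           }

           boolean afterConsume() {
               if (resumableCheck.getAsBoolean()) {
                   consumer.resume();
                   return true;
               }
               return false;
           }

           boolean afterProcess(boolean failed) {
               if (failed) {
                   consumer.pause();
                   consumer.seekToBeginning(); // assumption: rewind-to-beginning policy
                   return false;
               }
               return true;
           }
       }

       /** Simulates a downstream system that is down for the first downFor iterations. */
       static List<String> run(int downFor) {
           FakeConsumer consumer = new FakeConsumer(List.of("a", "b", "c"));
           int[] remainingDown = {downFor};
           Listener listener = new Listener(consumer, () -> remainingDown[0] <= 0);
           List<String> events = new ArrayList<>();
           int delivered = 0;

           for (int i = 0; i < 100 && delivered < 3; i++) {
               String record = consumer.poll();
               listener.afterConsume();
               if (record != null) {
                   // The downstream step "throws" while the system is down
                   boolean failed = remainingDown[0] > 0;
                   if (listener.afterProcess(failed)) {
                       events.add("ok:" + record);
                       delivered++;
                   } else {
                       events.add("fail:" + record);
                   }
               }
               if (remainingDown[0] > 0) {
                   remainingDown[0]--; // time passes; the system eventually recovers
               }
           }
           return events;
       }

       public static void main(String[] args) {
           System.out.println(run(2));
       }
   }
   ```
   
   Running `main` prints `[fail:a, ok:a, ok:b, ok:c]`: the first record fails, the consumer is paused and rewound, polls return nothing while the check is false, and the same record is delivered again once the check passes.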





[GitHub] [camel] github-actions[bot] commented on pull request #7387: RFC: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #7387:
URL: https://github.com/apache/camel/pull/7387#issuecomment-1096436887

   :heavy_check_mark: Finished component verification: 0 component(s) test failed out of **2 component(s) tested**




[GitHub] [camel] github-actions[bot] commented on pull request #7387: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #7387:
URL: https://github.com/apache/camel/pull/7387#issuecomment-1096660858

   :x: Finished component verification: **1 component(s) test failed** out of 2 component(s) tested




[GitHub] [camel] github-actions[bot] commented on pull request #7387: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #7387:
URL: https://github.com/apache/camel/pull/7387#issuecomment-1096927924

   :x: Finished component verification: **1 component(s) test failed** out of 2 component(s) tested




[GitHub] [camel] orpiske commented on a diff in pull request #7387: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
orpiske commented on code in PR #7387:
URL: https://github.com/apache/camel/pull/7387#discussion_r848351573


##########
catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json:
##########
@@ -0,0 +1,20 @@
+{
+  "model": {
+    "kind": "model",
+    "name": "pausable",
+    "title": "Pausable",
+    "description": "Resume EIP to support resuming processing from last known offset.",

Review Comment:
   Note to self: avoid copying class headers. 
   
   Good catch, thanks! 



##########
catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json:
##########
@@ -0,0 +1,20 @@
+{
+  "model": {
+    "kind": "model",
+    "name": "pausable",
+    "title": "Pausable",
+    "description": "Resume EIP to support resuming processing from last known offset.",

Review Comment:
   Note to self: avoid copying class annotations. 
   
   Good catch, thanks! 





[GitHub] [camel] github-actions[bot] commented on pull request #7387: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #7387:
URL: https://github.com/apache/camel/pull/7387#issuecomment-1096636102

   :x: Finished component verification: **1 component(s) test failed** out of 9 component(s) tested




[GitHub] [camel] github-actions[bot] commented on pull request #7387: CAMEL-17051: extend the Resume API to include support for pausable consumers

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #7387:
URL: https://github.com/apache/camel/pull/7387#issuecomment-1096637948

   :x: Finished component verification: **1 component(s) test failed** out of 2 component(s) tested

