You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/04/25 07:49:24 UTC

[camel] branch main updated: CAMEL-17963: added resume support for Cassandra CQL

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 438ce8c3592 CAMEL-17963: added resume support for Cassandra CQL
438ce8c3592 is described below

commit 438ce8c35929587fd9dbfdbfe56800cec6040ae4
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Apr 22 15:32:35 2022 +0200

    CAMEL-17963: added resume support for Cassandra CQL
---
 .../component/cassandra/CassandraConsumer.java     | 22 ++++-
 .../consumer/support/CassandraResumeStrategy.java  | 44 ++++++++++
 .../CassandraComponentResumeStrategyIT.java        | 96 ++++++++++++++++++++++
 .../docs/modules/eips/pages/resume-strategies.adoc |  1 +
 4 files changed, 162 insertions(+), 1 deletion(-)

diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
index cc516d82d29..659bc4cec9c 100644
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
@@ -22,18 +22,22 @@ import com.datastax.oss.driver.api.core.cql.ResultSet;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.ResumeAware;
+import org.apache.camel.component.cassandra.consumer.support.CassandraResumeStrategy;
 import org.apache.camel.support.ScheduledPollConsumer;
 
 /**
  * Cassandra 2 CQL3 consumer.
  */
-public class CassandraConsumer extends ScheduledPollConsumer {
+public class CassandraConsumer extends ScheduledPollConsumer implements ResumeAware<CassandraResumeStrategy> {
 
     /**
      * Prepared statement used for polling
      */
     private PreparedStatement preparedStatement;
 
+    private CassandraResumeStrategy resumeStrategy;
+
     public CassandraConsumer(CassandraEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
     }
@@ -78,6 +82,13 @@ public class CassandraConsumer extends ScheduledPollConsumer {
         if (isPrepareStatements()) {
             preparedStatement = getEndpoint().prepareStatement();
         }
+
+        if (resumeStrategy != null) {
+            CqlSession session = getEndpoint().getSessionHolder().getSession();
+
+            resumeStrategy.setSession(session);
+            resumeStrategy.resume();
+        }
     }
 
     @Override
@@ -90,4 +101,13 @@ public class CassandraConsumer extends ScheduledPollConsumer {
         return getEndpoint().isPrepareStatements();
     }
 
+    @Override
+    public CassandraResumeStrategy getResumeStrategy() {
+        return resumeStrategy;
+    }
+
+    @Override
+    public void setResumeStrategy(CassandraResumeStrategy resumeStrategy) {
+        this.resumeStrategy = resumeStrategy;
+    }
 }
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeStrategy.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeStrategy.java
new file mode 100644
index 00000000000..220242c7b2c
--- /dev/null
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeStrategy.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.cassandra.consumer.support;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import org.apache.camel.ResumeStrategy;
+
+/**
+ * Provides a resume strategy for Cassandra consumers
+ */
+public interface CassandraResumeStrategy extends ResumeStrategy {
+
+    /**
+     * Sets the session that allows implementations to run a one-time query on the DB
+     * 
+     * @param session the CQL session that implementations may use to query the DB
+     */
+    void setSession(CqlSession session);
+
+    @Override
+    default void start() {
+
+    }
+
+    @Override
+    default void stop() {
+
+    }
+}
diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
new file mode 100644
index 00000000000..dcff5c23842
--- /dev/null
+++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.cassandra.integration;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cassandra.consumer.support.CassandraResumeStrategy;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class CassandraComponentResumeStrategyIT extends BaseCassandra {
+
+    private static class TestCassandraResumeStrategy implements CassandraResumeStrategy {
+        private boolean sessionCalled;
+        private boolean sessionNotNull;
+        private boolean resumeCalled;
+
+        @Override
+        public void setSession(CqlSession session) {
+            sessionCalled = true;
+            sessionNotNull = session != null;
+        }
+
+        @Override
+        public void resume() {
+            resumeCalled = true;
+        }
+
+        public boolean isSessionCalled() {
+            return sessionCalled;
+        }
+
+        public boolean isSessionNotNull() {
+            return sessionNotNull;
+        }
+
+        public boolean isResumeCalled() {
+            return resumeCalled;
+        }
+    }
+
+    private static final String CQL = "select login, first_name, last_name from camel_user";
+    private final TestCassandraResumeStrategy resumeStrategy = new TestCassandraResumeStrategy();
+
+    @Test
+    public void testConsumeAll() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:resultAll");
+        mock.expectedMinimumMessageCount(1);
+        mock.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) {
+                Object body = exchange.getIn().getBody();
+                assertTrue(body instanceof List);
+            }
+        });
+        mock.await(1, TimeUnit.SECONDS);
+        assertMockEndpointsSatisfied();
+
+        assertTrue(resumeStrategy.isSessionCalled());
+        assertTrue(resumeStrategy.isSessionNotNull());
+        assertTrue(resumeStrategy.isResumeCalled());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                fromF("cql://%s/%s?cql=%s", getUrl(), KEYSPACE_NAME, CQL)
+                        .resumable(resumeStrategy)
+                        .to("mock:resultAll");
+            }
+        };
+    }
+}
diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
index 039a667323d..c83b37d5aab 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
@@ -13,6 +13,7 @@ Support for resume varies according to the component. Initially, the support is
 
 * xref:components::atom-component.adoc[camel-atom]
 * xref:components::aws2-kinesis-component.adoc[camel-aws2-kinesis]
+* xref:components::cql-component.adoc[camel-cassandraql]
 * xref:components::couchdb-component.adoc[camel-couchdb]
 * xref:components::file-component.adoc[camel-file]
 * xref:components::kafka-component.adoc[camel-kafka]