You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/03/29 12:48:08 UTC

[camel] branch camel-3.x updated: CAMEL-19216: camel-kudu - add predicate support to the scan operation (#9672)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.x by this push:
     new 8b29e614204 CAMEL-19216: camel-kudu - add predicate support to the scan operation (#9672)
8b29e614204 is described below

commit 8b29e61420488cb2718f59e19bd8055665a189d1
Author: Kengo Seki <se...@apache.org>
AuthorDate: Wed Mar 29 21:47:04 2023 +0900

    CAMEL-19216: camel-kudu - add predicate support to the scan operation (#9672)
    
    * CAMEL-19216: camel-kudu - add predicate support to the scan operation
    
    Currently, the scan operation of the Kudu producer scans
    the entire table and sets the whole result to the message body,
    which is inefficient or impractical if the table is large.
    This PR adds predicate support to the scan operation
    so that users can filter the results.
    
    * Add the error message to assertEquals as a parameter instead of as a comment
---
 .../apache/camel/component/kudu/KuduConstants.java |  2 ++
 .../apache/camel/component/kudu/KuduProducer.java  |  4 ++-
 .../org/apache/camel/component/kudu/KuduUtils.java | 14 +++++++---
 .../apache/camel/component/kudu/KuduScanTest.java  | 30 ++++++++++++++++++++++
 4 files changed, 46 insertions(+), 4 deletions(-)

diff --git a/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduConstants.java b/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduConstants.java
index ebf4a6066f2..57ae08069de 100644
--- a/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduConstants.java
+++ b/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduConstants.java
@@ -23,6 +23,8 @@ public final class KuduConstants {
     public static final String CAMEL_KUDU_SCHEMA = "CamelKuduSchema";
     @Metadata(description = "The create table options", javaType = "org.apache.kudu.client.CreateTableOptions")
     public static final String CAMEL_KUDU_TABLE_OPTIONS = "CamelKuduTableOptions";
+    @Metadata(description = "The predicate for scan operation", javaType = "org.apache.kudu.client.KuduPredicate")
+    public static final String CAMEL_KUDU_SCAN_PREDICATE = "CamelKuduScanPredicate";
 
     private KuduConstants() {
     }
diff --git a/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduProducer.java b/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduProducer.java
index 4a01d40c869..104061fb941 100644
--- a/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduProducer.java
+++ b/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduProducer.java
@@ -27,6 +27,7 @@ import org.apache.kudu.client.Delete;
 import org.apache.kudu.client.Insert;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.Update;
@@ -175,6 +176,7 @@ public class KuduProducer extends DefaultProducer {
     }
 
     private void doScan(Exchange exchange, String tableName) throws KuduException {
-        exchange.getIn().setBody(KuduUtils.doScan(tableName, endpoint.getKuduClient()));
+        KuduPredicate predicate = (KuduPredicate) exchange.getIn().getHeader(KuduConstants.CAMEL_KUDU_SCAN_PREDICATE);
+        exchange.getIn().setBody(KuduUtils.doScan(tableName, endpoint.getKuduClient(), predicate));
     }
 }
diff --git a/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduUtils.java b/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduUtils.java
index 88aa497efee..ab964f00b71 100644
--- a/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduUtils.java
+++ b/components/camel-kudu/src/main/java/org/apache/camel/component/kudu/KuduUtils.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
 import org.apache.kudu.client.KuduScanner;
 import org.apache.kudu.client.KuduScannerIterator;
 import org.apache.kudu.client.KuduTable;
@@ -57,6 +58,11 @@ public final class KuduUtils {
     }
 
     public static List<Map<String, Object>> doScan(String tableName, KuduClient connection) throws KuduException {
+        return doScan(tableName, connection, null);
+    }
+
+    public static List<Map<String, Object>> doScan(String tableName, KuduClient connection, KuduPredicate predicate)
+            throws KuduException {
         LOG.trace("Scanning table {}", tableName);
         KuduTable table = connection.openTable(tableName);
 
@@ -66,9 +72,11 @@ public final class KuduUtils {
             projectColumns.add(columnSchema.getName());
         }
 
-        KuduScanner scanner = connection.newScannerBuilder(table)
-                .setProjectedColumnNames(projectColumns)
-                .build();
+        KuduScanner.KuduScannerBuilder builder = connection.newScannerBuilder(table);
+        if (predicate != null) {
+            builder.addPredicate(predicate);
+        }
+        KuduScanner scanner = builder.setProjectedColumnNames(projectColumns).build();
         return KuduUtils.scannerToList(table, scanner);
     }
 }
diff --git a/components/camel-kudu/src/test/java/org/apache/camel/component/kudu/KuduScanTest.java b/components/camel-kudu/src/test/java/org/apache/camel/component/kudu/KuduScanTest.java
index 71a1cb4d2c3..73d80ba9b23 100644
--- a/components/camel-kudu/src/test/java/org/apache/camel/component/kudu/KuduScanTest.java
+++ b/components/camel-kudu/src/test/java/org/apache/camel/component/kudu/KuduScanTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.kudu;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -23,6 +24,9 @@ import org.apache.camel.EndpointInject;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.KuduPredicate;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -107,6 +111,32 @@ public class KuduScanTest extends AbstractKuduTest {
 
     }
 
+    @Test
+    public void scanWithPredicate() throws InterruptedException {
+        errorEndpoint.expectedMessageCount(0);
+        successEndpoint.expectedMessageCount(2);
+
+        // without predicate
+        Map<String, Object> headers = new HashMap<>();
+        headers.put(KuduConstants.CAMEL_KUDU_SCAN_PREDICATE, null);
+        sendBody("direct:scan", null, headers);
+        List<Map<String, Object>> results = (List<Map<String, Object>>) successEndpoint.getReceivedExchanges()
+                .get(0).getIn().getBody(List.class);
+        assertEquals(2, results.size(), "two records with id=1 and id=2 are expected to be returned");
+
+        // with predicate
+        ColumnSchema schema = new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).build();
+        KuduPredicate predicate = KuduPredicate.newComparisonPredicate(schema, KuduPredicate.ComparisonOp.EQUAL, 2);
+        headers.put(KuduConstants.CAMEL_KUDU_SCAN_PREDICATE, predicate);
+        sendBody("direct:scan", null, headers);
+        results = (List<Map<String, Object>>) successEndpoint.getReceivedExchanges()
+                .get(1).getIn().getBody(List.class);
+        assertEquals(1, results.size(), "only one record with id=2 is expected to be returned");
+
+        errorEndpoint.assertIsSatisfied();
+        successEndpoint.assertIsSatisfied();
+    }
+
     @Test
     public void scanTable() throws InterruptedException {
         createTestTable("TestTable");