You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yh...@apache.org on 2022/12/30 15:59:10 UTC

[beam] branch master updated: Fix Cassandra read bug when user query has no where clause (fixes #24829) (#24830)

This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ad72baceaef Fix Cassandra read bug when user query has no where clause (fixes #24829) (#24830)
ad72baceaef is described below

commit ad72baceaef2c3c0b44d0b9538689bd1eecc2d18
Author: Lucas Marques <34...@users.noreply.github.com>
AuthorDate: Fri Dec 30 12:59:03 2022 -0300

    Fix Cassandra read bug when user query has no where clause (fixes #24829) (#24830)
    
    * Fix Cassandra read bug when user query has no where clause
    
    * update changelog
    
    * fix format violation
    
    * fix remaining format violation
    
    * add unit test for read with unfiltered query
    
    * fix unfiltered query unit test count validation
    
    * use constant for num rows assertion on unfiltered query unit test
    
    Co-authored-by: Yi Hu <hu...@gmail.com>
    
    * fix cassandra query bugfix description
    
    Co-authored-by: Yi Hu <hu...@gmail.com>
    
    Co-authored-by: Lucas Marques <lu...@b2wdigital.com>
    Co-authored-by: Yi Hu <hu...@gmail.com>
---
 CHANGES.md                                         |  2 +-
 .../org/apache/beam/sdk/io/cassandra/ReadFn.java   |  5 ++-
 .../beam/sdk/io/cassandra/CassandraIOTest.java     | 41 ++++++++++++++++++++--
 3 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index c8d3eb97e88..e87cd3baf21 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -80,7 +80,7 @@
 
 ## Bugfixes
 
-* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Fixed Cassandra syntax error when a user-defined query has no WHERE clause (Java) ([#24829](https://github.com/apache/beam/issues/24829)).
 
 ## Known Issues
 
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java
index 6bca1cf3d17..3bb53602918 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java
@@ -141,6 +141,9 @@ class ReadFn<T> extends DoFn<Read<T>, T> {
     return (spec.query() == null)
         ? String.format("SELECT * FROM %s.%s", spec.keyspace().get(), spec.table().get())
             + " WHERE "
-        : spec.query().get() + (hasRingRange ? " AND " : "");
+        : spec.query().get()
+            + (hasRingRange
+                ? spec.query().get().toUpperCase().contains("WHERE") ? " AND " : " WHERE "
+                : "");
   }
 }
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
index 7196556abc7..a472b9ee1c3 100644
--- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
@@ -458,6 +458,11 @@ public class CassandraIOTest implements Serializable {
 
   @Test
   public void testReadWithQuery() throws Exception {
+    String query =
+        String.format(
+            "select person_id, writetime(person_name) from %s.%s where person_id=10 AND person_department='logic'",
+            CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
+
     PCollection<Scientist> output =
         pipeline.apply(
             CassandraIO.<Scientist>read()
@@ -466,8 +471,7 @@ public class CassandraIOTest implements Serializable {
                 .withKeyspace(CASSANDRA_KEYSPACE)
                 .withTable(CASSANDRA_TABLE)
                 .withMinNumberOfSplits(20)
-                .withQuery(
-                    "select person_id, writetime(person_name) from beam_ks.scientist where person_id=10 AND person_department='logic'")
+                .withQuery(query)
                 .withCoder(SerializableCoder.of(Scientist.class))
                 .withEntity(Scientist.class));
 
@@ -485,6 +489,39 @@ public class CassandraIOTest implements Serializable {
     pipeline.run();
   }
 
+  @Test
+  public void testReadWithUnfilteredQuery() throws Exception {
+    String query =
+        String.format(
+            "select person_id, writetime(person_name) from %s.%s",
+            CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
+
+    PCollection<Scientist> output =
+        pipeline.apply(
+            CassandraIO.<Scientist>read()
+                .withHosts(Collections.singletonList(CASSANDRA_HOST))
+                .withPort(cassandraPort)
+                .withKeyspace(CASSANDRA_KEYSPACE)
+                .withTable(CASSANDRA_TABLE)
+                .withMinNumberOfSplits(20)
+                .withQuery(query)
+                .withCoder(SerializableCoder.of(Scientist.class))
+                .withEntity(Scientist.class));
+
+    PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(NUM_ROWS);
+    PAssert.that(output)
+        .satisfies(
+            input -> {
+              for (Scientist sci : input) {
+                assertNull(sci.name);
+                assertTrue(sci.nameTs != null && sci.nameTs > 0);
+              }
+              return null;
+            });
+
+    pipeline.run();
+  }
+
   @Test
   public void testWrite() {
     ArrayList<ScientistWrite> data = new ArrayList<>();