You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yh...@apache.org on 2022/12/30 15:59:10 UTC
[beam] branch master updated: Fix Cassandra read bug when user query has no where clause (fixes #24829) (#24830)
This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ad72baceaef Fix Cassandra read bug when user query has no where clause (fixes #24829) (#24830)
ad72baceaef is described below
commit ad72baceaef2c3c0b44d0b9538689bd1eecc2d18
Author: Lucas Marques <34...@users.noreply.github.com>
AuthorDate: Fri Dec 30 12:59:03 2022 -0300
Fix Cassandra read bug when user query has no where clause (fixes #24829) (#24830)
* Fix Cassandra read bug when user query has no where clause
* update changelog
* fix format violation
* fix remaining format violation
* add unit test for read with unfiltered query
* fix unfiltered query unit test count validation
* use constant for num rows assertion on unfiltered query unit test
Co-authored-by: Yi Hu <hu...@gmail.com>
* fix cassandra query bugfix description
Co-authored-by: Yi Hu <hu...@gmail.com>
Co-authored-by: Lucas Marques <lu...@b2wdigital.com>
Co-authored-by: Yi Hu <hu...@gmail.com>
---
CHANGES.md | 2 +-
.../org/apache/beam/sdk/io/cassandra/ReadFn.java | 5 ++-
.../beam/sdk/io/cassandra/CassandraIOTest.java | 41 ++++++++++++++++++++--
3 files changed, 44 insertions(+), 4 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index c8d3eb97e88..e87cd3baf21 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -80,7 +80,7 @@
## Bugfixes
-* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Avoids Cassandra syntax error when user-defined query has no where clause in it (Java) ([#24829](https://github.com/apache/beam/issues/24829)).
## Known Issues
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java
index 6bca1cf3d17..3bb53602918 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java
@@ -141,6 +141,9 @@ class ReadFn<T> extends DoFn<Read<T>, T> {
return (spec.query() == null)
? String.format("SELECT * FROM %s.%s", spec.keyspace().get(), spec.table().get())
+ " WHERE "
- : spec.query().get() + (hasRingRange ? " AND " : "");
+ : spec.query().get()
+ + (hasRingRange
+ ? spec.query().get().toUpperCase().contains("WHERE") ? " AND " : " WHERE "
+ : "");
}
}
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
index 7196556abc7..a472b9ee1c3 100644
--- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
@@ -458,6 +458,11 @@ public class CassandraIOTest implements Serializable {
@Test
public void testReadWithQuery() throws Exception {
+ String query =
+ String.format(
+ "select person_id, writetime(person_name) from %s.%s where person_id=10 AND person_department='logic'",
+ CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
+
PCollection<Scientist> output =
pipeline.apply(
CassandraIO.<Scientist>read()
@@ -466,8 +471,7 @@ public class CassandraIOTest implements Serializable {
.withKeyspace(CASSANDRA_KEYSPACE)
.withTable(CASSANDRA_TABLE)
.withMinNumberOfSplits(20)
- .withQuery(
- "select person_id, writetime(person_name) from beam_ks.scientist where person_id=10 AND person_department='logic'")
+ .withQuery(query)
.withCoder(SerializableCoder.of(Scientist.class))
.withEntity(Scientist.class));
@@ -485,6 +489,39 @@ public class CassandraIOTest implements Serializable {
pipeline.run();
}
+ @Test
+ public void testReadWithUnfilteredQuery() throws Exception {
+ String query =
+ String.format(
+ "select person_id, writetime(person_name) from %s.%s",
+ CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
+
+ PCollection<Scientist> output =
+ pipeline.apply(
+ CassandraIO.<Scientist>read()
+ .withHosts(Collections.singletonList(CASSANDRA_HOST))
+ .withPort(cassandraPort)
+ .withKeyspace(CASSANDRA_KEYSPACE)
+ .withTable(CASSANDRA_TABLE)
+ .withMinNumberOfSplits(20)
+ .withQuery(query)
+ .withCoder(SerializableCoder.of(Scientist.class))
+ .withEntity(Scientist.class));
+
+ PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(NUM_ROWS);
+ PAssert.that(output)
+ .satisfies(
+ input -> {
+ for (Scientist sci : input) {
+ assertNull(sci.name);
+ assertTrue(sci.nameTs != null && sci.nameTs > 0);
+ }
+ return null;
+ });
+
+ pipeline.run();
+ }
+
@Test
public void testWrite() {
ArrayList<ScientistWrite> data = new ArrayList<>();