You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/27 21:41:17 UTC
[beam] branch master updated: [BEAM-6324] Remove withWhere from
CassandraIO (superseded by withQuery)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e79a1e5 [BEAM-6324] Remove withWhere from CassandraIO (superseded by withQuery)
new 7928370 Merge pull request #8153 from iemejia/BEAM-6324-remove-wit-where
e79a1e5 is described below
commit e79a1e53f1a0bde633c521f5fb6c471154aea6f3
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Wed Mar 27 17:10:48 2019 +0100
[BEAM-6324] Remove withWhere from CassandraIO (superseded by withQuery)
---
.../apache/beam/sdk/io/cassandra/CassandraIO.java | 48 +---------------------
.../beam/sdk/io/cassandra/CassandraIOIT.java | 24 -----------
.../beam/sdk/io/cassandra/CassandraIOTest.java | 40 ------------------
3 files changed, 2 insertions(+), 110 deletions(-)
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index 33af43e..b92cfab 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -165,9 +165,6 @@ public class CassandraIO {
abstract ValueProvider<String> consistencyLevel();
@Nullable
- abstract ValueProvider<String> where();
-
- @Nullable
abstract ValueProvider<Integer> minNumberOfSplits();
@Nullable
@@ -290,41 +287,6 @@ public class CassandraIO {
}
/**
- * Specify a string with a partial {@code Where} clause. Note: Cassandra places restrictions on
- * the {@code Where} clause you may use. (e.g. filter on a primary/clustering column only etc.)
- *
- * @param where Partial {@code Where} clause. Optional - If unspecified will not filter the
- * data.
- * @see <a href="http://cassandra.apache.org/doc/4.0/cql/dml.html#where-clause">CQL
- * Documentation</a>
- * @throws com.datastax.driver.core.exceptions.InvalidQueryException If {@code Where} clause
- * makes the generated query invalid. Please Consult <a
- * href="http://cassandra.apache.org/doc/4.0/cql/dml.html#where-clause">CQL
- * Documentation</a> for more info on correct usage of the {@code Where} clause.
- */
- public Read<T> withWhere(String where) {
- checkArgument(where != null, "where can not be null");
- return withWhere(ValueProvider.StaticValueProvider.of(where));
- }
-
- /**
- * Specify a string with a partial {@code Where} clause. Note: Cassandra places restrictions on
- * the {@code Where} clause you may use. (e.g. filter on a primary/clustering column only etc.)
- *
- * @param where Partial {@code Where} clause. Optional - If unspecified will not filter the
- * data.
- * @see <a href="http://cassandra.apache.org/doc/4.0/cql/dml.html#where-clause">CQL
- * Documentation</a>
- * @throws com.datastax.driver.core.exceptions.InvalidQueryException If {@code Where} clause
- * makes the generated query invalid. Please Consult <a
- * href="http://cassandra.apache.org/doc/4.0/cql/dml.html#where-clause">CQL
- * Documentation</a> for more info on correct usage of the {@code Where} clause.
- */
- public Read<T> withWhere(ValueProvider<String> where) {
- return builder().setWhere(where).build();
- }
-
- /**
* It's possible that system.size_estimates isn't populated or that the number of splits
* computed by Beam is still to low for Cassandra to handle it. This setting allows to enforce a
* minimum number of splits in case Beam cannot compute it correctly.
@@ -392,8 +354,6 @@ public class CassandraIO {
abstract Builder<T> setConsistencyLevel(ValueProvider<String> consistencyLevel);
- abstract Builder<T> setWhere(ValueProvider<String> where);
-
abstract Builder<T> setMinNumberOfSplits(ValueProvider<Integer> minNumberOfSplits);
abstract Builder<T> setMapperFactoryFn(SerializableFunction<Session, Mapper> mapperFactoryFn);
@@ -459,11 +419,7 @@ public class CassandraIO {
private static String buildQuery(Read spec) {
return (spec.query() == null)
- ? String.format(
- "SELECT * FROM %s.%s%s",
- spec.keyspace().get(),
- spec.table().get(),
- (spec.where() == null) ? "" : " WHERE (" + spec.where().get() + ")")
+ ? String.format("SELECT * FROM %s.%s", spec.keyspace().get(), spec.table().get())
: spec.query().get().toString();
}
@@ -530,7 +486,7 @@ public class CassandraIO {
? null
: String.format("(token(%s) < %d)", partitionKey, rangeEnd));
final String query =
- (spec.query() == null && spec.where() == null)
+ (spec.query() == null)
? buildQuery(spec) + " WHERE " + rangeFilter
: buildQuery(spec) + " AND " + rangeFilter;
LOG.debug("CassandraIO generated query : {}", query);
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
index 28abf3f..1f93676 100644
--- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
@@ -106,12 +105,6 @@ public class CassandraIOIT implements Serializable {
runRead();
}
- @Test
- public void testWriteThenReadWithWhere() {
- runWrite();
- runReadWithWhere();
- }
-
private void runWrite() {
pipelineWrite
.apply("GenSequence", GenerateSequence.from(0).to((long) options.getNumberOfRecords()))
@@ -151,23 +144,6 @@ public class CassandraIOIT implements Serializable {
pipelineRead.run().waitUntilFinish();
}
- private void runReadWithWhere() {
- PCollection<Scientist> output =
- pipelineRead.apply(
- CassandraIO.<Scientist>read()
- .withHosts(options.getCassandraHost())
- .withPort(options.getCassandraPort())
- .withKeyspace(KEYSPACE)
- .withTable(TABLE)
- .withEntity(Scientist.class)
- .withCoder(SerializableCoder.of(Scientist.class))
- .withWhere("id=100"));
-
- PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(1L);
-
- pipelineRead.run().waitUntilFinish();
- }
-
private static Cluster getCluster(CassandraIOITOptions options) {
return Cluster.builder()
.addContactPoints(options.getCassandraHost().toArray(new String[0]))
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
index a5870e2..bce119a 100644
--- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
@@ -256,26 +256,6 @@ public class CassandraIOTest implements Serializable {
}
@Test
- public void testReadWithWhere() throws Exception {
- insertRecords();
-
- PCollection<Scientist> output =
- pipeline.apply(
- CassandraIO.<Scientist>read()
- .withHosts(Collections.singletonList(CASSANDRA_HOST))
- .withPort(CASSANDRA_PORT)
- .withKeyspace(CASSANDRA_KEYSPACE)
- .withTable(CASSANDRA_TABLE)
- .withCoder(SerializableCoder.of(Scientist.class))
- .withEntity(Scientist.class)
- .withWhere("person_id = 10"));
-
- PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(1L);
-
- pipeline.run();
- }
-
- @Test
public void testReadWithQuery() throws Exception {
insertRecords();
@@ -306,26 +286,6 @@ public class CassandraIOTest implements Serializable {
}
@Test
- public void testReadWithValueProvider() throws Exception {
- insertRecords();
-
- PCollection<Scientist> output =
- pipeline.apply(
- CassandraIO.<Scientist>read()
- .withHosts(pipeline.newProvider(Collections.singletonList(CASSANDRA_HOST)))
- .withPort(pipeline.newProvider(CASSANDRA_PORT))
- .withKeyspace(pipeline.newProvider(CASSANDRA_KEYSPACE))
- .withTable(pipeline.newProvider(CASSANDRA_TABLE))
- .withCoder(SerializableCoder.of(Scientist.class))
- .withEntity(Scientist.class)
- .withWhere(pipeline.newProvider("person_id = 10")));
-
- PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(1L);
-
- pipeline.run();
- }
-
- @Test
public void testWrite() {
ArrayList<Scientist> data = new ArrayList<>();
for (int i = 0; i < NUM_ROWS; i++) {