You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/27 21:41:17 UTC

[beam] branch master updated: [BEAM-6324] Remove withWhere from CassandraIO (superseded by withQuery)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e79a1e5  [BEAM-6324] Remove withWhere from CassandraIO (superseded by withQuery)
     new 7928370  Merge pull request #8153 from iemejia/BEAM-6324-remove-wit-where
e79a1e5 is described below

commit e79a1e53f1a0bde633c521f5fb6c471154aea6f3
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Wed Mar 27 17:10:48 2019 +0100

    [BEAM-6324] Remove withWhere from CassandraIO (superseded by withQuery)
---
 .../apache/beam/sdk/io/cassandra/CassandraIO.java  | 48 +---------------------
 .../beam/sdk/io/cassandra/CassandraIOIT.java       | 24 -----------
 .../beam/sdk/io/cassandra/CassandraIOTest.java     | 40 ------------------
 3 files changed, 2 insertions(+), 110 deletions(-)

diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index 33af43e..b92cfab 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -165,9 +165,6 @@ public class CassandraIO {
     abstract ValueProvider<String> consistencyLevel();
 
     @Nullable
-    abstract ValueProvider<String> where();
-
-    @Nullable
     abstract ValueProvider<Integer> minNumberOfSplits();
 
     @Nullable
@@ -290,41 +287,6 @@ public class CassandraIO {
     }
 
     /**
-     * Specify a string with a partial {@code Where} clause. Note: Cassandra places restrictions on
-     * the {@code Where} clause you may use. (e.g. filter on a primary/clustering column only etc.)
-     *
-     * @param where Partial {@code Where} clause. Optional - If unspecified will not filter the
-     *     data.
-     * @see <a href="http://cassandra.apache.org/doc/4.0/cql/dml.html#where-clause">CQL
-     *     Documentation</a>
-     * @throws com.datastax.driver.core.exceptions.InvalidQueryException If {@code Where} clause
-     *     makes the generated query invalid. Please Consult <a
-     *     href="http://cassandra.apache.org/doc/4.0/cql/dml.html#where-clause">CQL
-     *     Documentation</a> for more info on correct usage of the {@code Where} clause.
-     */
-    public Read<T> withWhere(String where) {
-      checkArgument(where != null, "where can not be null");
-      return withWhere(ValueProvider.StaticValueProvider.of(where));
-    }
-
-    /**
-     * Specify a string with a partial {@code Where} clause. Note: Cassandra places restrictions on
-     * the {@code Where} clause you may use. (e.g. filter on a primary/clustering column only etc.)
-     *
-     * @param where Partial {@code Where} clause. Optional - If unspecified will not filter the
-     *     data.
-     * @see <a href="http://cassandra.apache.org/doc/4.0/cql/dml.html#where-clause">CQL
-     *     Documentation</a>
-     * @throws com.datastax.driver.core.exceptions.InvalidQueryException If {@code Where} clause
-     *     makes the generated query invalid. Please Consult <a
-     *     href="http://cassandra.apache.org/doc/4.0/cql/dml.html#where-clause">CQL
-     *     Documentation</a> for more info on correct usage of the {@code Where} clause.
-     */
-    public Read<T> withWhere(ValueProvider<String> where) {
-      return builder().setWhere(where).build();
-    }
-
-    /**
      * It's possible that system.size_estimates isn't populated or that the number of splits
      * computed by Beam is still to low for Cassandra to handle it. This setting allows to enforce a
      * minimum number of splits in case Beam cannot compute it correctly.
@@ -392,8 +354,6 @@ public class CassandraIO {
 
       abstract Builder<T> setConsistencyLevel(ValueProvider<String> consistencyLevel);
 
-      abstract Builder<T> setWhere(ValueProvider<String> where);
-
       abstract Builder<T> setMinNumberOfSplits(ValueProvider<Integer> minNumberOfSplits);
 
       abstract Builder<T> setMapperFactoryFn(SerializableFunction<Session, Mapper> mapperFactoryFn);
@@ -459,11 +419,7 @@ public class CassandraIO {
 
     private static String buildQuery(Read spec) {
       return (spec.query() == null)
-          ? String.format(
-              "SELECT * FROM %s.%s%s",
-              spec.keyspace().get(),
-              spec.table().get(),
-              (spec.where() == null) ? "" : " WHERE (" + spec.where().get() + ")")
+          ? String.format("SELECT * FROM %s.%s", spec.keyspace().get(), spec.table().get())
           : spec.query().get().toString();
     }
 
@@ -530,7 +486,7 @@ public class CassandraIO {
                       ? null
                       : String.format("(token(%s) < %d)", partitionKey, rangeEnd));
       final String query =
-          (spec.query() == null && spec.where() == null)
+          (spec.query() == null)
               ? buildQuery(spec) + " WHERE " + rangeFilter
               : buildQuery(spec) + " AND " + rangeFilter;
       LOG.debug("CassandraIO generated query : {}", query);
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
index 28abf3f..1f93676 100644
--- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
@@ -106,12 +105,6 @@ public class CassandraIOIT implements Serializable {
     runRead();
   }
 
-  @Test
-  public void testWriteThenReadWithWhere() {
-    runWrite();
-    runReadWithWhere();
-  }
-
   private void runWrite() {
     pipelineWrite
         .apply("GenSequence", GenerateSequence.from(0).to((long) options.getNumberOfRecords()))
@@ -151,23 +144,6 @@ public class CassandraIOIT implements Serializable {
     pipelineRead.run().waitUntilFinish();
   }
 
-  private void runReadWithWhere() {
-    PCollection<Scientist> output =
-        pipelineRead.apply(
-            CassandraIO.<Scientist>read()
-                .withHosts(options.getCassandraHost())
-                .withPort(options.getCassandraPort())
-                .withKeyspace(KEYSPACE)
-                .withTable(TABLE)
-                .withEntity(Scientist.class)
-                .withCoder(SerializableCoder.of(Scientist.class))
-                .withWhere("id=100"));
-
-    PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(1L);
-
-    pipelineRead.run().waitUntilFinish();
-  }
-
   private static Cluster getCluster(CassandraIOITOptions options) {
     return Cluster.builder()
         .addContactPoints(options.getCassandraHost().toArray(new String[0]))
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
index a5870e2..bce119a 100644
--- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
@@ -256,26 +256,6 @@ public class CassandraIOTest implements Serializable {
   }
 
   @Test
-  public void testReadWithWhere() throws Exception {
-    insertRecords();
-
-    PCollection<Scientist> output =
-        pipeline.apply(
-            CassandraIO.<Scientist>read()
-                .withHosts(Collections.singletonList(CASSANDRA_HOST))
-                .withPort(CASSANDRA_PORT)
-                .withKeyspace(CASSANDRA_KEYSPACE)
-                .withTable(CASSANDRA_TABLE)
-                .withCoder(SerializableCoder.of(Scientist.class))
-                .withEntity(Scientist.class)
-                .withWhere("person_id = 10"));
-
-    PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(1L);
-
-    pipeline.run();
-  }
-
-  @Test
   public void testReadWithQuery() throws Exception {
     insertRecords();
 
@@ -306,26 +286,6 @@ public class CassandraIOTest implements Serializable {
   }
 
   @Test
-  public void testReadWithValueProvider() throws Exception {
-    insertRecords();
-
-    PCollection<Scientist> output =
-        pipeline.apply(
-            CassandraIO.<Scientist>read()
-                .withHosts(pipeline.newProvider(Collections.singletonList(CASSANDRA_HOST)))
-                .withPort(pipeline.newProvider(CASSANDRA_PORT))
-                .withKeyspace(pipeline.newProvider(CASSANDRA_KEYSPACE))
-                .withTable(pipeline.newProvider(CASSANDRA_TABLE))
-                .withCoder(SerializableCoder.of(Scientist.class))
-                .withEntity(Scientist.class)
-                .withWhere(pipeline.newProvider("person_id = 10")));
-
-    PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(1L);
-
-    pipeline.run();
-  }
-
-  @Test
   public void testWrite() {
     ArrayList<Scientist> data = new ArrayList<>();
     for (int i = 0; i < NUM_ROWS; i++) {