You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2019/03/15 21:36:53 UTC

[beam] branch master updated: [BEAM-6773] Add ValueProvider to CassandraIO.Read

This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 24406c9  [BEAM-6773] Add ValueProvider to CassandraIO.Read
     new 6e96e23  Merge pull request #8024: [BEAM-6773] Add ValueProvider to CassandraIO.Read
24406c9 is described below

commit 24406c9d3259a511646be661c687c73e3c2a531b
Author: Radoslaw Stankiewicz <ra...@google.com>
AuthorDate: Wed Mar 6 17:37:26 2019 +0100

    [BEAM-6773] Add ValueProvider to CassandraIO.Read
    
    Enable parametrization at runtime and enable future integrations with templates
---
 .../apache/beam/sdk/io/cassandra/CassandraIO.java  | 181 +++++++++++++++++----
 .../beam/sdk/io/cassandra/CassandraIOTest.java     |  20 +++
 2 files changed, 167 insertions(+), 34 deletions(-)

diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index 06e6d1a..1e34a27 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -50,6 +50,7 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -131,16 +132,16 @@ public class CassandraIO {
   @AutoValue
   public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
     @Nullable
-    abstract List<String> hosts();
+    abstract ValueProvider<List<String>> hosts();
 
     @Nullable
-    abstract Integer port();
+    abstract ValueProvider<Integer> port();
 
     @Nullable
-    abstract String keyspace();
+    abstract ValueProvider<String> keyspace();
 
     @Nullable
-    abstract String table();
+    abstract ValueProvider<String> table();
 
     @Nullable
     abstract Class<T> entity();
@@ -149,10 +150,10 @@ public class CassandraIO {
     abstract Coder<T> coder();
 
     @Nullable
-    abstract String username();
+    abstract ValueProvider<String> username();
 
     @Nullable
-    abstract String password();
+    abstract ValueProvider<String> password();
 
     @Nullable
     abstract String encryptedPassword();
@@ -161,16 +162,16 @@ public class CassandraIO {
     abstract PasswordDecrypter passwordDecrypter();
 
     @Nullable
-    abstract String localDc();
+    abstract ValueProvider<String> localDc();
 
     @Nullable
-    abstract String consistencyLevel();
+    abstract ValueProvider<String> consistencyLevel();
 
     @Nullable
-    abstract String where();
+    abstract ValueProvider<String> where();
 
     @Nullable
-    abstract Integer minNumberOfSplits();
+    abstract ValueProvider<Integer> minNumberOfSplits();
 
     abstract Builder<T> builder();
 
@@ -178,24 +179,44 @@ public class CassandraIO {
     public Read<T> withHosts(List<String> hosts) {
       checkArgument(hosts != null, "hosts can not be null");
       checkArgument(!hosts.isEmpty(), "hosts can not be empty");
+      return builder().setHosts(ValueProvider.StaticValueProvider.of(hosts)).build();
+    }
+
+    /** Specify the hosts of the Apache Cassandra instances. */
+    public Read<T> withHosts(ValueProvider<List<String>> hosts) {
       return builder().setHosts(hosts).build();
     }
 
     /** Specify the port number of the Apache Cassandra instances. */
     public Read<T> withPort(int port) {
       checkArgument(port > 0, "port must be > 0, but was: %s", port);
+      return builder().setPort(ValueProvider.StaticValueProvider.of(port)).build();
+    }
+
+    /** Specify the port number of the Apache Cassandra instances. */
+    public Read<T> withPort(ValueProvider<Integer> port) {
       return builder().setPort(port).build();
     }
 
     /** Specify the Cassandra keyspace where to read data. */
     public Read<T> withKeyspace(String keyspace) {
       checkArgument(keyspace != null, "keyspace can not be null");
+      return builder().setKeyspace(ValueProvider.StaticValueProvider.of(keyspace)).build();
+    }
+
+    /** Specify the Cassandra keyspace where to read data. */
+    public Read<T> withKeyspace(ValueProvider<String> keyspace) {
       return builder().setKeyspace(keyspace).build();
     }
 
     /** Specify the Cassandra table where to read data. */
     public Read<T> withTable(String table) {
       checkArgument(table != null, "table can not be null");
+      return builder().setTable(ValueProvider.StaticValueProvider.of(table)).build();
+    }
+
+    /** Specify the Cassandra table where to read data. */
+    public Read<T> withTable(ValueProvider<String> table) {
       return builder().setTable(table).build();
     }
 
@@ -218,12 +239,22 @@ public class CassandraIO {
     /** Specify the username for authentication. */
     public Read<T> withUsername(String username) {
       checkArgument(username != null, "username can not be null");
+      return builder().setUsername(ValueProvider.StaticValueProvider.of(username)).build();
+    }
+
+    /** Specify the username for authentication. */
+    public Read<T> withUsername(ValueProvider<String> username) {
       return builder().setUsername(username).build();
     }
 
     /** Specify the clear password used for authentication. */
     public Read<T> withPassword(String password) {
       checkArgument(password != null, "password can not be null");
+      return builder().setPassword(ValueProvider.StaticValueProvider.of(password)).build();
+    }
+
+    /** Specify the clear password for authentication. */
+    public Read<T> withPassword(ValueProvider<String> password) {
       return builder().setPassword(password).build();
     }
 
@@ -249,11 +280,22 @@ public class CassandraIO {
     /** Specify the local DC used for the load balancing. */
     public Read<T> withLocalDc(String localDc) {
       checkArgument(localDc != null, "localDc can not be null");
+      return builder().setLocalDc(ValueProvider.StaticValueProvider.of(localDc)).build();
+    }
+
+    /** Specify the local DC used for the load balancing. */
+    public Read<T> withLocalDc(ValueProvider<String> localDc) {
       return builder().setLocalDc(localDc).build();
     }
 
     public Read<T> withConsistencyLevel(String consistencyLevel) {
       checkArgument(consistencyLevel != null, "consistencyLevel can not be null");
+      return builder()
+          .setConsistencyLevel(ValueProvider.StaticValueProvider.of(consistencyLevel))
+          .build();
+    }
+
+    public Read<T> withConsistencyLevel(ValueProvider<String> consistencyLevel) {
       return builder().setConsistencyLevel(consistencyLevel).build();
     }
 
@@ -272,6 +314,23 @@ public class CassandraIO {
      */
     public Read<T> withWhere(String where) {
       checkArgument(where != null, "where can not be null");
+      return builder().setWhere(ValueProvider.StaticValueProvider.of(where)).build();
+    }
+
+    /**
+     * Specify a string with a partial {@code Where} clause. Note: Cassandra places restrictions on
+     * the {@code Where} clause you may use. (e.g. filter on a primary/clustering column only etc.)
+     *
+     * @param where Partial {@code Where} clause. Optional - If unspecified will not filter the
+     *     data.
+     * @see <a href="http://cassandra.apache.org/doc/4.0/cql/dml.html#where-clause">CQL
+     *     Documentation</a>
+     * @throws com.datastax.driver.core.exceptions.InvalidQueryException If {@code Where} clause
+     *     makes the generated query invalid. Please consult <a
+     *     href="http://cassandra.apache.org/doc/4.0/cql/dml.html#where-clause">CQL
+     *     Documentation</a> for more info on correct usage of the {@code Where} clause.
+     */
+    public Read<T> withWhere(ValueProvider<String> where) {
       return builder().setWhere(where).build();
     }
 
@@ -283,6 +342,17 @@ public class CassandraIO {
     public Read<T> withMinNumberOfSplits(Integer minNumberOfSplits) {
       checkArgument(minNumberOfSplits != null, "minNumberOfSplits can not be null");
       checkArgument(minNumberOfSplits > 0, "minNumberOfSplits must be greater than 0");
+      return builder()
+          .setMinNumberOfSplits(ValueProvider.StaticValueProvider.of(minNumberOfSplits))
+          .build();
+    }
+
+    /**
+     * It's possible that system.size_estimates isn't populated or that the number of splits
+     * computed by Beam is still too low for Cassandra to handle it. This setting allows enforcing a
+     * minimum number of splits in case Beam cannot compute it correctly.
+     */
+    public Read<T> withMinNumberOfSplits(ValueProvider<Integer> minNumberOfSplits) {
       return builder().setMinNumberOfSplits(minNumberOfSplits).build();
     }
 
@@ -299,33 +369,33 @@ public class CassandraIO {
 
     @AutoValue.Builder
     abstract static class Builder<T> {
-      abstract Builder<T> setHosts(List<String> hosts);
+      abstract Builder<T> setHosts(ValueProvider<List<String>> hosts);
 
-      abstract Builder<T> setPort(Integer port);
+      abstract Builder<T> setPort(ValueProvider<Integer> port);
 
-      abstract Builder<T> setKeyspace(String keyspace);
+      abstract Builder<T> setKeyspace(ValueProvider<String> keyspace);
 
-      abstract Builder<T> setTable(String table);
+      abstract Builder<T> setTable(ValueProvider<String> table);
 
       abstract Builder<T> setEntity(Class<T> entity);
 
       abstract Builder<T> setCoder(Coder<T> coder);
 
-      abstract Builder<T> setUsername(String username);
+      abstract Builder<T> setUsername(ValueProvider<String> username);
 
-      abstract Builder<T> setPassword(String password);
+      abstract Builder<T> setPassword(ValueProvider<String> password);
 
       abstract Builder<T> setEncryptedPassword(String encryptedPassword);
 
       abstract Builder<T> setPasswordDecrypter(PasswordDecrypter passwordDecrypter);
 
-      abstract Builder<T> setLocalDc(String localDc);
+      abstract Builder<T> setLocalDc(ValueProvider<String> localDc);
 
-      abstract Builder<T> setConsistencyLevel(String consistencyLevel);
+      abstract Builder<T> setConsistencyLevel(ValueProvider<String> consistencyLevel);
 
-      abstract Builder<T> setWhere(String where);
+      abstract Builder<T> setWhere(ValueProvider<String> where);
 
-      abstract Builder<T> setMinNumberOfSplits(Integer minNumberOfSplits);
+      abstract Builder<T> setMinNumberOfSplits(ValueProvider<Integer> minNumberOfSplits);
 
       abstract Read<T> build();
     }
@@ -376,9 +446,9 @@ public class CassandraIO {
           String splitQuery =
               String.format(
                   "SELECT * FROM %s.%s%s;",
-                  spec.keyspace(),
-                  spec.table(),
-                  spec.where() == null ? "" : String.format(" WHERE %s", spec.where()));
+                  spec.keyspace().get(),
+                  spec.table().get(),
+                  spec.where() == null ? "" : String.format(" WHERE %s", spec.where().get()));
           return Collections.singletonList(
               new CassandraIO.CassandraSource<>(spec, Collections.singletonList(splitQuery)));
         }
@@ -407,7 +477,7 @@ public class CassandraIO {
       LOG.info("{} splits were actually generated", splits.size());
 
       final String partitionKey =
-          cluster.getMetadata().getKeyspace(spec.keyspace()).getTable(spec.table())
+          cluster.getMetadata().getKeyspace(spec.keyspace().get()).getTable(spec.table().get())
               .getPartitionKey().stream()
               .map(ColumnMetadata::getName)
               .collect(Collectors.joining(","));
@@ -457,21 +527,21 @@ public class CassandraIO {
     }
 
     private static String generateRangeQuery(
-        String keyspace,
-        String table,
-        String where,
+        ValueProvider<String> keyspace,
+        ValueProvider<String> table,
+        ValueProvider<String> where,
         String partitionKey,
         BigInteger rangeStart,
         BigInteger rangeEnd) {
       String query =
           String.format(
               "SELECT * FROM %s.%s WHERE %s;",
-              keyspace,
-              table,
+              keyspace.get(),
+              table.get(),
               Joiner.on(" AND ")
                   .skipNulls()
                   .join(
-                      where == null ? null : String.format("(%s)", where),
+                      where == null ? null : String.format("(%s)", where.get()),
                       rangeStart == null
                           ? null
                           : String.format("(token(%s)>=%d)", partitionKey, rangeStart),
@@ -483,14 +553,16 @@ public class CassandraIO {
     }
 
     private static long getNumSplits(
-        long desiredBundleSizeBytes, long estimatedSizeBytes, @Nullable Integer minNumberOfSplits) {
+        long desiredBundleSizeBytes,
+        long estimatedSizeBytes,
+        @Nullable ValueProvider<Integer> minNumberOfSplits) {
       long numSplits =
           desiredBundleSizeBytes > 0 ? (estimatedSizeBytes / desiredBundleSizeBytes) : 1;
       if (numSplits <= 0) {
         LOG.warn("Number of splits is less than 0 ({}), fallback to 1", numSplits);
         numSplits = 1;
       }
-      return minNumberOfSplits != null ? Math.max(numSplits, minNumberOfSplits) : numSplits;
+      return minNumberOfSplits != null ? Math.max(numSplits, minNumberOfSplits.get()) : numSplits;
     }
 
     @Override
@@ -507,7 +579,8 @@ public class CassandraIO {
               spec.consistencyLevel())) {
         if (isMurmur3Partitioner(cluster)) {
           try {
-            List<TokenRange> tokenRanges = getTokenRanges(cluster, spec.keyspace(), spec.table());
+            List<TokenRange> tokenRanges =
+                getTokenRanges(cluster, spec.keyspace().get(), spec.table().get());
             return getEstimatedSizeBytesFromTokenRanges(tokenRanges);
           } catch (Exception e) {
             LOG.warn("Can't estimate the size", e);
@@ -656,7 +729,7 @@ public class CassandraIO {
                 source.spec.passwordDecrypter(),
                 source.spec.localDc(),
                 source.spec.consistencyLevel());
-        session = cluster.connect(source.spec.keyspace());
+        session = cluster.connect(source.spec.keyspace().get());
         LOG.debug("Queries: " + source.splitQueries);
         List<ResultSetFuture> futures = new ArrayList<>();
         for (String query : source.splitQueries) {
@@ -997,6 +1070,7 @@ public class CassandraIO {
       deleter = null;
     }
   }
+
   /** Get a Cassandra cluster using hosts and port. */
   private static Cluster getCluster(
       List<String> hosts,
@@ -1032,6 +1106,45 @@ public class CassandraIO {
     return builder.build();
   }
 
+  private static Cluster getCluster(
+      ValueProvider<List<String>> hosts,
+      ValueProvider<Integer> port,
+      ValueProvider<String> username,
+      ValueProvider<String> password,
+      String encryptedPassword,
+      PasswordDecrypter passwordDecrypter,
+      ValueProvider<String> localDc,
+      ValueProvider<String> consistencyLevel) {
+    Cluster.Builder builder =
+        Cluster.builder()
+            .addContactPoints(hosts.get().toArray(new String[hosts.get().size()]))
+            .withPort(port.get());
+
+    if (username != null) {
+      String passwordStr = null;
+      if (encryptedPassword != null && passwordDecrypter != null) {
+        passwordStr = passwordDecrypter.decrypt(encryptedPassword);
+      } else {
+        passwordStr = password.get();
+      }
+      builder.withAuthProvider(new PlainTextAuthProvider(username.get(), passwordStr));
+    }
+
+    DCAwareRoundRobinPolicy.Builder dcAwarePolicyBuilder = new DCAwareRoundRobinPolicy.Builder();
+    if (localDc != null) {
+      dcAwarePolicyBuilder.withLocalDc(localDc.get());
+    }
+
+    builder.withLoadBalancingPolicy(new TokenAwarePolicy(dcAwarePolicyBuilder.build()));
+
+    if (consistencyLevel != null) {
+      builder.withQueryOptions(
+          new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel.get())));
+    }
+
+    return builder.build();
+  }
+
   /** Mutator allowing to do side effects into Apache Cassandra database. */
   private static class Mutator<T> {
     /**
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
index df82ac6..b7124f1 100644
--- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
@@ -271,6 +271,26 @@ public class CassandraIOTest implements Serializable {
   }
 
   @Test
+  public void testReadWithValueProvider() throws Exception {
+    insertRecords();
+
+    PCollection<Scientist> output =
+        pipeline.apply(
+            CassandraIO.<Scientist>read()
+                .withHosts(pipeline.newProvider(Arrays.asList(CASSANDRA_HOST)))
+                .withPort(pipeline.newProvider(CASSANDRA_PORT))
+                .withKeyspace(pipeline.newProvider(CASSANDRA_KEYSPACE))
+                .withTable(pipeline.newProvider(CASSANDRA_TABLE))
+                .withCoder(SerializableCoder.of(Scientist.class))
+                .withEntity(Scientist.class)
+                .withWhere(pipeline.newProvider("person_id=10")));
+
+    PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(1L);
+
+    pipeline.run();
+  }
+
+  @Test
   public void testWrite() {
     ArrayList<Scientist> scientists = buildScientists(NUM_ROWS);