Posted to commits@beam.apache.org by ie...@apache.org on 2019/03/15 21:36:53 UTC
[beam] branch master updated: [BEAM-6773] Add ValueProvider to CassandraIO.Read
This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 24406c9 [BEAM-6773] Add ValueProvider to CassandraIO.Read
new 6e96e23 Merge pull request #8024: [BEAM-6773] Add ValueProvider to CassandraIO.Read
24406c9 is described below
commit 24406c9d3259a511646be661c687c73e3c2a531b
Author: Radoslaw Stankiewicz <ra...@google.com>
AuthorDate: Wed Mar 6 17:37:26 2019 +0100
[BEAM-6773] Add ValueProvider to CassandraIO.Read
Enable parametrization at runtime and future integration with templates
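For context, a minimal sketch of how these overloads could be wired to template-style options. The CassandraReadOptions interface, the Scientist entity, and all concrete values below are hypothetical illustrations, not part of this commit:

    import java.util.Arrays;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.SerializableCoder;
    import org.apache.beam.sdk.io.cassandra.CassandraIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.ValueProvider;

    /** Options whose values are only resolved when the (templated) pipeline runs. */
    public interface CassandraReadOptions extends PipelineOptions {
      ValueProvider<String> getKeyspace();
      void setKeyspace(ValueProvider<String> value);

      ValueProvider<String> getTable();
      void setTable(ValueProvider<String> value);
    }

    CassandraReadOptions options =
        PipelineOptionsFactory.fromArgs(args).as(CassandraReadOptions.class);
    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply(
        CassandraIO.<Scientist>read()
            .withHosts(Arrays.asList("localhost"))
            .withPort(9042)
            // Resolved only at run time, e.g. when a template is launched
            // with --keyspace=... --table=...
            .withKeyspace(options.getKeyspace())
            .withTable(options.getTable())
            .withEntity(Scientist.class)
            .withCoder(SerializableCoder.of(Scientist.class)));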
---
.../apache/beam/sdk/io/cassandra/CassandraIO.java | 181 +++++++++++++++++----
.../beam/sdk/io/cassandra/CassandraIOTest.java | 20 +++
2 files changed, 167 insertions(+), 34 deletions(-)
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index 06e6d1a..1e34a27 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -50,6 +50,7 @@ import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -131,16 +132,16 @@ public class CassandraIO {
@AutoValue
public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
@Nullable
- abstract List<String> hosts();
+ abstract ValueProvider<List<String>> hosts();
@Nullable
- abstract Integer port();
+ abstract ValueProvider<Integer> port();
@Nullable
- abstract String keyspace();
+ abstract ValueProvider<String> keyspace();
@Nullable
- abstract String table();
+ abstract ValueProvider<String> table();
@Nullable
abstract Class<T> entity();
@@ -149,10 +150,10 @@ public class CassandraIO {
abstract Coder<T> coder();
@Nullable
- abstract String username();
+ abstract ValueProvider<String> username();
@Nullable
- abstract String password();
+ abstract ValueProvider<String> password();
@Nullable
abstract String encryptedPassword();
@@ -161,16 +162,16 @@ public class CassandraIO {
abstract PasswordDecrypter passwordDecrypter();
@Nullable
- abstract String localDc();
+ abstract ValueProvider<String> localDc();
@Nullable
- abstract String consistencyLevel();
+ abstract ValueProvider<String> consistencyLevel();
@Nullable
- abstract String where();
+ abstract ValueProvider<String> where();
@Nullable
- abstract Integer minNumberOfSplits();
+ abstract ValueProvider<Integer> minNumberOfSplits();
abstract Builder<T> builder();
@@ -178,24 +179,44 @@ public class CassandraIO {
public Read<T> withHosts(List<String> hosts) {
checkArgument(hosts != null, "hosts can not be null");
checkArgument(!hosts.isEmpty(), "hosts can not be empty");
+ return builder().setHosts(ValueProvider.StaticValueProvider.of(hosts)).build();
+ }
+
+ /** Specify the hosts of the Apache Cassandra instances. */
+ public Read<T> withHosts(ValueProvider<List<String>> hosts) {
return builder().setHosts(hosts).build();
}
/** Specify the port number of the Apache Cassandra instances. */
public Read<T> withPort(int port) {
checkArgument(port > 0, "port must be > 0, but was: %s", port);
+ return builder().setPort(ValueProvider.StaticValueProvider.of(port)).build();
+ }
+
+ /** Specify the port number of the Apache Cassandra instances. */
+ public Read<T> withPort(ValueProvider<Integer> port) {
return builder().setPort(port).build();
}
/** Specify the Cassandra keyspace where to read data. */
public Read<T> withKeyspace(String keyspace) {
checkArgument(keyspace != null, "keyspace can not be null");
+ return builder().setKeyspace(ValueProvider.StaticValueProvider.of(keyspace)).build();
+ }
+
+ /** Specify the Cassandra keyspace where to read data. */
+ public Read<T> withKeyspace(ValueProvider<String> keyspace) {
return builder().setKeyspace(keyspace).build();
}
/** Specify the Cassandra table where to read data. */
public Read<T> withTable(String table) {
checkArgument(table != null, "table can not be null");
+ return builder().setTable(ValueProvider.StaticValueProvider.of(table)).build();
+ }
+
+ /** Specify the Cassandra table where to read data. */
+ public Read<T> withTable(ValueProvider<String> table) {
return builder().setTable(table).build();
}
@@ -218,12 +239,22 @@ public class CassandraIO {
/** Specify the username for authentication. */
public Read<T> withUsername(String username) {
checkArgument(username != null, "username can not be null");
+ return builder().setUsername(ValueProvider.StaticValueProvider.of(username)).build();
+ }
+
+ /** Specify the username for authentication. */
+ public Read<T> withUsername(ValueProvider<String> username) {
return builder().setUsername(username).build();
}
/** Specify the clear password used for authentication. */
public Read<T> withPassword(String password) {
checkArgument(password != null, "password can not be null");
+ return builder().setPassword(ValueProvider.StaticValueProvider.of(password)).build();
+ }
+
+ /** Specify the clear password for authentication. */
+ public Read<T> withPassword(ValueProvider<String> password) {
return builder().setPassword(password).build();
}
@@ -249,11 +280,22 @@ public class CassandraIO {
/** Specify the local DC used for the load balancing. */
public Read<T> withLocalDc(String localDc) {
checkArgument(localDc != null, "localDc can not be null");
+ return builder().setLocalDc(ValueProvider.StaticValueProvider.of(localDc)).build();
+ }
+
+ /** Specify the local DC used for the load balancing. */
+ public Read<T> withLocalDc(ValueProvider<String> localDc) {
return builder().setLocalDc(localDc).build();
}
public Read<T> withConsistencyLevel(String consistencyLevel) {
checkArgument(consistencyLevel != null, "consistencyLevel can not be null");
+ return builder()
+ .setConsistencyLevel(ValueProvider.StaticValueProvider.of(consistencyLevel))
+ .build();
+ }
+
+ public Read<T> withConsistencyLevel(ValueProvider<String> consistencyLevel) {
return builder().setConsistencyLevel(consistencyLevel).build();
}
@@ -272,6 +314,23 @@ public class CassandraIO {
*/
public Read<T> withWhere(String where) {
checkArgument(where != null, "where can not be null");
+ return builder().setWhere(ValueProvider.StaticValueProvider.of(where)).build();
+ }
+
+ /**
+ * Specify a string with a partial {@code Where} clause. Note: Cassandra places restrictions on
+ * the {@code Where} clause you may use (e.g. you may only filter on primary/clustering columns).
+ *
+ * @param where Partial {@code Where} clause. Optional - If unspecified will not filter the
+ * data.
+ * @see <a href="http://cassandra.apache.org/doc/4.0/cql/dml.html#where-clause">CQL
+ * Documentation</a>
+ * @throws com.datastax.driver.core.exceptions.InvalidQueryException If {@code Where} clause
+ * makes the generated query invalid. Please consult <a
+ * href="http://cassandra.apache.org/doc/4.0/cql/dml.html#where-clause">CQL
+ * Documentation</a> for more info on correct usage of the {@code Where} clause.
+ */
+ public Read<T> withWhere(ValueProvider<String> where) {
return builder().setWhere(where).build();
}
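To illustrate the restriction described in the Javadoc above, with read a configured CassandraIO.Read instance and person_id standing in for a partition-key column (as in the test further down; the values are made up):

    // Allowed: filters on a partition/clustering key column.
    read.withWhere("person_id=10");

    // Rejected by Cassandra at query time with InvalidQueryException:
    // filtering on a non-key, non-indexed column requires ALLOW FILTERING,
    // which the generated query does not add.
    // read.withWhere("name='Einstein'");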
@@ -283,6 +342,17 @@ public class CassandraIO {
public Read<T> withMinNumberOfSplits(Integer minNumberOfSplits) {
checkArgument(minNumberOfSplits != null, "minNumberOfSplits can not be null");
checkArgument(minNumberOfSplits > 0, "minNumberOfSplits must be greater than 0");
+ return builder()
+ .setMinNumberOfSplits(ValueProvider.StaticValueProvider.of(minNumberOfSplits))
+ .build();
+ }
+
+ /**
+ * It's possible that system.size_estimates isn't populated or that the number of splits
+ * computed by Beam is still too low for Cassandra to handle it. This setting allows enforcing a
+ * minimum number of splits in case Beam cannot compute it correctly.
+ */
+ public Read<T> withMinNumberOfSplits(ValueProvider<Integer> minNumberOfSplits) {
return builder().setMinNumberOfSplits(minNumberOfSplits).build();
}
@@ -299,33 +369,33 @@ public class CassandraIO {
@AutoValue.Builder
abstract static class Builder<T> {
- abstract Builder<T> setHosts(List<String> hosts);
+ abstract Builder<T> setHosts(ValueProvider<List<String>> hosts);
- abstract Builder<T> setPort(Integer port);
+ abstract Builder<T> setPort(ValueProvider<Integer> port);
- abstract Builder<T> setKeyspace(String keyspace);
+ abstract Builder<T> setKeyspace(ValueProvider<String> keyspace);
- abstract Builder<T> setTable(String table);
+ abstract Builder<T> setTable(ValueProvider<String> table);
abstract Builder<T> setEntity(Class<T> entity);
abstract Builder<T> setCoder(Coder<T> coder);
- abstract Builder<T> setUsername(String username);
+ abstract Builder<T> setUsername(ValueProvider<String> username);
- abstract Builder<T> setPassword(String password);
+ abstract Builder<T> setPassword(ValueProvider<String> password);
abstract Builder<T> setEncryptedPassword(String encryptedPassword);
abstract Builder<T> setPasswordDecrypter(PasswordDecrypter passwordDecrypter);
- abstract Builder<T> setLocalDc(String localDc);
+ abstract Builder<T> setLocalDc(ValueProvider<String> localDc);
- abstract Builder<T> setConsistencyLevel(String consistencyLevel);
+ abstract Builder<T> setConsistencyLevel(ValueProvider<String> consistencyLevel);
- abstract Builder<T> setWhere(String where);
+ abstract Builder<T> setWhere(ValueProvider<String> where);
- abstract Builder<T> setMinNumberOfSplits(Integer minNumberOfSplits);
+ abstract Builder<T> setMinNumberOfSplits(ValueProvider<Integer> minNumberOfSplits);
abstract Read<T> build();
}
@@ -376,9 +446,9 @@ public class CassandraIO {
String splitQuery =
String.format(
"SELECT * FROM %s.%s%s;",
- spec.keyspace(),
- spec.table(),
- spec.where() == null ? "" : String.format(" WHERE %s", spec.where()));
+ spec.keyspace().get(),
+ spec.table().get(),
+ spec.where() == null ? "" : String.format(" WHERE %s", spec.where().get()));
return Collections.singletonList(
new CassandraIO.CassandraSource<>(spec, Collections.singletonList(splitQuery)));
}
@@ -407,7 +477,7 @@ public class CassandraIO {
LOG.info("{} splits were actually generated", splits.size());
final String partitionKey =
- cluster.getMetadata().getKeyspace(spec.keyspace()).getTable(spec.table())
+ cluster.getMetadata().getKeyspace(spec.keyspace().get()).getTable(spec.table().get())
.getPartitionKey().stream()
.map(ColumnMetadata::getName)
.collect(Collectors.joining(","));
@@ -457,21 +527,21 @@ public class CassandraIO {
}
private static String generateRangeQuery(
- String keyspace,
- String table,
- String where,
+ ValueProvider<String> keyspace,
+ ValueProvider<String> table,
+ ValueProvider<String> where,
String partitionKey,
BigInteger rangeStart,
BigInteger rangeEnd) {
String query =
String.format(
"SELECT * FROM %s.%s WHERE %s;",
- keyspace,
- table,
+ keyspace.get(),
+ table.get(),
Joiner.on(" AND ")
.skipNulls()
.join(
- where == null ? null : String.format("(%s)", where),
+ where == null ? null : String.format("(%s)", where.get()),
rangeStart == null
? null
: String.format("(token(%s)>=%d)", partitionKey, rangeStart),
@@ -483,14 +553,16 @@ public class CassandraIO {
}
private static long getNumSplits(
- long desiredBundleSizeBytes, long estimatedSizeBytes, @Nullable Integer minNumberOfSplits) {
+ long desiredBundleSizeBytes,
+ long estimatedSizeBytes,
+ @Nullable ValueProvider<Integer> minNumberOfSplits) {
long numSplits =
desiredBundleSizeBytes > 0 ? (estimatedSizeBytes / desiredBundleSizeBytes) : 1;
if (numSplits <= 0) {
LOG.warn("Number of splits is less than 0 ({}), fallback to 1", numSplits);
numSplits = 1;
}
- return minNumberOfSplits != null ? Math.max(numSplits, minNumberOfSplits) : numSplits;
+ return minNumberOfSplits != null ? Math.max(numSplits, minNumberOfSplits.get()) : numSplits;
}
@Override
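A worked example of the split computation above, with made-up sizes:

    // desiredBundleSizeBytes = 16 MB, estimatedSizeBytes = 64 MB
    long numSplits = (64L << 20) / (16L << 20);  // -> 4 splits
    // A configured minNumberOfSplits of 8 then wins:
    numSplits = Math.max(numSplits, 8);          // -> 8 splits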
@@ -507,7 +579,8 @@ public class CassandraIO {
spec.consistencyLevel())) {
if (isMurmur3Partitioner(cluster)) {
try {
- List<TokenRange> tokenRanges = getTokenRanges(cluster, spec.keyspace(), spec.table());
+ List<TokenRange> tokenRanges =
+ getTokenRanges(cluster, spec.keyspace().get(), spec.table().get());
return getEstimatedSizeBytesFromTokenRanges(tokenRanges);
} catch (Exception e) {
LOG.warn("Can't estimate the size", e);
@@ -656,7 +729,7 @@ public class CassandraIO {
source.spec.passwordDecrypter(),
source.spec.localDc(),
source.spec.consistencyLevel());
- session = cluster.connect(source.spec.keyspace());
+ session = cluster.connect(source.spec.keyspace().get());
LOG.debug("Queries: " + source.splitQueries);
List<ResultSetFuture> futures = new ArrayList<>();
for (String query : source.splitQueries) {
@@ -997,6 +1070,7 @@ public class CassandraIO {
deleter = null;
}
}
+
/** Get a Cassandra cluster using hosts and port. */
private static Cluster getCluster(
List<String> hosts,
@@ -1032,6 +1106,45 @@ public class CassandraIO {
return builder.build();
}
+ private static Cluster getCluster(
+ ValueProvider<List<String>> hosts,
+ ValueProvider<Integer> port,
+ ValueProvider<String> username,
+ ValueProvider<String> password,
+ String encryptedPassword,
+ PasswordDecrypter passwordDecrypter,
+ ValueProvider<String> localDc,
+ ValueProvider<String> consistencyLevel) {
+ Cluster.Builder builder =
+ Cluster.builder()
+ .addContactPoints(hosts.get().toArray(new String[hosts.get().size()]))
+ .withPort(port.get());
+
+ if (username != null) {
+ String passwordStr = null;
+ if (encryptedPassword != null && passwordDecrypter != null) {
+ passwordStr = passwordDecrypter.decrypt(encryptedPassword);
+ } else {
+ passwordStr = password.get();
+ }
+ builder.withAuthProvider(new PlainTextAuthProvider(username.get(), passwordStr));
+ }
+
+ DCAwareRoundRobinPolicy.Builder dcAwarePolicyBuilder = new DCAwareRoundRobinPolicy.Builder();
+ if (localDc != null) {
+ dcAwarePolicyBuilder.withLocalDc(localDc.get());
+ }
+
+ builder.withLoadBalancingPolicy(new TokenAwarePolicy(dcAwarePolicyBuilder.build()));
+
+ if (consistencyLevel != null) {
+ builder.withQueryOptions(
+ new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel.get())));
+ }
+
+ return builder.build();
+ }
+
/** Mutator allowing to do side effects into Apache Cassandra database. */
private static class Mutator<T> {
/**
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
index df82ac6..b7124f1 100644
--- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
@@ -271,6 +271,26 @@ public class CassandraIOTest implements Serializable {
}
@Test
+ public void testReadWithValueProvider() throws Exception {
+ insertRecords();
+
+ PCollection<Scientist> output =
+ pipeline.apply(
+ CassandraIO.<Scientist>read()
+ .withHosts(pipeline.newProvider(Arrays.asList(CASSANDRA_HOST)))
+ .withPort(pipeline.newProvider(CASSANDRA_PORT))
+ .withKeyspace(pipeline.newProvider(CASSANDRA_KEYSPACE))
+ .withTable(pipeline.newProvider(CASSANDRA_TABLE))
+ .withCoder(SerializableCoder.of(Scientist.class))
+ .withEntity(Scientist.class)
+ .withWhere(pipeline.newProvider("person_id=10")));
+
+ PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(1L);
+
+ pipeline.run();
+ }
+
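The test above relies on TestPipeline#newProvider, which wraps a value in a ValueProvider that is only readable while the pipeline runs; a rough sketch of that behavior (the keyspace name is made up):

    ValueProvider<String> keyspace = pipeline.newProvider("beam_ks");
    // Before the pipeline runs, the value is deliberately inaccessible:
    assertFalse(keyspace.isAccessible());
    // Inside a running pipeline, keyspace.get() would return "beam_ks".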
+ @Test
public void testWrite() {
ArrayList<Scientist> scientists = buildScientists(NUM_ROWS);