You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2018/12/10 16:31:05 UTC
[beam] 01/03: [BEAM-6079] Add ability for CassandraIO to delete data
This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 8100f3204b76d4a891f75262123c23d1e22d3ca5
Author: Fabien Rousseau <fa...@happn.com>
AuthorDate: Fri Nov 16 14:28:26 2018 +0100
[BEAM-6079] Add ability for CassandraIO to delete data
---
.../apache/beam/sdk/io/cassandra/CassandraIO.java | 161 ++++++++++++++++-----
.../beam/sdk/io/cassandra/CassandraService.java | 14 +-
.../sdk/io/cassandra/CassandraServiceImpl.java | 74 +++++++---
.../beam/sdk/io/cassandra/CassandraIOTest.java | 45 +++++-
4 files changed, 241 insertions(+), 53 deletions(-)
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index db073d7..838dd82 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -88,9 +88,16 @@ public class CassandraIO {
.build();
}
- /** Provide a {@link Write} {@link PTransform} to write data to a Cassandra database. */
- public static <T> Write<T> write() {
- return new AutoValue_CassandraIO_Write.Builder<T>()
+ /** Provide a {@link Mutate} {@link PTransform} to write data to a Cassandra database. */
+ public static <T> Mutate<T> write() {
+ return Mutate.<T>builder(MutationType.WRITE)
+ .setCassandraService(new CassandraServiceImpl<>())
+ .build();
+ }
+
+ /** Provide a {@link Mutate} {@link PTransform} to delete data from a Cassandra database. */
+ public static <T> Mutate<T> delete() {
+ return Mutate.<T>builder(MutationType.DELETE)
.setCassandraService(new CassandraServiceImpl<>())
.build();
}
@@ -313,12 +320,18 @@ public class CassandraIO {
}
}
+ /** The kind of mutation a {@link Mutate} transform applies to each input entity. */
+ public enum MutationType {
+ /** Write the input entities; selected by {@link CassandraIO#write()}. */
+ WRITE,
+ /** Delete the input entities; selected by {@link CassandraIO#delete()}. */
+ DELETE
+ }
+
/**
- * A {@link PTransform} to write into Apache Cassandra. See {@link CassandraIO} for details on
+ * A {@link PTransform} to write or delete data in Apache Cassandra, depending on its {@link
+ * MutationType}. See {@link CassandraIO} for details on
* usage and configuration.
*/
@AutoValue
- public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+ public abstract static class Mutate<T> extends PTransform<PCollection<T>, PDone> {
@Nullable
abstract List<String> hosts();
@@ -346,31 +359,48 @@ public class CassandraIO {
@Nullable
abstract CassandraService<T> cassandraService();
+ /** Whether this transform writes or deletes its input entities. */
+ abstract MutationType mutationType();
+
abstract Builder<T> builder();
+ /** Returns a new builder whose mutation type is preset to {@code mutationType}. */
+ static <T> Builder<T> builder(MutationType mutationType) {
+ return new AutoValue_CassandraIO_Mutate.Builder<T>().setMutationType(mutationType);
+ }
+
/** Specify the Cassandra instance hosts where to write data. */
- public Write<T> withHosts(List<String> hosts) {
- checkArgument(hosts != null, "CassandraIO.write().withHosts(hosts) called with null hosts");
+ public Mutate<T> withHosts(List<String> hosts) {
+ checkArgument(
+ hosts != null,
+ "CassandraIO." + getMutationTypeName() + "().withHosts(hosts) called with null hosts");
checkArgument(
!hosts.isEmpty(),
- "CassandraIO.write().withHosts(hosts) called with empty " + "hosts list");
+ "CassandraIO."
+ + getMutationTypeName()
+ + "().withHosts(hosts) called with empty "
+ + "hosts list");
return builder().setHosts(hosts).build();
}
/** Specify the Cassandra instance port number where to write data. */
- public Write<T> withPort(int port) {
+ public Mutate<T> withPort(int port) {
checkArgument(
port > 0,
- "CassandraIO.write().withPort(port) called with invalid port " + "number (%s)",
+ "CassandraIO."
+ + getMutationTypeName()
+ + "().withPort(port) called with invalid port "
+ + "number (%s)",
port);
return builder().setPort(port).build();
}
/** Specify the Cassandra keyspace where to write data. */
- public Write<T> withKeyspace(String keyspace) {
+ public Mutate<T> withKeyspace(String keyspace) {
checkArgument(
keyspace != null,
- "CassandraIO.write().withKeyspace(keyspace) called with " + "null keyspace");
+ "CassandraIO."
+ + getMutationTypeName()
+ + "().withKeyspace(keyspace) called with "
+ + "null keyspace");
return builder().setKeyspace(keyspace).build();
}
@@ -378,40 +408,55 @@ public class CassandraIO {
* Specify the entity class in the input {@link PCollection}. The {@link CassandraIO} will map
* this entity to the Cassandra table thanks to the annotations.
*/
- public Write<T> withEntity(Class<T> entity) {
+ public Mutate<T> withEntity(Class<T> entity) {
checkArgument(
- entity != null, "CassandraIO.write().withEntity(entity) called with null " + "entity");
+ entity != null,
+ "CassandraIO."
+ + getMutationTypeName()
+ + "().withEntity(entity) called with null "
+ + "entity");
return builder().setEntity(entity).build();
}
/** Specify the username used for authentication. */
- public Write<T> withUsername(String username) {
+ public Mutate<T> withUsername(String username) {
checkArgument(
username != null,
- "CassandraIO.write().withUsername(username) called with " + "null username");
+ "CassandraIO."
+ + getMutationTypeName()
+ + "().withUsername(username) called with "
+ + "null username");
return builder().setUsername(username).build();
}
/** Specify the password used for authentication. */
- public Write<T> withPassword(String password) {
+ public Mutate<T> withPassword(String password) {
checkArgument(
password != null,
- "CassandraIO.write().withPassword(password) called with " + "null password");
+ "CassandraIO."
+ + getMutationTypeName()
+ + "().withPassword(password) called with "
+ + "null password");
return builder().setPassword(password).build();
}
/** Specify the local DC used by the load balancing policy. */
- public Write<T> withLocalDc(String localDc) {
+ public Mutate<T> withLocalDc(String localDc) {
checkArgument(
localDc != null,
- "CassandraIO.write().withLocalDc(localDc) called with null" + " localDc");
+ "CassandraIO."
+ + getMutationTypeName()
+ + "().withLocalDc(localDc) called with null"
+ + " localDc");
return builder().setLocalDc(localDc).build();
}
- public Write<T> withConsistencyLevel(String consistencyLevel) {
+ /** Specify, by name, the Cassandra consistency level used when connecting to the cluster. */
+ public Mutate<T> withConsistencyLevel(String consistencyLevel) {
checkArgument(
consistencyLevel != null,
- "CassandraIO.write().withConsistencyLevel"
+ "CassandraIO."
+ + getMutationTypeName()
+ + "().withConsistencyLevel"
+ "(consistencyLevel) called with null consistencyLevel");
return builder().setConsistencyLevel(consistencyLevel).build();
}
@@ -419,10 +464,13 @@ public class CassandraIO {
/**
* Specify the {@link CassandraService} used to connect and write into the Cassandra database.
*/
- public Write<T> withCassandraService(CassandraService<T> cassandraService) {
+ public Mutate<T> withCassandraService(CassandraService<T> cassandraService) {
checkArgument(
cassandraService != null,
- "CassandraIO.write().withCassandraService" + "(service) called with null service");
+ "CassandraIO."
+ + getMutationTypeName()
+ + "().withCassandraService"
+ + "(service) called with null service");
return builder().setCassandraService(cassandraService).build();
}
@@ -430,27 +478,47 @@ public class CassandraIO {
public void validate(PipelineOptions pipelineOptions) {
checkState(
hosts() != null || cassandraService() != null,
- "CassandraIO.write() requires a list of hosts to be set via withHosts(hosts) or a "
+ "CassandraIO."
+ + getMutationTypeName()
+ + "() requires a list of hosts to be set via withHosts(hosts) or a "
+ "Cassandra service to be set via withCassandraService(service)");
checkState(
port() != null || cassandraService() != null,
- "CassandraIO.write() requires a "
+ "CassandraIO."
+ + getMutationTypeName()
+ + "() requires a "
+ "valid port number to be set via withPort(port) or a Cassandra service to be set via "
+ "withCassandraService(service)");
checkState(
keyspace() != null,
- "CassandraIO.write() requires a keyspace to be set via " + "withKeyspace(keyspace)");
+ "CassandraIO."
+ + getMutationTypeName()
+ + "() requires a keyspace to be set via "
+ + "withKeyspace(keyspace)");
checkState(
entity() != null,
- "CassandraIO.write() requires an entity to be set via " + "withEntity(entity)");
+ "CassandraIO."
+ + getMutationTypeName()
+ + "() requires an entity to be set via "
+ + "withEntity(entity)");
}
@Override
public PDone expand(PCollection<T> input) {
- input.apply(ParDo.of(new WriteFn<>(this)));
+ if (mutationType() == MutationType.DELETE) {
+ input.apply(ParDo.of(new DeleteFn<T>(this)));
+ } else {
+ input.apply(ParDo.of(new WriteFn<T>(this)));
+ }
return PDone.in(input.getPipeline());
}
+ /**
+ * Returns the name of the factory method ({@code write} or {@code delete}) that built this
+ * transform, for use in error messages.
+ */
+ private String getMutationTypeName() {
+ // mutationType() is not @Nullable, so the WRITE fallback is purely defensive.
+ MutationType type = mutationType() == null ? MutationType.WRITE : mutationType();
+ // Locale.ROOT keeps the name stable: under a Turkish default locale "WRITE" would become "wrıte".
+ return type.name().toLowerCase(java.util.Locale.ROOT);
+ }
+
@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setHosts(List<String> hosts);
@@ -471,15 +539,17 @@ public class CassandraIO {
abstract Builder<T> setCassandraService(CassandraService<T> cassandraService);
- abstract Write<T> build();
+ abstract Builder<T> setMutationType(MutationType mutationType);
+
+ abstract Mutate<T> build();
}
}
private static class WriteFn<T> extends DoFn<T, Void> {
- private final Write<T> spec;
- private CassandraService.Writer writer;
+ private final Mutate<T> spec;
+ private CassandraService.Writer<T> writer;
- WriteFn(Write<T> spec) {
+ WriteFn(Mutate<T> spec) {
this.spec = spec;
}
@@ -499,4 +569,29 @@ public class CassandraIO {
writer = null;
}
}
+
+ /**
+ * {@link DoFn} that deletes each input element through the {@link CassandraService.Deleter}
+ * created by the spec's {@link CassandraService}.
+ *
+ * <p>NOTE(review): with {@code CassandraServiceImpl}, deletes are issued asynchronously and only
+ * awaited every 100 elements and in {@code close()}, which here runs from {@link #teardown()}.
+ */
+ private static class DeleteFn<T> extends DoFn<T, Void> {
+ private final Mutate<T> spec;
+ private CassandraService.Deleter<T> deleter;
+
+ DeleteFn(Mutate<T> spec) {
+ this.spec = spec;
+ }
+
+ @Setup
+ public void setup() {
+ deleter = spec.cassandraService().createDeleter(spec);
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws ExecutionException, InterruptedException {
+ deleter.delete(c.element());
+ }
+
+ @Teardown
+ public void teardown() throws Exception {
+ // Guard against teardown running without a deleter (e.g. setup() never completed), so an
+ // NPE here cannot mask the original failure.
+ if (deleter != null) {
+ deleter.close();
+ deleter = null;
+ }
+ }
+ }
}
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
index be4a257..92bd261 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
@@ -38,7 +38,7 @@ public interface CassandraService<T> extends Serializable {
List<BoundedSource<T>> split(CassandraIO.Read<T> spec, long desiredBundleSizeBytes);
/** Create a {@link Writer} that writes entities into the Cassandra instance. */
- Writer createWriter(CassandraIO.Write<T> spec);
+ Writer<T> createWriter(CassandraIO.Mutate<T> spec);
/** Writer for an entity. */
interface Writer<T> extends AutoCloseable {
@@ -48,4 +48,16 @@ public interface CassandraService<T> extends Serializable {
*/
void write(T entity) throws ExecutionException, InterruptedException;
}
+
+ /** Create a {@link Deleter} that deletes entities from the Cassandra instance. */
+ Deleter<T> createDeleter(CassandraIO.Mutate<T> spec);
+
+ /** Deleter for an entity. */
+ interface Deleter<T> extends AutoCloseable {
+ /**
+ * This method should be synchronous. It means you have to be sure that the entity is fully
+ * deleted from the Cassandra instance when you exit from this method.
+ */
+ void delete(T entity) throws ExecutionException, InterruptedException;
+ }
}
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
index 62a9bce..93db829 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
@@ -42,6 +42,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
+import java.util.function.BiFunction;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.BoundedSource;
@@ -394,21 +395,39 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
}
/** Writer storing an entity into Apache Cassandra database. */
- protected class WriterImpl implements Writer<T> {
+ protected class WriterImpl extends MutatorImpl implements Writer<T> {
+
+ WriterImpl(CassandraIO.Mutate<T> spec) {
+ super(spec, Mapper::saveAsync, "writes");
+ }
+
+ @Override
+ public void write(T entity) throws ExecutionException, InterruptedException {
+ mutate(entity);
+ }
+ }
+
+ /**
+ * Base class for the entity writer and deleter: applies an asynchronous {@link Mapper} operation
+ * to each entity over its own {@link Session}, waiting on the pending futures in batches.
+ */
+ protected abstract class MutatorImpl {
/**
* The threshold of 100 concurrent async queries is a heuristic commonly used by the Apache
* Cassandra community. There is no real gain to expect in tuning this value.
*/
private static final int CONCURRENT_ASYNC_QUERIES = 100;
- private final CassandraIO.Write<T> spec;
+ private final CassandraIO.Mutate<T> spec;
private final Cluster cluster;
private final Session session;
private final MappingManager mappingManager;
- private List<ListenableFuture<Void>> writeFutures;
-
- WriterImpl(CassandraIO.Write<T> spec) {
+ private List<ListenableFuture<Void>> mutateFutures;
+ // The async Mapper operation to apply (e.g. saveAsync or deleteAsync); set once.
+ private final BiFunction<Mapper<T>, T, ListenableFuture<Void>> mutator;
+ // Plural operation name used only in debug logging ("writes", "deletes").
+ private final String operationName;
+
+ MutatorImpl(
+ CassandraIO.Mutate<T> spec,
+ BiFunction<Mapper<T>, T, ListenableFuture<Void>> mutator,
+ String operationName) {
this.spec = spec;
this.cluster =
getCluster(
@@ -420,34 +439,35 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
spec.consistencyLevel());
this.session = cluster.connect(spec.keyspace());
this.mappingManager = new MappingManager(session);
- this.writeFutures = new ArrayList<>();
+ this.mutateFutures = new ArrayList<>();
+ this.mutator = mutator;
+ this.operationName = operationName;
}
/**
- * Write the entity to the Cassandra instance, using {@link Mapper} obtained with the {@link
- * MappingManager}. This method uses {@link Mapper#saveAsync(Object)} method, which is
- * asynchronous. Beam will wait for all futures to complete, to guarantee all writes have
- * succeeded.
+ * Apply the configured mutation (such as {@link Mapper#saveAsync(Object)} or {@link
+ * Mapper#deleteAsync(Object)}) to the entity, using the {@link Mapper} obtained from the {@link
+ * MappingManager}. The mutation is asynchronous: pending futures are awaited once {@code
+ * CONCURRENT_ASYNC_QUERIES} of them are in flight, and the remainder in {@link #close()}.
*/
- @Override
- public void write(T entity) throws ExecutionException, InterruptedException {
+ public void mutate(T entity) throws ExecutionException, InterruptedException {
Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(entity.getClass());
- this.writeFutures.add(mapper.saveAsync(entity));
- if (this.writeFutures.size() == CONCURRENT_ASYNC_QUERIES) {
+ this.mutateFutures.add(mutator.apply(mapper, entity));
+ if (this.mutateFutures.size() == CONCURRENT_ASYNC_QUERIES) {
// We reached the max number of allowed in flight queries.
// Write methods are synchronous in Beam as stated by the CassandraService interface,
// so we wait for each async query to return before exiting.
LOG.debug(
- "Waiting for a batch of {} Cassandra writes to be executed...",
- CONCURRENT_ASYNC_QUERIES);
+ "Waiting for a batch of {} Cassandra {} to be executed...",
+ CONCURRENT_ASYNC_QUERIES,
+ operationName);
waitForFuturesToFinish();
- this.writeFutures = new ArrayList<>();
+ this.mutateFutures = new ArrayList<>();
}
}
- @Override
public void close() throws ExecutionException, InterruptedException {
- if (this.writeFutures.size() > 0) {
+ if (!this.mutateFutures.isEmpty()) {
// Waiting for the last in flight async queries to return before finishing the bundle.
waitForFuturesToFinish();
}
@@ -461,14 +481,32 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
}
private void waitForFuturesToFinish() throws ExecutionException, InterruptedException {
- for (ListenableFuture<Void> future : writeFutures) {
+ for (ListenableFuture<Void> future : mutateFutures) {
future.get();
}
}
}
@Override
- public Writer createWriter(CassandraIO.Write<T> spec) {
+ public Writer<T> createWriter(CassandraIO.Mutate<T> spec) {
return new WriterImpl(spec);
}
+
+ /** Deleter removing an entity from the Apache Cassandra database. */
+ protected class DeleterImpl extends MutatorImpl implements Deleter<T> {
+
+ DeleterImpl(CassandraIO.Mutate<T> spec) {
+ super(spec, (mapper, entity) -> mapper.deleteAsync(entity), "deletes");
+ }
+
+ @Override
+ public void delete(T entity) throws ExecutionException, InterruptedException {
+ mutate(entity);
+ }
+ }
+
+ @Override
+ public Deleter<T> createDeleter(CassandraIO.Mutate<T> spec) {
+ return new DeleterImpl(spec);
+ }
}
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
index 5e98cef..4a048c0 100644
--- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
@@ -129,6 +129,32 @@ public class CassandraIOTest implements Serializable {
}
}
+ /**
+ * Reads every row of the fake "scientist" table, pipes the rows into {@link
+ * CassandraIO#delete()}, and checks that the in-memory table ends up empty.
+ */
+ @Test
+ public void testDelete() {
+ FakeCassandraService service = new FakeCassandraService();
+ service.load();
+
+ assertEquals(10000, service.getTable().size());
+
+ pipeline
+ .apply(
+ CassandraIO.<Scientist>read()
+ .withCassandraService(service)
+ .withKeyspace("beam")
+ .withTable("scientist")
+ .withCoder(SerializableCoder.of(Scientist.class))
+ .withEntity(Scientist.class))
+ .apply(
+ CassandraIO.<Scientist>delete()
+ .withCassandraService(service)
+ .withKeyspace("beam")
+ .withEntity(Scientist.class));
+
+ pipeline.run();
+
+ assertEquals(0, service.getTable().size());
+ }
+
/** A {@link CassandraService} implementation that stores the entity in memory. */
private static class FakeCassandraService implements CassandraService<Scientist> {
private static final Map<Integer, Scientist> table = new ConcurrentHashMap<>();
@@ -241,9 +267,26 @@ public class CassandraIOTest implements Serializable {
}
@Override
- public FakeCassandraWriter createWriter(CassandraIO.Write<Scientist> spec) {
+ public FakeCassandraWriter createWriter(CassandraIO.Mutate<Scientist> spec) {
return new FakeCassandraWriter();
}
+
+ /** {@link Deleter} that removes the scientist with the same {@code id} from the static table. */
+ private static class FakeCassandraDeleter implements Deleter<Scientist> {
+ @Override
+ public void delete(Scientist scientist) {
+ table.remove(scientist.id);
+ }
+
+ @Override
+ public void close() {
+ // nothing to do: delete() updates the in-memory map immediately
+ }
+ }
+
+ @Override
+ public FakeCassandraDeleter createDeleter(CassandraIO.Mutate<Scientist> spec) {
+ return new FakeCassandraDeleter();
+ }
}
/** Simple Cassandra entity used in test. */