You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2018/12/10 16:31:05 UTC

[beam] 01/03: [BEAM-6079] Add ability for CassandraIO to delete data

This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8100f3204b76d4a891f75262123c23d1e22d3ca5
Author: Fabien Rousseau <fa...@happn.com>
AuthorDate: Fri Nov 16 14:28:26 2018 +0100

    [BEAM-6079] Add ability for CassandraIO to delete data
---
 .../apache/beam/sdk/io/cassandra/CassandraIO.java  | 161 ++++++++++++++++-----
 .../beam/sdk/io/cassandra/CassandraService.java    |  14 +-
 .../sdk/io/cassandra/CassandraServiceImpl.java     |  74 +++++++---
 .../beam/sdk/io/cassandra/CassandraIOTest.java     |  45 +++++-
 4 files changed, 241 insertions(+), 53 deletions(-)

diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index db073d7..838dd82 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -88,9 +88,16 @@ public class CassandraIO {
         .build();
   }
 
-  /** Provide a {@link Write} {@link PTransform} to write data to a Cassandra database. */
-  public static <T> Write<T> write() {
-    return new AutoValue_CassandraIO_Write.Builder<T>()
+  /** Provide a {@link Mutate} {@link PTransform} to write data to a Cassandra database. */
+  public static <T> Mutate<T> write() {
+    return Mutate.<T>builder(MutationType.WRITE)
+        .setCassandraService(new CassandraServiceImpl<>())
+        .build();
+  }
+
+  /** Provide a {@link Mutate} {@link PTransform} to delete data from a Cassandra database. */
+  public static <T> Mutate<T> delete() {
+    return Mutate.<T>builder(MutationType.DELETE)
         .setCassandraService(new CassandraServiceImpl<>())
         .build();
   }
@@ -313,12 +320,18 @@ public class CassandraIO {
     }
   }
 
+  /** Specify the mutation type: either write or delete. */
+  public enum MutationType {
+    WRITE,
+    DELETE
+  }
+
   /**
-   * A {@link PTransform} to write into Apache Cassandra. See {@link CassandraIO} for details on
+   * A {@link PTransform} to write or delete data in Apache Cassandra. See {@link CassandraIO} for
+   * details on usage and configuration.
    */
   @AutoValue
-  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+  public abstract static class Mutate<T> extends PTransform<PCollection<T>, PDone> {
     @Nullable
     abstract List<String> hosts();
 
@@ -346,31 +359,48 @@ public class CassandraIO {
     @Nullable
     abstract CassandraService<T> cassandraService();
 
+    abstract MutationType mutationType();
+
     abstract Builder<T> builder();
 
+    static <T> Builder<T> builder(MutationType mutationType) {
+      return new AutoValue_CassandraIO_Mutate.Builder<T>().setMutationType(mutationType);
+    }
+
     /** Specify the Cassandra instance hosts where to write data. */
-    public Write<T> withHosts(List<String> hosts) {
-      checkArgument(hosts != null, "CassandraIO.write().withHosts(hosts) called with null hosts");
+    public Mutate<T> withHosts(List<String> hosts) {
+      checkArgument(
+          hosts != null,
+          "CassandraIO." + getMutationTypeName() + "().withHosts(hosts) called with null hosts");
       checkArgument(
           !hosts.isEmpty(),
-          "CassandraIO.write().withHosts(hosts) called with empty " + "hosts list");
+          "CassandraIO."
+              + getMutationTypeName()
+              + "().withHosts(hosts) called with empty "
+              + "hosts list");
       return builder().setHosts(hosts).build();
     }
 
     /** Specify the Cassandra instance port number where to write data. */
-    public Write<T> withPort(int port) {
+    public Mutate<T> withPort(int port) {
       checkArgument(
           port > 0,
-          "CassandraIO.write().withPort(port) called with invalid port " + "number (%s)",
+          "CassandraIO."
+              + getMutationTypeName()
+              + "().withPort(port) called with invalid port "
+              + "number (%s)",
           port);
       return builder().setPort(port).build();
     }
 
     /** Specify the Cassandra keyspace where to write data. */
-    public Write<T> withKeyspace(String keyspace) {
+    public Mutate<T> withKeyspace(String keyspace) {
       checkArgument(
           keyspace != null,
-          "CassandraIO.write().withKeyspace(keyspace) called with " + "null keyspace");
+          "CassandraIO."
+              + getMutationTypeName()
+              + "().withKeyspace(keyspace) called with "
+              + "null keyspace");
       return builder().setKeyspace(keyspace).build();
     }
 
@@ -378,40 +408,55 @@ public class CassandraIO {
      * Specify the entity class in the input {@link PCollection}. The {@link CassandraIO} will map
      * this entity to the Cassandra table thanks to the annotations.
      */
-    public Write<T> withEntity(Class<T> entity) {
+    public Mutate<T> withEntity(Class<T> entity) {
       checkArgument(
-          entity != null, "CassandraIO.write().withEntity(entity) called with null " + "entity");
+          entity != null,
+          "CassandraIO."
+              + getMutationTypeName()
+              + "().withEntity(entity) called with null "
+              + "entity");
       return builder().setEntity(entity).build();
     }
 
     /** Specify the username used for authentication. */
-    public Write<T> withUsername(String username) {
+    public Mutate<T> withUsername(String username) {
       checkArgument(
           username != null,
-          "CassandraIO.write().withUsername(username) called with " + "null username");
+          "CassandraIO."
+              + getMutationTypeName()
+              + "().withUsername(username) called with "
+              + "null username");
       return builder().setUsername(username).build();
     }
 
     /** Specify the password used for authentication. */
-    public Write<T> withPassword(String password) {
+    public Mutate<T> withPassword(String password) {
       checkArgument(
           password != null,
-          "CassandraIO.write().withPassword(password) called with " + "null password");
+          "CassandraIO."
+              + getMutationTypeName()
+              + "().withPassword(password) called with "
+              + "null password");
       return builder().setPassword(password).build();
     }
 
     /** Specify the local DC used by the load balancing policy. */
-    public Write<T> withLocalDc(String localDc) {
+    public Mutate<T> withLocalDc(String localDc) {
       checkArgument(
           localDc != null,
-          "CassandraIO.write().withLocalDc(localDc) called with null" + " localDc");
+          "CassandraIO."
+              + getMutationTypeName()
+              + "().withLocalDc(localDc) called with null"
+              + " localDc");
       return builder().setLocalDc(localDc).build();
     }
 
-    public Write<T> withConsistencyLevel(String consistencyLevel) {
+    public Mutate<T> withConsistencyLevel(String consistencyLevel) {
       checkArgument(
           consistencyLevel != null,
-          "CassandraIO.write().withConsistencyLevel"
+          "CassandraIO."
+              + getMutationTypeName()
+              + "().withConsistencyLevel"
               + "(consistencyLevel) called with null consistencyLevel");
       return builder().setConsistencyLevel(consistencyLevel).build();
     }
@@ -419,10 +464,13 @@ public class CassandraIO {
     /**
      * Specify the {@link CassandraService} used to connect and write into the Cassandra database.
      */
-    public Write<T> withCassandraService(CassandraService<T> cassandraService) {
+    public Mutate<T> withCassandraService(CassandraService<T> cassandraService) {
       checkArgument(
           cassandraService != null,
-          "CassandraIO.write().withCassandraService" + "(service) called with null service");
+          "CassandraIO."
+              + getMutationTypeName()
+              + "().withCassandraService"
+              + "(service) called with null service");
       return builder().setCassandraService(cassandraService).build();
     }
 
@@ -430,27 +478,47 @@ public class CassandraIO {
     public void validate(PipelineOptions pipelineOptions) {
       checkState(
           hosts() != null || cassandraService() != null,
-          "CassandraIO.write() requires a list of hosts to be set via withHosts(hosts) or a "
+          "CassandraIO."
+              + getMutationTypeName()
+              + "() requires a list of hosts to be set via withHosts(hosts) or a "
               + "Cassandra service to be set via withCassandraService(service)");
       checkState(
           port() != null || cassandraService() != null,
-          "CassandraIO.write() requires a "
+          "CassandraIO."
+              + getMutationTypeName()
+              + "() requires a "
               + "valid port number to be set via withPort(port) or a Cassandra service to be set via "
               + "withCassandraService(service)");
       checkState(
           keyspace() != null,
-          "CassandraIO.write() requires a keyspace to be set via " + "withKeyspace(keyspace)");
+          "CassandraIO."
+              + getMutationTypeName()
+              + "() requires a keyspace to be set via "
+              + "withKeyspace(keyspace)");
       checkState(
           entity() != null,
-          "CassandraIO.write() requires an entity to be set via " + "withEntity(entity)");
+          "CassandraIO."
+              + getMutationTypeName()
+              + "() requires an entity to be set via "
+              + "withEntity(entity)");
     }
 
     @Override
     public PDone expand(PCollection<T> input) {
-      input.apply(ParDo.of(new WriteFn<>(this)));
+      if (mutationType() == MutationType.DELETE) {
+        input.apply(ParDo.of(new DeleteFn<T>(this)));
+      } else {
+        input.apply(ParDo.of(new WriteFn<T>(this)));
+      }
       return PDone.in(input.getPipeline());
     }
 
+    private String getMutationTypeName() {
+      return mutationType() == null
+          ? MutationType.WRITE.name().toLowerCase()
+          : mutationType().name().toLowerCase();
+    }
+
     @AutoValue.Builder
     abstract static class Builder<T> {
       abstract Builder<T> setHosts(List<String> hosts);
@@ -471,15 +539,17 @@ public class CassandraIO {
 
       abstract Builder<T> setCassandraService(CassandraService<T> cassandraService);
 
-      abstract Write<T> build();
+      abstract Builder<T> setMutationType(MutationType mutationType);
+
+      abstract Mutate<T> build();
     }
   }
 
   private static class WriteFn<T> extends DoFn<T, Void> {
-    private final Write<T> spec;
-    private CassandraService.Writer writer;
+    private final Mutate<T> spec;
+    private CassandraService.Writer<T> writer;
 
-    WriteFn(Write<T> spec) {
+    WriteFn(Mutate<T> spec) {
       this.spec = spec;
     }
 
@@ -499,4 +569,29 @@ public class CassandraIO {
       writer = null;
     }
   }
+
+  private static class DeleteFn<T> extends DoFn<T, Void> {
+    private final Mutate<T> spec;
+    private CassandraService.Deleter<T> deleter;
+
+    DeleteFn(Mutate<T> spec) {
+      this.spec = spec;
+    }
+
+    @Setup
+    public void setup() {
+      deleter = spec.cassandraService().createDeleter(spec);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws ExecutionException, InterruptedException {
+      deleter.delete(c.element());
+    }
+
+    @Teardown
+    public void teardown() throws Exception {
+      deleter.close();
+      deleter = null;
+    }
+  }
 }
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
index be4a257..92bd261 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
@@ -38,7 +38,7 @@ public interface CassandraService<T> extends Serializable {
   List<BoundedSource<T>> split(CassandraIO.Read<T> spec, long desiredBundleSizeBytes);
 
   /** Create a {@link Writer} that writes entities into the Cassandra instance. */
-  Writer createWriter(CassandraIO.Write<T> spec);
+  Writer createWriter(CassandraIO.Mutate<T> spec);
 
   /** Writer for an entity. */
   interface Writer<T> extends AutoCloseable {
@@ -48,4 +48,16 @@ public interface CassandraService<T> extends Serializable {
      */
     void write(T entity) throws ExecutionException, InterruptedException;
   }
+
+  /** Create a {@link Deleter} that deletes entities from the Cassandra instance. */
+  Deleter createDeleter(CassandraIO.Mutate<T> spec);
+
+  /** Deleter for an entity. */
+  interface Deleter<T> extends AutoCloseable {
+    /**
+     * This method should be synchronous. It means you have to be sure that the entity is fully
+     * deleted from the Cassandra instance when you exit from this method.
+     */
+    void delete(T entity) throws ExecutionException, InterruptedException;
+  }
 }
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
index 62a9bce..93db829 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
@@ -42,6 +42,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ExecutionException;
+import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -394,21 +395,39 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
   }
 
   /** Writer storing an entity into Apache Cassandra database. */
-  protected class WriterImpl implements Writer<T> {
+  protected class WriterImpl extends MutatorImpl implements Writer<T> {
+
+    WriterImpl(CassandraIO.Mutate<T> spec) {
+      super(spec, Mapper::saveAsync, "writes");
+    }
+
+    @Override
+    public void write(T entity) throws ExecutionException, InterruptedException {
+      mutate(entity);
+    }
+  }
+
+  /** Base class for mutators that apply writes or deletes to an Apache Cassandra database. */
+  protected abstract class MutatorImpl {
     /**
      * The threshold of 100 concurrent async queries is a heuristic commonly used by the Apache
      * Cassandra community. There is no real gain to expect in tuning this value.
      */
     private static final int CONCURRENT_ASYNC_QUERIES = 100;
 
-    private final CassandraIO.Write<T> spec;
+    private final CassandraIO.Mutate<T> spec;
 
     private final Cluster cluster;
     private final Session session;
     private final MappingManager mappingManager;
-    private List<ListenableFuture<Void>> writeFutures;
-
-    WriterImpl(CassandraIO.Write<T> spec) {
+    private List<ListenableFuture<Void>> mutateFutures;
+    private BiFunction<Mapper<T>, T, ListenableFuture<Void>> mutator;
+    private String operationName;
+
+    MutatorImpl(
+        CassandraIO.Mutate<T> spec,
+        BiFunction<Mapper<T>, T, ListenableFuture<Void>> mutator,
+        String operationName) {
       this.spec = spec;
       this.cluster =
           getCluster(
@@ -420,34 +439,35 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
               spec.consistencyLevel());
       this.session = cluster.connect(spec.keyspace());
       this.mappingManager = new MappingManager(session);
-      this.writeFutures = new ArrayList<>();
+      this.mutateFutures = new ArrayList<>();
+      this.mutator = mutator;
+      this.operationName = operationName;
     }
 
     /**
-     * Write the entity to the Cassandra instance, using {@link Mapper} obtained with the {@link
+     * Mutate the entity in the Cassandra instance, using {@link Mapper} obtained with the {@link
      * MappingManager}. This method uses {@link Mapper#saveAsync(Object)} method, which is
      * asynchronous. Beam will wait for all futures to complete, to guarantee all writes have
      * succeeded.
      */
-    @Override
-    public void write(T entity) throws ExecutionException, InterruptedException {
+    public void mutate(T entity) throws ExecutionException, InterruptedException {
       Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(entity.getClass());
-      this.writeFutures.add(mapper.saveAsync(entity));
-      if (this.writeFutures.size() == CONCURRENT_ASYNC_QUERIES) {
+      this.mutateFutures.add(mutator.apply(mapper, entity));
+      if (this.mutateFutures.size() == CONCURRENT_ASYNC_QUERIES) {
         // We reached the max number of allowed in flight queries.
         // Write methods are synchronous in Beam as stated by the CassandraService interface,
         // so we wait for each async query to return before exiting.
         LOG.debug(
-            "Waiting for a batch of {} Cassandra writes to be executed...",
-            CONCURRENT_ASYNC_QUERIES);
+            "Waiting for a batch of {} Cassandra {} to be executed...",
+            CONCURRENT_ASYNC_QUERIES,
+            operationName);
         waitForFuturesToFinish();
-        this.writeFutures = new ArrayList<>();
+        this.mutateFutures = new ArrayList<>();
       }
     }
 
-    @Override
     public void close() throws ExecutionException, InterruptedException {
-      if (this.writeFutures.size() > 0) {
+      if (this.mutateFutures.size() > 0) {
         // Waiting for the last in flight async queries to return before finishing the bundle.
         waitForFuturesToFinish();
       }
@@ -461,14 +481,32 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
     }
 
     private void waitForFuturesToFinish() throws ExecutionException, InterruptedException {
-      for (ListenableFuture<Void> future : writeFutures) {
+      for (ListenableFuture<Void> future : mutateFutures) {
         future.get();
       }
     }
   }
 
   @Override
-  public Writer createWriter(CassandraIO.Write<T> spec) {
+  public Writer createWriter(CassandraIO.Mutate<T> spec) {
     return new WriterImpl(spec);
   }
+
+  /** Deleter removing an entity from an Apache Cassandra database. */
+  protected class DeleterImpl extends MutatorImpl implements Deleter<T> {
+
+    DeleterImpl(CassandraIO.Mutate<T> spec) {
+      super(spec, (tMapper, t) -> tMapper.deleteAsync(t), "deletes");
+    }
+
+    @Override
+    public void delete(T entity) throws ExecutionException, InterruptedException {
+      mutate(entity);
+    }
+  }
+
+  @Override
+  public Deleter createDeleter(CassandraIO.Mutate<T> spec) {
+    return new DeleterImpl(spec);
+  }
 }
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
index 5e98cef..4a048c0 100644
--- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
@@ -129,6 +129,32 @@ public class CassandraIOTest implements Serializable {
     }
   }
 
+  @Test
+  public void testDelete() {
+    FakeCassandraService service = new FakeCassandraService();
+    service.load();
+
+    assertEquals(10000, service.getTable().size());
+
+    pipeline
+        .apply(
+            CassandraIO.<Scientist>read()
+                .withCassandraService(service)
+                .withKeyspace("beam")
+                .withTable("scientist")
+                .withCoder(SerializableCoder.of(Scientist.class))
+                .withEntity(Scientist.class))
+        .apply(
+            CassandraIO.<Scientist>delete()
+                .withCassandraService(service)
+                .withKeyspace("beam")
+                .withEntity(Scientist.class));
+
+    pipeline.run();
+
+    assertEquals(0, service.getTable().size());
+  }
+
   /** A {@link CassandraService} implementation that stores the entity in memory. */
   private static class FakeCassandraService implements CassandraService<Scientist> {
     private static final Map<Integer, Scientist> table = new ConcurrentHashMap<>();
@@ -241,9 +267,26 @@ public class CassandraIOTest implements Serializable {
     }
 
     @Override
-    public FakeCassandraWriter createWriter(CassandraIO.Write<Scientist> spec) {
+    public FakeCassandraWriter createWriter(CassandraIO.Mutate<Scientist> spec) {
       return new FakeCassandraWriter();
     }
+
+    private static class FakeCassandraDeleter implements Deleter<Scientist> {
+      @Override
+      public void delete(Scientist scientist) {
+        table.remove(scientist.id);
+      }
+
+      @Override
+      public void close() {
+        // nothing to do
+      }
+    }
+
+    @Override
+    public FakeCassandraDeleter createDeleter(CassandraIO.Mutate<Scientist> spec) {
+      return new FakeCassandraDeleter();
+    }
   }
 
   /** Simple Cassandra entity used in test. */