You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2018/12/10 16:31:06 UTC

[beam] 02/03: [BEAM-6079] Fix access level and clean up generics issues

This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 07d931195fcc571122eb756b0e3ff524b981d3a5
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Mon Dec 10 17:24:21 2018 +0100

    [BEAM-6079] Fix access level and clean up generics issues
---
 .../apache/beam/sdk/io/cassandra/CassandraIO.java   |  4 ++--
 .../beam/sdk/io/cassandra/CassandraService.java     |  6 +++---
 .../beam/sdk/io/cassandra/CassandraServiceImpl.java | 18 +++++++++---------
 .../beam/sdk/io/cassandra/CassandraIOTest.java      | 21 ++++++++++-----------
 4 files changed, 24 insertions(+), 25 deletions(-)

diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index 838dd82..34bc9ba 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -506,9 +506,9 @@ public class CassandraIO {
     @Override
     public PDone expand(PCollection<T> input) {
       if (mutationType() == MutationType.DELETE) {
-        input.apply(ParDo.of(new DeleteFn<T>(this)));
+        input.apply(ParDo.of(new DeleteFn<>(this)));
       } else {
-        input.apply(ParDo.of(new WriteFn<T>(this)));
+        input.apply(ParDo.of(new WriteFn<>(this)));
       }
       return PDone.in(input.getPipeline());
     }
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
index 92bd261..9f8d840 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
@@ -23,7 +23,7 @@ import java.util.concurrent.ExecutionException;
 import org.apache.beam.sdk.io.BoundedSource;
 
 /** An interface for real or fake implementations of Cassandra. */
-public interface CassandraService<T> extends Serializable {
+interface CassandraService<T> extends Serializable {
   /**
    * Returns a {@link org.apache.beam.sdk.io.BoundedSource.BoundedReader} that will read from
    * Cassandra using the spec from {@link
@@ -38,7 +38,7 @@ public interface CassandraService<T> extends Serializable {
   List<BoundedSource<T>> split(CassandraIO.Read<T> spec, long desiredBundleSizeBytes);
 
   /** Create a {@link Writer} that writes entities into the Cassandra instance. */
-  Writer createWriter(CassandraIO.Mutate<T> spec);
+  Writer<T> createWriter(CassandraIO.Mutate<T> spec);
 
   /** Writer for an entity. */
   interface Writer<T> extends AutoCloseable {
@@ -50,7 +50,7 @@ public interface CassandraService<T> extends Serializable {
   }
 
   /** Create a {@link Writer} that writes entities into the Cassandra instance. */
-  Deleter createDeleter(CassandraIO.Mutate<T> spec);
+  Deleter<T> createDeleter(CassandraIO.Mutate<T> spec);
 
   /** Deleter for an entity. */
   interface Deleter<T> extends AutoCloseable {
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
index 93db829..c912220 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
@@ -263,7 +263,7 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
           queries.add(query);
         }
       }
-      sources.add(new CassandraIO.CassandraSource(spec, queries));
+      sources.add(new CassandraIO.CassandraSource<>(spec, queries));
     }
     return sources;
   }
@@ -395,7 +395,7 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
   }
 
   /** Writer storing an entity into Apache Cassandra database. */
-  protected class WriterImpl extends MutatorImpl implements Writer<T> {
+  class WriterImpl extends MutatorImpl implements Writer<T> {
 
     WriterImpl(CassandraIO.Mutate<T> spec) {
       super(spec, Mapper::saveAsync, "writes");
@@ -408,7 +408,7 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
   }
 
   /** Mutator allowing to do side effects into Apache Cassandra database. */
-  protected abstract class MutatorImpl {
+  abstract class MutatorImpl {
     /**
      * The threshold of 100 concurrent async queries is a heuristic commonly used by the Apache
      * Cassandra community. There is no real gain to expect in tuning this value.
@@ -421,8 +421,8 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
     private final Session session;
     private final MappingManager mappingManager;
     private List<ListenableFuture<Void>> mutateFutures;
-    private BiFunction<Mapper<T>, T, ListenableFuture<Void>> mutator;
-    private String operationName;
+    private final BiFunction<Mapper<T>, T, ListenableFuture<Void>> mutator;
+    private final String operationName;
 
     MutatorImpl(
         CassandraIO.Mutate<T> spec,
@@ -450,7 +450,7 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
      * asynchronous. Beam will wait for all futures to complete, to guarantee all writes have
      * succeeded.
      */
-    public void mutate(T entity) throws ExecutionException, InterruptedException {
+    void mutate(T entity) throws ExecutionException, InterruptedException {
       Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(entity.getClass());
       this.mutateFutures.add(mutator.apply(mapper, entity));
       if (this.mutateFutures.size() == CONCURRENT_ASYNC_QUERIES) {
@@ -488,7 +488,7 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
   }
 
   @Override
-  public Writer createWriter(CassandraIO.Mutate<T> spec) {
+  public Writer<T> createWriter(CassandraIO.Mutate<T> spec) {
     return new WriterImpl(spec);
   }
 
@@ -496,7 +496,7 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
   protected class DeleterImpl extends MutatorImpl implements Deleter<T> {
 
     DeleterImpl(CassandraIO.Mutate<T> spec) {
-      super(spec, (tMapper, t) -> tMapper.deleteAsync(t), "deletes");
+      super(spec, Mapper::deleteAsync, "deletes");
     }
 
     @Override
@@ -506,7 +506,7 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
   }
 
   @Override
-  public Deleter createDeleter(CassandraIO.Mutate<T> spec) {
+  public Deleter<T> createDeleter(CassandraIO.Mutate<T> spec) {
     return new DeleterImpl(spec);
   }
 }
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
index 4a048c0..5940d41 100644
--- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
@@ -26,6 +26,7 @@ import com.google.common.base.Objects;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -58,8 +59,8 @@ public class CassandraIOTest implements Serializable {
     service.load();
 
     PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
-    CassandraIO.Read spec = CassandraIO.<Scientist>read().withCassandraService(service);
-    CassandraIO.CassandraSource source = new CassandraIO.CassandraSource(spec, null);
+    CassandraIO.Read<Scientist> spec = CassandraIO.<Scientist>read().withCassandraService(service);
+    CassandraIO.CassandraSource<Scientist> source = new CassandraIO.CassandraSource<>(spec, null);
     long estimatedSizeBytes = source.getEstimatedSizeBytes(pipelineOptions);
     // the size is the sum of the bytes size of the String representation of a scientist in the map
     assertEquals(113890, estimatedSizeBytes);
@@ -187,17 +188,17 @@ public class CassandraIOTest implements Serializable {
     }
 
     @Override
-    public BoundedReader<Scientist> createReader(CassandraIO.CassandraSource source) {
+    public BoundedReader<Scientist> createReader(CassandraIO.CassandraSource<Scientist> source) {
       return new FakeCassandraReader(source);
     }
 
-    private static class FakeCassandraReader extends BoundedSource.BoundedReader {
-      private final CassandraIO.CassandraSource source;
+    private static class FakeCassandraReader extends BoundedSource.BoundedReader<Scientist> {
+      private final CassandraIO.CassandraSource<Scientist> source;
 
       private Iterator<Scientist> iterator;
       private Scientist current;
 
-      FakeCassandraReader(CassandraIO.CassandraSource source) {
+      FakeCassandraReader(CassandraIO.CassandraSource<Scientist> source) {
         this.source = source;
       }
 
@@ -232,7 +233,7 @@ public class CassandraIOTest implements Serializable {
       }
 
       @Override
-      public CassandraIO.CassandraSource getCurrentSource() {
+      public CassandraIO.CassandraSource<Scientist> getCurrentSource() {
         return this.source;
       }
     }
@@ -248,10 +249,8 @@ public class CassandraIOTest implements Serializable {
 
     @Override
     public List<BoundedSource<Scientist>> split(
-        CassandraIO.Read spec, long desiredBundleSizeBytes) {
-      List<BoundedSource<Scientist>> sources = new ArrayList<>();
-      sources.add(new CassandraIO.CassandraSource<Scientist>(spec, null));
-      return sources;
+        CassandraIO.Read<Scientist> spec, long desiredBundleSizeBytes) {
+      return Collections.singletonList(new CassandraIO.CassandraSource<>(spec, null));
     }
 
     private static class FakeCassandraWriter implements Writer<Scientist> {