You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2018/12/10 16:31:06 UTC
[beam] 02/03: [BEAM-6079] Fix access level and clean up generics issues
This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 07d931195fcc571122eb756b0e3ff524b981d3a5
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Mon Dec 10 17:24:21 2018 +0100
[BEAM-6079] Fix access level and clean up generics issues
---
.../apache/beam/sdk/io/cassandra/CassandraIO.java | 4 ++--
.../beam/sdk/io/cassandra/CassandraService.java | 6 +++---
.../beam/sdk/io/cassandra/CassandraServiceImpl.java | 18 +++++++++---------
.../beam/sdk/io/cassandra/CassandraIOTest.java | 21 ++++++++++-----------
4 files changed, 24 insertions(+), 25 deletions(-)
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index 838dd82..34bc9ba 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -506,9 +506,9 @@ public class CassandraIO {
@Override
public PDone expand(PCollection<T> input) {
if (mutationType() == MutationType.DELETE) {
- input.apply(ParDo.of(new DeleteFn<T>(this)));
+ input.apply(ParDo.of(new DeleteFn<>(this)));
} else {
- input.apply(ParDo.of(new WriteFn<T>(this)));
+ input.apply(ParDo.of(new WriteFn<>(this)));
}
return PDone.in(input.getPipeline());
}
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
index 92bd261..9f8d840 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
@@ -23,7 +23,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.io.BoundedSource;
/** An interface for real or fake implementations of Cassandra. */
-public interface CassandraService<T> extends Serializable {
+interface CassandraService<T> extends Serializable {
/**
* Returns a {@link org.apache.beam.sdk.io.BoundedSource.BoundedReader} that will read from
* Cassandra using the spec from {@link
@@ -38,7 +38,7 @@ public interface CassandraService<T> extends Serializable {
List<BoundedSource<T>> split(CassandraIO.Read<T> spec, long desiredBundleSizeBytes);
/** Create a {@link Writer} that writes entities into the Cassandra instance. */
- Writer createWriter(CassandraIO.Mutate<T> spec);
+ Writer<T> createWriter(CassandraIO.Mutate<T> spec);
/** Writer for an entity. */
interface Writer<T> extends AutoCloseable {
@@ -50,7 +50,7 @@ public interface CassandraService<T> extends Serializable {
}
/** Create a {@link Writer} that writes entities into the Cassandra instance. */
- Deleter createDeleter(CassandraIO.Mutate<T> spec);
+ Deleter<T> createDeleter(CassandraIO.Mutate<T> spec);
/** Deleter for an entity. */
interface Deleter<T> extends AutoCloseable {
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
index 93db829..c912220 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
@@ -263,7 +263,7 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
queries.add(query);
}
}
- sources.add(new CassandraIO.CassandraSource(spec, queries));
+ sources.add(new CassandraIO.CassandraSource<>(spec, queries));
}
return sources;
}
@@ -395,7 +395,7 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
}
/** Writer storing an entity into Apache Cassandra database. */
- protected class WriterImpl extends MutatorImpl implements Writer<T> {
+ class WriterImpl extends MutatorImpl implements Writer<T> {
WriterImpl(CassandraIO.Mutate<T> spec) {
super(spec, Mapper::saveAsync, "writes");
@@ -408,7 +408,7 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
}
/** Mutator allowing to do side effects into Apache Cassandra database. */
- protected abstract class MutatorImpl {
+ abstract class MutatorImpl {
/**
* The threshold of 100 concurrent async queries is a heuristic commonly used by the Apache
* Cassandra community. There is no real gain to expect in tuning this value.
@@ -421,8 +421,8 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
private final Session session;
private final MappingManager mappingManager;
private List<ListenableFuture<Void>> mutateFutures;
- private BiFunction<Mapper<T>, T, ListenableFuture<Void>> mutator;
- private String operationName;
+ private final BiFunction<Mapper<T>, T, ListenableFuture<Void>> mutator;
+ private final String operationName;
MutatorImpl(
CassandraIO.Mutate<T> spec,
@@ -450,7 +450,7 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
* asynchronous. Beam will wait for all futures to complete, to guarantee all writes have
* succeeded.
*/
- public void mutate(T entity) throws ExecutionException, InterruptedException {
+ void mutate(T entity) throws ExecutionException, InterruptedException {
Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(entity.getClass());
this.mutateFutures.add(mutator.apply(mapper, entity));
if (this.mutateFutures.size() == CONCURRENT_ASYNC_QUERIES) {
@@ -488,7 +488,7 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
}
@Override
- public Writer createWriter(CassandraIO.Mutate<T> spec) {
+ public Writer<T> createWriter(CassandraIO.Mutate<T> spec) {
return new WriterImpl(spec);
}
@@ -496,7 +496,7 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
protected class DeleterImpl extends MutatorImpl implements Deleter<T> {
DeleterImpl(CassandraIO.Mutate<T> spec) {
- super(spec, (tMapper, t) -> tMapper.deleteAsync(t), "deletes");
+ super(spec, Mapper::deleteAsync, "deletes");
}
@Override
@@ -506,7 +506,7 @@ public class CassandraServiceImpl<T> implements CassandraService<T> {
}
@Override
- public Deleter createDeleter(CassandraIO.Mutate<T> spec) {
+ public Deleter<T> createDeleter(CassandraIO.Mutate<T> spec) {
return new DeleterImpl(spec);
}
}
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
index 4a048c0..5940d41 100644
--- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
@@ -26,6 +26,7 @@ import com.google.common.base.Objects;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -58,8 +59,8 @@ public class CassandraIOTest implements Serializable {
service.load();
PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
- CassandraIO.Read spec = CassandraIO.<Scientist>read().withCassandraService(service);
- CassandraIO.CassandraSource source = new CassandraIO.CassandraSource(spec, null);
+ CassandraIO.Read<Scientist> spec = CassandraIO.<Scientist>read().withCassandraService(service);
+ CassandraIO.CassandraSource<Scientist> source = new CassandraIO.CassandraSource<>(spec, null);
long estimatedSizeBytes = source.getEstimatedSizeBytes(pipelineOptions);
// the size is the sum of the bytes size of the String representation of a scientist in the map
assertEquals(113890, estimatedSizeBytes);
@@ -187,17 +188,17 @@ public class CassandraIOTest implements Serializable {
}
@Override
- public BoundedReader<Scientist> createReader(CassandraIO.CassandraSource source) {
+ public BoundedReader<Scientist> createReader(CassandraIO.CassandraSource<Scientist> source) {
return new FakeCassandraReader(source);
}
- private static class FakeCassandraReader extends BoundedSource.BoundedReader {
- private final CassandraIO.CassandraSource source;
+ private static class FakeCassandraReader extends BoundedSource.BoundedReader<Scientist> {
+ private final CassandraIO.CassandraSource<Scientist> source;
private Iterator<Scientist> iterator;
private Scientist current;
- FakeCassandraReader(CassandraIO.CassandraSource source) {
+ FakeCassandraReader(CassandraIO.CassandraSource<Scientist> source) {
this.source = source;
}
@@ -232,7 +233,7 @@ public class CassandraIOTest implements Serializable {
}
@Override
- public CassandraIO.CassandraSource getCurrentSource() {
+ public CassandraIO.CassandraSource<Scientist> getCurrentSource() {
return this.source;
}
}
@@ -248,10 +249,8 @@ public class CassandraIOTest implements Serializable {
@Override
public List<BoundedSource<Scientist>> split(
- CassandraIO.Read spec, long desiredBundleSizeBytes) {
- List<BoundedSource<Scientist>> sources = new ArrayList<>();
- sources.add(new CassandraIO.CassandraSource<Scientist>(spec, null));
- return sources;
+ CassandraIO.Read<Scientist> spec, long desiredBundleSizeBytes) {
+ return Collections.singletonList(new CassandraIO.CassandraSource<>(spec, null));
}
private static class FakeCassandraWriter implements Writer<Scientist> {