Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/02 21:28:35 UTC

[GitHub] [beam] iemejia commented on a change in pull request #10546: [BEAM-9008] Add CassandraIO readAll method

iemejia commented on a change in pull request #10546:
URL: https://github.com/apache/beam/pull/10546#discussion_r433896372



##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/RingRange.java
##########
@@ -55,4 +58,9 @@ public boolean isWrapping() {
   public String toString() {
     return String.format("(%s,%s]", start.toString(), end.toString());
   }
+
+  public static RingRange fromEncodedKey(Metadata metadata, ByteBuffer... bb) {

Review comment:
       Can we move this method to the test class where it is used? I don't want to add the Cassandra-specific `Metadata` to the public API of `RingRange`, in the hope that this will help us evolve `RingRange` into a proper `Restriction` (future work, out of the scope of this PR).
   
   Can you also please add a `public static RingRange of(BigInteger start, BigInteger end)` method, make the normal constructor private, and refactor every use accordingly?
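
A minimal sketch of the requested factory shape, for illustration only. `RingRangeSketch` is a hypothetical stand-in and not the actual Beam class; it assumes the range only needs to carry its two `BigInteger` bounds.

```java
import java.math.BigInteger;

/** Hypothetical stand-in for RingRange, used only to illustrate the factory method. */
final class RingRangeSketch {
  private final BigInteger start;
  private final BigInteger end;

  // Private constructor: every caller has to go through of(...).
  private RingRangeSketch(BigInteger start, BigInteger end) {
    this.start = start;
    this.end = end;
  }

  /** Static factory replacing direct constructor calls. */
  static RingRangeSketch of(BigInteger start, BigInteger end) {
    return new RingRangeSketch(start, end);
  }

  BigInteger getStart() {
    return start;
  }

  BigInteger getEnd() {
    return end;
  }

  @Override
  public String toString() {
    // Same "(start,end]" layout as the toString() shown in the diff above.
    return String.format("(%s,%s]", start, end);
  }
}
```

With this shape, call sites would change from `new RingRange(a, b)` to `RingRange.of(a, b)`, which leaves room to change the internal representation later.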

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -326,7 +371,78 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.<T>readAll().withCoder(this.coder());
+
+      return input
+          .apply(Create.of(this))
+          .apply(ParDo.of(new SplitFn()))

Review comment:
       Move the split to be the first step of the `ReadAll` expansion so that non-advanced users (those who do not specify `RingRange` manually) can get their code 'partitioned' correctly.

##########
File path: sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
##########
@@ -317,9 +302,70 @@ public void testRead() throws Exception {
     PAssert.that(mapped.apply("Count occurrences per scientist", Count.perKey()))
         .satisfies(
             input -> {
+              int count = 0;
               for (KV<String, Long> element : input) {
+                count++;
                 assertEquals(element.getKey(), NUM_ROWS / 10, element.getValue().longValue());
               }
+              assertEquals(11, count);
+              return null;
+            });
+
+    pipeline.run();
+  }
+
+  CassandraIO.Read<Scientist> getReadWithRingRange(RingRange... rr) {

Review comment:
       private

##########
File path: sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
##########
@@ -191,39 +186,44 @@ private static void insertData() throws Exception {
     LOG.info("Create Cassandra tables");
     session.execute(
         String.format(
-            "CREATE TABLE IF NOT EXISTS %s.%s(person_id int, person_name text, PRIMARY KEY"
-                + "(person_id));",
+            "CREATE TABLE IF NOT EXISTS %s.%s(person_department text, person_id int, person_name text, PRIMARY KEY"

Review comment:
       nit: For the tests, can we ensure that every reference to the tables and every use of the `Scientist` object follows this order: id, name, department?
   

##########
File path: sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
##########
@@ -480,66 +527,22 @@ public void testCustomMapperImplDelete() {
     assertEquals(1, counter.intValue());
   }
 
-  @Test
-  public void testSplit() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    CassandraIO.Read<Scientist> read =
-        CassandraIO.<Scientist>read()
-            .withHosts(Collections.singletonList(CASSANDRA_HOST))
-            .withPort(cassandraPort)
-            .withKeyspace(CASSANDRA_KEYSPACE)
-            .withTable(CASSANDRA_TABLE)
-            .withEntity(Scientist.class)
-            .withCoder(SerializableCoder.of(Scientist.class));
-
-    // initialSource will be read without splitting (which does not happen in production)
-    // so we need to provide splitQueries to avoid NPE in source.reader.start()
-    String splitQuery = QueryBuilder.select().from(CASSANDRA_KEYSPACE, CASSANDRA_TABLE).toString();
-    CassandraIO.CassandraSource<Scientist> initialSource =
-        new CassandraIO.CassandraSource<>(read, Collections.singletonList(splitQuery));
-    int desiredBundleSizeBytes = 2048;
-    long estimatedSize = initialSource.getEstimatedSizeBytes(options);
-    List<BoundedSource<Scientist>> splits = initialSource.split(desiredBundleSizeBytes, options);
-    SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
-    float expectedNumSplitsloat =
-        (float) initialSource.getEstimatedSizeBytes(options) / desiredBundleSizeBytes;
-    long sum = 0;
-
-    for (BoundedSource<Scientist> subSource : splits) {
-      sum += subSource.getEstimatedSizeBytes(options);
-    }
-
-    // due to division and cast estimateSize != sum but will be close. Exact equals checked below
-    assertEquals((long) (estimatedSize / splits.size()) * splits.size(), sum);
-
-    int expectedNumSplits = (int) Math.ceil(expectedNumSplitsloat);
-    assertEquals("Wrong number of splits", expectedNumSplits, splits.size());
-    int emptySplits = 0;
-    for (BoundedSource<Scientist> subSource : splits) {
-      if (readFromSource(subSource, options).isEmpty()) {
-        emptySplits += 1;
-      }
-    }
-    assertThat(
-        "There are too many empty splits, parallelism is sub-optimal",
-        emptySplits,
-        lessThan((int) (ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE * splits.size())));
-  }
-
   private List<Row> getRows(String table) {
     ResultSet result =
         session.execute(
             String.format("select person_id,person_name from %s.%s", CASSANDRA_KEYSPACE, table));
     return result.all();
   }
 
+  // TEMP TEST

Review comment:
       Why TEMP?

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -326,7 +371,78 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.<T>readAll().withCoder(this.coder());
+
+      return input
+          .apply(Create.of(this))
+          .apply(ParDo.of(new SplitFn()))
+          .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
+          // .apply(Reshuffle.viaRandomKey())

Review comment:
       Remove

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -326,7 +371,78 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.<T>readAll().withCoder(this.coder());

Review comment:
       Move this down into the apply; it makes the code more readable.

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Token;
+import java.util.Iterator;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.cassandra.CassandraIO.Read;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ReadFn<T> extends DoFn<Read<T>, T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFn.class);
+
+  private transient Cluster cluster;
+
+  private transient Session session;
+
+  private transient Read<T> lastRead;
+
+  @Teardown
+  public void teardown() {

Review comment:
       nit: Move this below; we tend to preserve the lifecycle order of methods: setup, startBundle, processElement, finishBundle, teardown.

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/RingRange.java
##########
@@ -17,10 +17,13 @@
  */
 package org.apache.beam.sdk.io.cassandra;
 
+import com.datastax.driver.core.Metadata;
+import java.io.Serializable;
 import java.math.BigInteger;
+import java.nio.ByteBuffer;
 
 /** Models a Cassandra token range. */
-final class RingRange {
+public final class RingRange implements Serializable {

Review comment:
       Since it seems the new split implementation does not use `isWrapping()`, please remove that method from the `RingRange` class.

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -326,7 +371,78 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.<T>readAll().withCoder(this.coder());
+
+      return input
+          .apply(Create.of(this))
+          .apply(ParDo.of(new SplitFn()))
+          .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
+          // .apply(Reshuffle.viaRandomKey())
+          .apply(readAll);
+    }
+
+    private class SplitFn extends DoFn<Read<T>, Read<T>> {
+
+      @ProcessElement
+      public void process(
+          @Element CassandraIO.Read<T> read, OutputReceiver<Read<T>> outputReceiver) {
+
+        try (Cluster cluster =
+            getCluster(
+                read.hosts(),
+                read.port(),
+                read.username(),
+                read.password(),
+                read.localDc(),
+                read.consistencyLevel())) {
+          if (isMurmur3Partitioner(cluster)) {
+            LOG.info("Murmur3Partitioner detected, splitting");
+
+            List<BigInteger> tokens =
+                cluster.getMetadata().getTokenRanges().stream()

Review comment:
       Since we need `Metadata` below and `getMetadata()` performs a synchronized operation, maybe obtain `Metadata` once and reuse it afterwards.

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -326,7 +371,78 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.<T>readAll().withCoder(this.coder());
+
+      return input
+          .apply(Create.of(this))
+          .apply(ParDo.of(new SplitFn()))
+          .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
+          // .apply(Reshuffle.viaRandomKey())
+          .apply(readAll);
+    }
+
+    private class SplitFn extends DoFn<Read<T>, Read<T>> {
+
+      @ProcessElement
+      public void process(
+          @Element CassandraIO.Read<T> read, OutputReceiver<Read<T>> outputReceiver) {
+
+        try (Cluster cluster =
+            getCluster(
+                read.hosts(),
+                read.port(),
+                read.username(),
+                read.password(),
+                read.localDc(),
+                read.consistencyLevel())) {
+          if (isMurmur3Partitioner(cluster)) {
+            LOG.info("Murmur3Partitioner detected, splitting");
+
+            List<BigInteger> tokens =
+                cluster.getMetadata().getTokenRanges().stream()
+                    .map(tokenRange -> new BigInteger(tokenRange.getEnd().getValue().toString()))
+                    .collect(Collectors.toList());
+            Integer splitCount = cluster.getMetadata().getAllHosts().size();
+            if (read.minNumberOfSplits() != null && read.minNumberOfSplits().get() != null) {
+              splitCount = read.minNumberOfSplits().get();
+            }
+
+            SplitGenerator splitGenerator =
+                new SplitGenerator(cluster.getMetadata().getPartitioner());
+            splitGenerator
+                .generateSplits(splitCount, tokens)
+                .forEach(
+                    rr ->
+                        outputReceiver.output(
+                            CassandraIO.<T>read()
+                                .withRingRanges(new HashSet<>(rr))
+                                .withCoder(coder())
+                                .withConsistencyLevel(consistencyLevel())
+                                .withEntity(entity())
+                                .withHosts(hosts())
+                                .withKeyspace(keyspace())
+                                .withLocalDc(localDc())
+                                .withPort(port())
+                                .withPassword(password())
+                                .withQuery(query())
+                                .withTable(table())
+                                .withUsername(username())
+                                .withMapperFactoryFn(mapperFactoryFn())));
+          } else {
+            LOG.warn(
+                "Only Murmur3Partitioner is supported for splitting, using an unique source for "
+                    + "the read");
+            String partitioner = cluster.getMetadata().getPartitioner();
+            RingRange totalRingRange =
+                new RingRange(
+                    SplitGenerator.getRangeMin(partitioner),
+                    SplitGenerator.getRangeMax(partitioner));
+            outputReceiver.output(
+                CassandraIO.<T>read()

Review comment:
       Same as above: the output is the initial `read` with the modified `RingRange`; otherwise you would need to copy/define all attributes.

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1170,4 +898,44 @@ private void waitForFuturesToFinish() throws ExecutionException, InterruptedExce
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to read data from Apache Cassandra. See {@link CassandraIO} for more
+   * information on usage and configuration.
+   */
+  @AutoValue
+  public abstract static class ReadAll<T> extends PTransform<PCollection<Read<T>>, PCollection<T>> {
+
+    @Nullable
+    abstract Coder<T> coder();
+
+    abstract ReadAll.Builder<T> builder();
+
+    /** Specify the {@link Coder} used to serialize the entity in the {@link PCollection}. */
+    public ReadAll<T> withCoder(Coder<T> coder) {
+      checkArgument(coder != null, "coder can not be null");
+      return builder().setCoder(coder).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<Read<T>> input) {
+      checkArgument(coder() != null, "withCoder() is required");
+      return input
+          .apply("shuffle", Reshuffle.viaRandomKey())

Review comment:
       "Reshuffle"

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1170,4 +898,44 @@ private void waitForFuturesToFinish() throws ExecutionException, InterruptedExce
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to read data from Apache Cassandra. See {@link CassandraIO} for more
+   * information on usage and configuration.
+   */
+  @AutoValue
+  public abstract static class ReadAll<T> extends PTransform<PCollection<Read<T>>, PCollection<T>> {
+
+    @Nullable
+    abstract Coder<T> coder();
+
+    abstract ReadAll.Builder<T> builder();
+
+    /** Specify the {@link Coder} used to serialize the entity in the {@link PCollection}. */
+    public ReadAll<T> withCoder(Coder<T> coder) {
+      checkArgument(coder != null, "coder can not be null");
+      return builder().setCoder(coder).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<Read<T>> input) {
+      checkArgument(coder() != null, "withCoder() is required");
+      return input
+          .apply("shuffle", Reshuffle.viaRandomKey())
+          .apply("read", ParDo.of(new ReadFn<>()))
+          .setCoder(this.coder());

Review comment:
       "Read"

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/RingRange.java
##########
@@ -17,10 +17,13 @@
  */
 package org.apache.beam.sdk.io.cassandra;
 
+import com.datastax.driver.core.Metadata;
+import java.io.Serializable;
 import java.math.BigInteger;
+import java.nio.ByteBuffer;
 
 /** Models a Cassandra token range. */
-final class RingRange {
+public final class RingRange implements Serializable {

Review comment:
       :+1: for making it public now!

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -370,384 +488,16 @@ private CassandraIO() {}
         return autoBuild();
       }
     }
-  }
-
-  @VisibleForTesting
-  static class CassandraSource<T> extends BoundedSource<T> {
-    final Read<T> spec;
-    final List<String> splitQueries;
-    // split source ached size - can't be calculated when already split
-    Long estimatedSize;
-    private static final String MURMUR3PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner";
-
-    CassandraSource(Read<T> spec, List<String> splitQueries) {
-      this(spec, splitQueries, null);
-    }
-
-    private CassandraSource(Read<T> spec, List<String> splitQueries, Long estimatedSize) {
-      this.estimatedSize = estimatedSize;
-      this.spec = spec;
-      this.splitQueries = splitQueries;
-    }
-
-    @Override
-    public Coder<T> getOutputCoder() {
-      return spec.coder();
-    }
-
-    @Override
-    public BoundedReader<T> createReader(PipelineOptions pipelineOptions) {
-      return new CassandraReader(this);
-    }
-
-    @Override
-    public List<BoundedSource<T>> split(
-        long desiredBundleSizeBytes, PipelineOptions pipelineOptions) {
-      try (Cluster cluster =
-          getCluster(
-              spec.hosts(),
-              spec.port(),
-              spec.username(),
-              spec.password(),
-              spec.localDc(),
-              spec.consistencyLevel())) {
-        if (isMurmur3Partitioner(cluster)) {
-          LOG.info("Murmur3Partitioner detected, splitting");
-          return splitWithTokenRanges(
-              spec, desiredBundleSizeBytes, getEstimatedSizeBytes(pipelineOptions), cluster);
-        } else {
-          LOG.warn(
-              "Only Murmur3Partitioner is supported for splitting, using a unique source for "
-                  + "the read");
-          return Collections.singletonList(
-              new CassandraIO.CassandraSource<>(spec, Collections.singletonList(buildQuery(spec))));
-        }
-      }
-    }
-
-    private static String buildQuery(Read spec) {
-      return (spec.query() == null)
-          ? String.format("SELECT * FROM %s.%s", spec.keyspace().get(), spec.table().get())
-          : spec.query().get().toString();
-    }
-
-    /**
-     * Compute the number of splits based on the estimated size and the desired bundle size, and
-     * create several sources.
-     */
-    private List<BoundedSource<T>> splitWithTokenRanges(
-        CassandraIO.Read<T> spec,
-        long desiredBundleSizeBytes,
-        long estimatedSizeBytes,
-        Cluster cluster) {
-      long numSplits =
-          getNumSplits(desiredBundleSizeBytes, estimatedSizeBytes, spec.minNumberOfSplits());
-      LOG.info("Number of desired splits is {}", numSplits);
-
-      SplitGenerator splitGenerator = new SplitGenerator(cluster.getMetadata().getPartitioner());
-      List<BigInteger> tokens =
-          cluster.getMetadata().getTokenRanges().stream()
-              .map(tokenRange -> new BigInteger(tokenRange.getEnd().getValue().toString()))
-              .collect(Collectors.toList());
-      List<List<RingRange>> splits = splitGenerator.generateSplits(numSplits, tokens);
-      LOG.info("{} splits were actually generated", splits.size());
-
-      final String partitionKey =
-          cluster.getMetadata().getKeyspace(spec.keyspace().get()).getTable(spec.table().get())
-              .getPartitionKey().stream()
-              .map(ColumnMetadata::getName)
-              .collect(Collectors.joining(","));
-
-      List<TokenRange> tokenRanges =
-          getTokenRanges(cluster, spec.keyspace().get(), spec.table().get());
-      final long estimatedSize = getEstimatedSizeBytesFromTokenRanges(tokenRanges) / splits.size();
-
-      List<BoundedSource<T>> sources = new ArrayList<>();
-      for (List<RingRange> split : splits) {
-        List<String> queries = new ArrayList<>();
-        for (RingRange range : split) {
-          if (range.isWrapping()) {

Review comment:
       Do you think this logic is already covered by the new implementation, or that we cannot end up having issues with wrapping ranges, e.g. repeated data? I can barely remember why we went into these 'hacks'; I expected this to be a responsibility of `SplitGenerator`.
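
For context, a hedged sketch of what handling a wrapping range can look like. It assumes a range wraps when its start token is not smaller than its end token, and that ranges follow the `(start,end]` convention of `RingRange.toString()`. `WrappingRangeSketch` and `tokenPredicates` are hypothetical names, not part of Beam, and the CQL fragments are illustrative rather than the exact queries the removed code produced.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper showing one way to query a wrapping token range without overlap. */
final class WrappingRangeSketch {

  // Assumption: start >= end means the range crosses the end of the token ring.
  static boolean isWrapping(BigInteger start, BigInteger end) {
    return start.compareTo(end) >= 0;
  }

  /** Returns the WHERE-clause fragments needed to cover (start, end] on the ring. */
  static List<String> tokenPredicates(String partitionKey, BigInteger start, BigInteger end) {
    List<String> predicates = new ArrayList<>();
    if (isWrapping(start, end)) {
      // Two disjoint pieces: from start to the ring maximum, and from the ring minimum to end.
      predicates.add(String.format("token(%s) > %s", partitionKey, start));
      predicates.add(String.format("token(%s) <= %s", partitionKey, end));
    } else {
      predicates.add(
          String.format(
              "token(%s) > %s AND token(%s) <= %s", partitionKey, start, partitionKey, end));
    }
    return predicates;
  }
}
```

If `SplitGenerator` never emits a wrapping range, only the non-wrapping branch is exercised; if it can, a range that is queried as a single predicate pair would match nothing, so the two-piece form is what avoids missing or repeated rows.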

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -326,7 +371,78 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.<T>readAll().withCoder(this.coder());
+
+      return input
+          .apply(Create.of(this))
+          .apply(ParDo.of(new SplitFn()))
+          .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
+          // .apply(Reshuffle.viaRandomKey())
+          .apply(readAll);
+    }
+
+    private class SplitFn extends DoFn<Read<T>, Read<T>> {
+
+      @ProcessElement
+      public void process(
+          @Element CassandraIO.Read<T> read, OutputReceiver<Read<T>> outputReceiver) {
+
+        try (Cluster cluster =
+            getCluster(
+                read.hosts(),
+                read.port(),
+                read.username(),
+                read.password(),
+                read.localDc(),
+                read.consistencyLevel())) {
+          if (isMurmur3Partitioner(cluster)) {
+            LOG.info("Murmur3Partitioner detected, splitting");
+
+            List<BigInteger> tokens =
+                cluster.getMetadata().getTokenRanges().stream()
+                    .map(tokenRange -> new BigInteger(tokenRange.getEnd().getValue().toString()))
+                    .collect(Collectors.toList());
+            Integer splitCount = cluster.getMetadata().getAllHosts().size();
+            if (read.minNumberOfSplits() != null && read.minNumberOfSplits().get() != null) {
+              splitCount = read.minNumberOfSplits().get();
+            }
+
+            SplitGenerator splitGenerator =
+                new SplitGenerator(cluster.getMetadata().getPartitioner());
+            splitGenerator
+                .generateSplits(splitCount, tokens)
+                .forEach(
+                    rr ->
+                        outputReceiver.output(
+                            CassandraIO.<T>read()

Review comment:
       You do not need to rebuild the object; the output here should simply be `read.withRingRanges(new HashSet<>(rr));`
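
To illustrate why the `with...` call is enough, here is a small sketch using a hypothetical immutable `ReadSpecSketch` in place of `CassandraIO.Read`. Ring ranges are modelled as plain strings to keep the example self-contained; the real class has many more attributes, which is exactly why copying them by hand is error-prone.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical immutable stand-in for CassandraIO.Read with a single "wither". */
final class ReadSpecSketch {
  final String keyspace;
  final String table;
  final Set<String> ringRanges;

  ReadSpecSketch(String keyspace, String table, Set<String> ringRanges) {
    this.keyspace = keyspace;
    this.table = table;
    this.ringRanges = Collections.unmodifiableSet(new HashSet<>(ringRanges));
  }

  /** Returns a copy that differs only in its ring ranges; all other attributes carry over. */
  ReadSpecSketch withRingRanges(Set<String> newRanges) {
    return new ReadSpecSketch(keyspace, table, newRanges);
  }

  /** One output per split, derived from the incoming read instead of rebuilt from scratch. */
  static List<ReadSpecSketch> split(ReadSpecSketch read, List<List<String>> splits) {
    List<ReadSpecSketch> outputs = new ArrayList<>();
    for (List<String> rr : splits) {
      outputs.add(read.withRingRanges(new HashSet<>(rr)));
    }
    return outputs;
  }
}
```

Deriving each output from the incoming element also means any attribute added to the read later is propagated automatically.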

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/RingRange.java
##########
@@ -17,10 +17,13 @@
  */
 package org.apache.beam.sdk.io.cassandra;
 
+import com.datastax.driver.core.Metadata;
+import java.io.Serializable;
 import java.math.BigInteger;
+import java.nio.ByteBuffer;
 
 /** Models a Cassandra token range. */
-final class RingRange {
+public final class RingRange implements Serializable {

Review comment:
       We should add the `equals()` and `hashCode()` methods; they are now mandatory for the `Set` contract used in `CassandraIO` to be consistent.
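
A minimal sketch of value-based equality, assuming two ranges are equal exactly when both bounds are equal. `ValueRingRange` is a hypothetical stand-in, not the actual Beam class.

```java
import java.math.BigInteger;
import java.util.Objects;

/** Hypothetical stand-in for RingRange with value-based equals() and hashCode(). */
final class ValueRingRange {
  private final BigInteger start;
  private final BigInteger end;

  ValueRingRange(BigInteger start, BigInteger end) {
    this.start = Objects.requireNonNull(start, "start");
    this.end = Objects.requireNonNull(end, "end");
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ValueRingRange)) {
      return false;
    }
    ValueRingRange other = (ValueRingRange) o;
    return start.equals(other.start) && end.equals(other.end);
  }

  @Override
  public int hashCode() {
    // Must be derived from the same fields as equals() so equal ranges share a hash bucket.
    return Objects.hash(start, end);
  }
}
```

Without these overrides, a `HashSet` falls back to identity comparison, so two ranges with the same bounds would be stored as separate elements.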

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -326,7 +371,78 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.<T>readAll().withCoder(this.coder());
+
+      return input
+          .apply(Create.of(this))
+          .apply(ParDo.of(new SplitFn()))

Review comment:
       Also add the `"Split",` name to the step

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -326,7 +371,78 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.<T>readAll().withCoder(this.coder());
+
+      return input
+          .apply(Create.of(this))
+          .apply(ParDo.of(new SplitFn()))
+          .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
+          // .apply(Reshuffle.viaRandomKey())
+          .apply(readAll);
+    }
+
+    private class SplitFn extends DoFn<Read<T>, Read<T>> {
+
+      @ProcessElement
+      public void process(
+          @Element CassandraIO.Read<T> read, OutputReceiver<Read<T>> outputReceiver) {
+
+        try (Cluster cluster =
+            getCluster(
+                read.hosts(),
+                read.port(),
+                read.username(),
+                read.password(),
+                read.localDc(),
+                read.consistencyLevel())) {
+          if (isMurmur3Partitioner(cluster)) {
+            LOG.info("Murmur3Partitioner detected, splitting");
+
+            List<BigInteger> tokens =
+                cluster.getMetadata().getTokenRanges().stream()
+                    .map(tokenRange -> new BigInteger(tokenRange.getEnd().getValue().toString()))
+                    .collect(Collectors.toList());
+            Integer splitCount = cluster.getMetadata().getAllHosts().size();

Review comment:
       Better to define `Integer splitCount = read.minNumberOfSplits().get();` first; this will allow you to skip one server visit if it is already set by the user.
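
A sketch of the suggested ordering, assuming the user-provided value may be absent (`null`) and that the host count is the only fallback. `SplitCountSketch` and `resolveSplitCount` are hypothetical names; the `IntSupplier` stands in for the `cluster.getMetadata().getAllHosts().size()` lookup so that it only runs when needed.

```java
import java.util.function.IntSupplier;

/** Hypothetical helper: prefer the user's split count, query the cluster only as a fallback. */
final class SplitCountSketch {

  static int resolveSplitCount(Integer userMinSplits, IntSupplier hostCount) {
    if (userMinSplits != null) {
      // The user already decided, so the (possibly remote) lookup is skipped entirely.
      return userMinSplits;
    }
    return hostCount.getAsInt();
  }
}
```

Passing the lookup as a supplier keeps the fallback lazy, which is what saves the extra server visit.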

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -326,7 +371,78 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.<T>readAll().withCoder(this.coder());
+
+      return input
+          .apply(Create.of(this))
+          .apply(ParDo.of(new SplitFn()))
+          .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
+          // .apply(Reshuffle.viaRandomKey())
+          .apply(readAll);
+    }
+
+    private class SplitFn extends DoFn<Read<T>, Read<T>> {
+
+      @ProcessElement
+      public void process(
+          @Element CassandraIO.Read<T> read, OutputReceiver<Read<T>> outputReceiver) {
+
+        try (Cluster cluster =
+            getCluster(
+                read.hosts(),
+                read.port(),
+                read.username(),
+                read.password(),
+                read.localDc(),
+                read.consistencyLevel())) {
+          if (isMurmur3Partitioner(cluster)) {

Review comment:
       I think we are missing the case where the user already defines their `RingRange` in the input read. What should we do in that case? My intuition tells me we should just pass the read through as the output without changes, but if we want to split it further we should take the input `RingRange` into account somehow.

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1170,4 +898,44 @@ private void waitForFuturesToFinish() throws ExecutionException, InterruptedExce
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to read data from Apache Cassandra. See {@link CassandraIO} for more
+   * information on usage and configuration.
+   */
+  @AutoValue
+  public abstract static class ReadAll<T> extends PTransform<PCollection<Read<T>>, PCollection<T>> {
+
+    @Nullable
+    abstract Coder<T> coder();
+
+    abstract ReadAll.Builder<T> builder();
+
+    /** Specify the {@link Coder} used to serialize the entity in the {@link PCollection}. */
+    public ReadAll<T> withCoder(Coder<T> coder) {
+      checkArgument(coder != null, "coder can not be null");
+      return builder().setCoder(coder).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<Read<T>> input) {
+      checkArgument(coder() != null, "withCoder() is required");
+      return input
+          .apply("shuffle", Reshuffle.viaRandomKey())
+          .apply("read", ParDo.of(new ReadFn<>()))
+          .setCoder(this.coder());

Review comment:
       coder should come from the input read

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/RingRange.java
##########
@@ -17,10 +17,13 @@
  */
 package org.apache.beam.sdk.io.cassandra;
 
+import com.datastax.driver.core.Metadata;
+import java.io.Serializable;
 import java.math.BigInteger;
+import java.nio.ByteBuffer;
 
 /** Models a Cassandra token range. */
-final class RingRange {
+public final class RingRange implements Serializable {

Review comment:
       Please also add `@Experimental(Kind.SOURCE_SINK)` to the class.

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1170,4 +898,44 @@ private void waitForFuturesToFinish() throws ExecutionException, InterruptedExce
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to read data from Apache Cassandra. See {@link CassandraIO} for more
+   * information on usage and configuration.
+   */
+  @AutoValue
+  public abstract static class ReadAll<T> extends PTransform<PCollection<Read<T>>, PCollection<T>> {
+
+    @Nullable
+    abstract Coder<T> coder();
+
+    abstract ReadAll.Builder<T> builder();
+
+    /** Specify the {@link Coder} used to serialize the entity in the {@link PCollection}. */
+    public ReadAll<T> withCoder(Coder<T> coder) {

Review comment:
       I am not sure we need this; I assume the coder comes from the `Read<T>` input.

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -370,384 +488,16 @@ private CassandraIO() {}
         return autoBuild();
       }
     }
-  }
-
-  @VisibleForTesting
-  static class CassandraSource<T> extends BoundedSource<T> {
-    final Read<T> spec;
-    final List<String> splitQueries;
-    // split source ached size - can't be calculated when already split
-    Long estimatedSize;
-    private static final String MURMUR3PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner";
-
-    CassandraSource(Read<T> spec, List<String> splitQueries) {
-      this(spec, splitQueries, null);
-    }
-
-    private CassandraSource(Read<T> spec, List<String> splitQueries, Long estimatedSize) {
-      this.estimatedSize = estimatedSize;
-      this.spec = spec;
-      this.splitQueries = splitQueries;
-    }
-
-    @Override
-    public Coder<T> getOutputCoder() {
-      return spec.coder();
-    }
-
-    @Override
-    public BoundedReader<T> createReader(PipelineOptions pipelineOptions) {
-      return new CassandraReader(this);
-    }
-
-    @Override
-    public List<BoundedSource<T>> split(
-        long desiredBundleSizeBytes, PipelineOptions pipelineOptions) {
-      try (Cluster cluster =
-          getCluster(
-              spec.hosts(),
-              spec.port(),
-              spec.username(),
-              spec.password(),
-              spec.localDc(),
-              spec.consistencyLevel())) {
-        if (isMurmur3Partitioner(cluster)) {
-          LOG.info("Murmur3Partitioner detected, splitting");
-          return splitWithTokenRanges(
-              spec, desiredBundleSizeBytes, getEstimatedSizeBytes(pipelineOptions), cluster);
-        } else {
-          LOG.warn(
-              "Only Murmur3Partitioner is supported for splitting, using a unique source for "
-                  + "the read");
-          return Collections.singletonList(
-              new CassandraIO.CassandraSource<>(spec, Collections.singletonList(buildQuery(spec))));
-        }
-      }
-    }
-
-    private static String buildQuery(Read spec) {
-      return (spec.query() == null)
-          ? String.format("SELECT * FROM %s.%s", spec.keyspace().get(), spec.table().get())
-          : spec.query().get().toString();
-    }
-
-    /**
-     * Compute the number of splits based on the estimated size and the desired bundle size, and
-     * create several sources.
-     */
-    private List<BoundedSource<T>> splitWithTokenRanges(
-        CassandraIO.Read<T> spec,
-        long desiredBundleSizeBytes,
-        long estimatedSizeBytes,
-        Cluster cluster) {
-      long numSplits =
-          getNumSplits(desiredBundleSizeBytes, estimatedSizeBytes, spec.minNumberOfSplits());
-      LOG.info("Number of desired splits is {}", numSplits);
-
-      SplitGenerator splitGenerator = new SplitGenerator(cluster.getMetadata().getPartitioner());
-      List<BigInteger> tokens =
-          cluster.getMetadata().getTokenRanges().stream()
-              .map(tokenRange -> new BigInteger(tokenRange.getEnd().getValue().toString()))
-              .collect(Collectors.toList());
-      List<List<RingRange>> splits = splitGenerator.generateSplits(numSplits, tokens);
-      LOG.info("{} splits were actually generated", splits.size());
-
-      final String partitionKey =
-          cluster.getMetadata().getKeyspace(spec.keyspace().get()).getTable(spec.table().get())
-              .getPartitionKey().stream()
-              .map(ColumnMetadata::getName)
-              .collect(Collectors.joining(","));
-
-      List<TokenRange> tokenRanges =
-          getTokenRanges(cluster, spec.keyspace().get(), spec.table().get());
-      final long estimatedSize = getEstimatedSizeBytesFromTokenRanges(tokenRanges) / splits.size();
-
-      List<BoundedSource<T>> sources = new ArrayList<>();
-      for (List<RingRange> split : splits) {
-        List<String> queries = new ArrayList<>();
-        for (RingRange range : split) {
-          if (range.isWrapping()) {
-            // A wrapping range is one that overlaps from the end of the partitioner range and its
-            // start (ie : when the start token of the split is greater than the end token)
-            // We need to generate two queries here : one that goes from the start token to the end
-            // of
-            // the partitioner range, and the other from the start of the partitioner range to the
-            // end token of the split.
-            queries.add(generateRangeQuery(spec, partitionKey, range.getStart(), null));
-            // Generation of the second query of the wrapping range
-            queries.add(generateRangeQuery(spec, partitionKey, null, range.getEnd()));
-          } else {
-            queries.add(generateRangeQuery(spec, partitionKey, range.getStart(), range.getEnd()));
-          }
-        }
-        sources.add(new CassandraIO.CassandraSource<>(spec, queries, estimatedSize));
-      }
-      return sources;
-    }
-
-    private static String generateRangeQuery(
-        Read spec, String partitionKey, BigInteger rangeStart, BigInteger rangeEnd) {
-      final String rangeFilter =
-          Joiner.on(" AND ")
-              .skipNulls()
-              .join(
-                  rangeStart == null
-                      ? null
-                      : String.format("(token(%s) >= %d)", partitionKey, rangeStart),
-                  rangeEnd == null
-                      ? null
-                      : String.format("(token(%s) < %d)", partitionKey, rangeEnd));
-      final String query =
-          (spec.query() == null)
-              ? buildQuery(spec) + " WHERE " + rangeFilter
-              : buildQuery(spec) + " AND " + rangeFilter;
-      LOG.debug("CassandraIO generated query : {}", query);
-      return query;
-    }
-
-    private static long getNumSplits(
-        long desiredBundleSizeBytes,
-        long estimatedSizeBytes,
-        @Nullable ValueProvider<Integer> minNumberOfSplits) {
-      long numSplits =
-          desiredBundleSizeBytes > 0 ? (estimatedSizeBytes / desiredBundleSizeBytes) : 1;
-      if (numSplits <= 0) {
-        LOG.warn("Number of splits is less than 0 ({}), fallback to 1", numSplits);
-        numSplits = 1;
-      }
-      return minNumberOfSplits != null ? Math.max(numSplits, minNumberOfSplits.get()) : numSplits;
-    }
-
-    /**
-     * Returns cached estimate for split or if missing calculate size for whole table. Highly
-     * innacurate if query is specified.
-     *
-     * @param pipelineOptions
-     * @return
-     */
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) {
-      if (estimatedSize != null) {
-        return estimatedSize;
-      } else {
-        try (Cluster cluster =
-            getCluster(
-                spec.hosts(),
-                spec.port(),
-                spec.username(),
-                spec.password(),
-                spec.localDc(),
-                spec.consistencyLevel())) {
-          if (isMurmur3Partitioner(cluster)) {
-            try {
-              List<TokenRange> tokenRanges =
-                  getTokenRanges(cluster, spec.keyspace().get(), spec.table().get());
-              this.estimatedSize = getEstimatedSizeBytesFromTokenRanges(tokenRanges);
-              return this.estimatedSize;
-            } catch (Exception e) {
-              LOG.warn("Can't estimate the size", e);
-              return 0L;
-            }
-          } else {
-            LOG.warn("Only Murmur3 partitioner is supported, can't estimate the size");
-            return 0L;
-          }
-        }
-      }
-    }
-
-    @VisibleForTesting
-    static long getEstimatedSizeBytesFromTokenRanges(List<TokenRange> tokenRanges) {
-      long size = 0L;
-      for (TokenRange tokenRange : tokenRanges) {
-        size = size + tokenRange.meanPartitionSize * tokenRange.partitionCount;
-      }
-      return Math.round(size / getRingFraction(tokenRanges));
-    }
 
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      if (spec.hosts() != null) {
-        builder.add(DisplayData.item("hosts", spec.hosts().toString()));
-      }
-      if (spec.port() != null) {
-        builder.add(DisplayData.item("port", spec.port()));
-      }
-      builder.addIfNotNull(DisplayData.item("keyspace", spec.keyspace()));
-      builder.addIfNotNull(DisplayData.item("table", spec.table()));
-      builder.addIfNotNull(DisplayData.item("username", spec.username()));
-      builder.addIfNotNull(DisplayData.item("localDc", spec.localDc()));
-      builder.addIfNotNull(DisplayData.item("consistencyLevel", spec.consistencyLevel()));
-    }
     // ------------- CASSANDRA SOURCE UTIL METHODS ---------------//
 
-    /**
-     * Gets the list of token ranges that a table occupies on a give Cassandra node.
-     *
-     * <p>NB: This method is compatible with Cassandra 2.1.5 and greater.
-     */
-    private static List<TokenRange> getTokenRanges(Cluster cluster, String keyspace, String table) {
-      try (Session session = cluster.newSession()) {
-        ResultSet resultSet =
-            session.execute(
-                "SELECT range_start, range_end, partitions_count, mean_partition_size FROM "
-                    + "system.size_estimates WHERE keyspace_name = ? AND table_name = ?",
-                keyspace,
-                table);
-
-        ArrayList<TokenRange> tokenRanges = new ArrayList<>();
-        for (Row row : resultSet) {
-          TokenRange tokenRange =
-              new TokenRange(
-                  row.getLong("partitions_count"),
-                  row.getLong("mean_partition_size"),
-                  new BigInteger(row.getString("range_start")),
-                  new BigInteger(row.getString("range_end")));
-          tokenRanges.add(tokenRange);
-        }
-        // The table may not contain the estimates yet
-        // or have partitions_count and mean_partition_size fields = 0
-        // if the data was just inserted and the amount of data in the table was small.
-        // This is very common situation during tests,
-        // when we insert a few rows and immediately query them.
-        // However, for tiny data sets the lack of size estimates is not a problem at all,
-        // because we don't want to split tiny data anyways.
-        // Therefore, we're not issuing a warning if the result set was empty
-        // or mean_partition_size and partitions_count = 0.
-        return tokenRanges;
-      }
-    }
-
-    /** Compute the percentage of token addressed compared with the whole tokens in the cluster. */
-    @VisibleForTesting
-    static double getRingFraction(List<TokenRange> tokenRanges) {
-      double ringFraction = 0;
-      for (TokenRange tokenRange : tokenRanges) {
-        ringFraction =
-            ringFraction
-                + (distance(tokenRange.rangeStart, tokenRange.rangeEnd).doubleValue()
-                    / SplitGenerator.getRangeSize(MURMUR3PARTITIONER).doubleValue());
-      }
-      return ringFraction;
-    }
-
-    /**
-     * Check if the current partitioner is the Murmur3 (default in Cassandra version newer than 2).
-     */
-    @VisibleForTesting
-    static boolean isMurmur3Partitioner(Cluster cluster) {
-      return MURMUR3PARTITIONER.equals(cluster.getMetadata().getPartitioner());
-    }
-
     /** Measure distance between two tokens. */
     @VisibleForTesting
     static BigInteger distance(BigInteger left, BigInteger right) {
       return (right.compareTo(left) > 0)
           ? right.subtract(left)
           : right.subtract(left).add(SplitGenerator.getRangeSize(MURMUR3PARTITIONER));
     }
-
-    /**
-     * Represent a token range in Cassandra instance, wrapping the partition count, size and token
-     * range.
-     */
-    @VisibleForTesting
-    static class TokenRange {

Review comment:
       :clap: remove repeated and useless :clap: 
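
For readers following the hunk above: the removed `splitWithTokenRanges` code turns each ring range into one or two token-range queries, and the retained `distance` helper measures how far apart two tokens are on the ring. The sketch below reproduces that logic in isolation, under stated assumptions. The class name `RingSketch` and the method `rangeFilters` are hypothetical and are not part of CassandraIO. The ring size of 2^64 assumes Murmur3 tokens span [-2^63, 2^63 - 1]. Treating `start >= end` as wrapping is also an assumption, chosen to match how `distance` counts equal tokens as a full turn of the ring.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: a standalone sketch of the token-ring logic quoted in the diff. */
final class RingSketch {
  // Assumption: Murmur3 tokens span [-2^63, 2^63 - 1], so the ring holds 2^64 tokens.
  static final BigInteger RING_SIZE = BigInteger.valueOf(2).pow(64);

  /** Same shape as the `distance` helper kept in the diff, with the ring size inlined. */
  static BigInteger distance(BigInteger left, BigInteger right) {
    return right.compareTo(left) > 0
        ? right.subtract(left)
        : right.subtract(left).add(RING_SIZE);
  }

  /** Assumption: a range wraps when its start token is not smaller than its end token. */
  static boolean isWrapping(BigInteger start, BigInteger end) {
    return start.compareTo(end) >= 0;
  }

  /**
   * Hypothetical helper: returns the WHERE-clause filters needed to cover one range.
   * A wrapping range needs two filters (start to ring end, ring start to end);
   * a regular range needs one combined filter.
   */
  static List<String> rangeFilters(String partitionKey, BigInteger start, BigInteger end) {
    String lower = String.format("(token(%s) >= %d)", partitionKey, start);
    String upper = String.format("(token(%s) < %d)", partitionKey, end);
    List<String> filters = new ArrayList<>();
    if (isWrapping(start, end)) {
      filters.add(lower);
      filters.add(upper);
    } else {
      filters.add(lower + " AND " + upper);
    }
    return filters;
  }

  public static void main(String[] args) {
    BigInteger five = BigInteger.valueOf(5);
    // Regular range: one filter covering [-5, 5).
    System.out.println(rangeFilters("id", five.negate(), five));
    // Wrapping range: two filters, one on each side of the ring boundary.
    System.out.println(rangeFilters("id", five, five.negate()));
  }
}
```

Keeping the two halves of a wrapping range as separate filters mirrors the two `generateRangeQuery` calls in the removed code, where one bound is passed as `null`.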



