Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/10/13 21:50:00 UTC

[jira] [Work logged] (BEAM-9008) Add readAll() method to CassandraIO

     [ https://issues.apache.org/jira/browse/BEAM-9008?focusedWorklogId=500304&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-500304 ]

ASF GitHub Bot logged work on BEAM-9008:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Oct/20 21:49
            Start Date: 13/Oct/20 21:49
    Worklog Time Spent: 10m 
      Work Description: iemejia commented on a change in pull request #10546:
URL: https://github.com/apache/beam/pull/10546#discussion_r503788939



##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -128,11 +126,18 @@
 
   private CassandraIO() {}
 
+  private static final String MURMUR3PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner";

Review comment:
       Please move this one inside of `SplitFn`.
   You would probably need some hack like:
   ```
   PCollection<Read<T>> split = (PCollection<Read<T>>) input.apply("Split", ParDo.of(new SplitFn()));
   return split
         .apply("Reshuffle", Reshuffle.viaRandomKey())
         .apply("Read", ParDo.of(new ReadFn<>()));
   ```

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -373,7 +419,86 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.readAll();
+
+      return input
+          .apply(Create.of(this))
+          .apply("Split", ParDo.of(new SplitFn()))
+          .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
+          .apply("ReadAll", readAll.withCoder(this.coder()));
+    }
+
+    private class SplitFn extends DoFn<Read<T>, Read<T>> {
+
+      @ProcessElement
+      public void process(
+          @Element CassandraIO.Read<T> read, OutputReceiver<Read<T>> outputReceiver) {
+        getRingRanges(read)
+            .forEach(
+                rr ->
+                    outputReceiver.output(
+                        CassandraIO.<T>read()
+                            .withRingRanges(new HashSet<>(rr))
+                            .withCoder(coder())
+                            .withConsistencyLevel(consistencyLevel())
+                            .withEntity(entity())
+                            .withHosts(hosts())
+                            .withKeyspace(keyspace())
+                            .withLocalDc(localDc())
+                            .withPort(port())
+                            .withPassword(password())
+                            .withQuery(query())
+                            .withTable(table())
+                            .withUsername(username())
+                            .withMapperFactoryFn(mapperFactoryFn())));
+      }
+
+      Stream<Set<RingRange>> getRingRanges(Read<T> read) {

Review comment:
       Return a `Set` (see comment above).

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -143,6 +148,36 @@ private CassandraIO() {}
     return Write.<T>builder(MutationType.DELETE).build();
   }
 
+  /** Get a Cassandra cluster using hosts and port. */
+  static Cluster getCluster(

Review comment:
       Why do we have a different `getCluster` method than the one used for the Read? Also, why were the socket options removed there? I think we could move this one out into a sort of CassandraUtils package, but that's optional. However, this PR should not change the use of the same method unless there is a strong reason to do so.

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Token;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.cassandra.CassandraIO.Read;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ReadFn<T> extends DoFn<Read<T>, T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFn.class);
+
+  private transient Cluster cluster;
+
+  private transient Session session;
+
+  private transient Read<T> lastRead;
+
+  @ProcessElement
+  public void processElement(@Element Read<T> read, OutputReceiver<T> receiver) {
+    Session session = getSession(read);
+    Mapper<T> mapper = read.mapperFactoryFn().apply(session);
+    String partitionKey =
+        cluster.getMetadata().getKeyspace(read.keyspace().get()).getTable(read.table().get())
+            .getPartitionKey().stream()
+            .map(ColumnMetadata::getName)
+            .collect(Collectors.joining(","));
+
+    String query = generateRangeQuery(read, partitionKey, read.ringRanges() != null);
+    PreparedStatement preparedStatement = session.prepare(query);
+    Set<RingRange> ringRanges =
+        read.ringRanges() == null ? Collections.<RingRange>emptySet() : read.ringRanges().get();
+
+    for (RingRange rr : ringRanges) {
+      Token startToken = cluster.getMetadata().newToken(rr.getStart().toString());
+      Token endToken = cluster.getMetadata().newToken(rr.getEnd().toString());
+      ResultSet rs =
+          session.execute(preparedStatement.bind().setToken(0, startToken).setToken(1, endToken));
+      Iterator<T> iter = mapper.map(rs);
+      while (iter.hasNext()) {
+        T n = iter.next();
+        receiver.output(n);
+      }
+    }
+
+    if (read.ringRanges() == null) {
+      ResultSet rs = session.execute(preparedStatement.bind());
+      Iterator<T> iter = mapper.map(rs);
+      while (iter.hasNext()) {
+        receiver.output(iter.next());
+      }
+    }
+  }
+
+  @Teardown
+  public void teardown() {
+    if (session != null) {
+      this.session.close();
+    }
+    if (cluster != null) {
+      this.cluster.close();
+    }
+  }
+
+  private Session getSession(Read<T> read) {
+    if (cluster == null || !reuseCluster(this.lastRead, read)) {
+      this.cluster =
+          CassandraIO.getCluster(
+              read.hosts(),
+              read.port(),
+              read.username(),
+              read.password(),
+              read.localDc(),
+              read.consistencyLevel());
+    }
+    if (session == null || !reuseSession(lastRead, read)) {
+      this.session = this.cluster.connect(read.keyspace().get());
+    }
+    this.lastRead = read;
+    return this.session;
+  }
+
+  private static <T> boolean reuseCluster(Read<T> readA, Read<T> readB) {
+    return readA != null
+        && readA.hosts().get().equals(readB.hosts().get())
+        && readA.port().get().equals(readB.port().get())
+        && ((readA.username() != null && readA.username().equals(readB.username()))
+            || (readA.username() == null && readB.username() == null))
+        && ((readA.consistencyLevel() != null
+                && readA.consistencyLevel().equals(readB.consistencyLevel()))
+            || (readA.consistencyLevel() == null && readB.consistencyLevel() == null))
+        && ((readA.localDc() != null && readA.localDc().equals(readB.consistencyLevel()))
+            || (readA.localDc() == null && readB.localDc() == null));
+  }
+
+  // TODO: Unit test
+  private static <T> boolean reuseSession(Read<T> readA, Read<T> readB) {
+    return (readA.keyspace() != null && readA.keyspace().equals(readB.keyspace()))
+        || (readA.keyspace() == null && readB.keyspace() == null);
+  }
+
+  /*
+  private static String generateRangeQuery(Read<?> spec, String partitionKey) {

Review comment:
       Remove if unused

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -423,396 +550,20 @@ private CassandraIO() {}
     }
   }
 
-  @VisibleForTesting
-  static class CassandraSource<T> extends BoundedSource<T> {
-    final Read<T> spec;
-    final List<String> splitQueries;
-    // split source ached size - can't be calculated when already split
-    Long estimatedSize;
-    private static final String MURMUR3PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner";
-
-    CassandraSource(Read<T> spec, List<String> splitQueries) {
-      this(spec, splitQueries, null);
-    }
-
-    private CassandraSource(Read<T> spec, List<String> splitQueries, Long estimatedSize) {
-      this.estimatedSize = estimatedSize;
-      this.spec = spec;
-      this.splitQueries = splitQueries;
-    }
-
-    @Override
-    public Coder<T> getOutputCoder() {
-      return spec.coder();
-    }
-
-    @Override
-    public BoundedReader<T> createReader(PipelineOptions pipelineOptions) {
-      return new CassandraReader(this);
-    }
-
-    @Override
-    public List<BoundedSource<T>> split(
-        long desiredBundleSizeBytes, PipelineOptions pipelineOptions) {
-      try (Cluster cluster =
-          getCluster(
-              spec.hosts(),
-              spec.port(),
-              spec.username(),
-              spec.password(),
-              spec.localDc(),
-              spec.consistencyLevel(),
-              spec.connectTimeout(),
-              spec.readTimeout())) {
-        if (isMurmur3Partitioner(cluster)) {
-          LOG.info("Murmur3Partitioner detected, splitting");
-          return splitWithTokenRanges(
-              spec, desiredBundleSizeBytes, getEstimatedSizeBytes(pipelineOptions), cluster);
-        } else {
-          LOG.warn(
-              "Only Murmur3Partitioner is supported for splitting, using a unique source for "
-                  + "the read");
-          return Collections.singletonList(
-              new CassandraIO.CassandraSource<>(spec, Collections.singletonList(buildQuery(spec))));
-        }
-      }
-    }
-
-    private static String buildQuery(Read spec) {
-      return (spec.query() == null)
-          ? String.format("SELECT * FROM %s.%s", spec.keyspace().get(), spec.table().get())
-          : spec.query().get().toString();
-    }
-
-    /**
-     * Compute the number of splits based on the estimated size and the desired bundle size, and
-     * create several sources.
-     */
-    private List<BoundedSource<T>> splitWithTokenRanges(
-        CassandraIO.Read<T> spec,
-        long desiredBundleSizeBytes,
-        long estimatedSizeBytes,
-        Cluster cluster) {
-      long numSplits =
-          getNumSplits(desiredBundleSizeBytes, estimatedSizeBytes, spec.minNumberOfSplits());
-      LOG.info("Number of desired splits is {}", numSplits);
-
-      SplitGenerator splitGenerator = new SplitGenerator(cluster.getMetadata().getPartitioner());
-      List<BigInteger> tokens =
-          cluster.getMetadata().getTokenRanges().stream()
-              .map(tokenRange -> new BigInteger(tokenRange.getEnd().getValue().toString()))
-              .collect(Collectors.toList());
-      List<List<RingRange>> splits = splitGenerator.generateSplits(numSplits, tokens);
-      LOG.info("{} splits were actually generated", splits.size());
-
-      final String partitionKey =
-          cluster.getMetadata().getKeyspace(spec.keyspace().get()).getTable(spec.table().get())
-              .getPartitionKey().stream()
-              .map(ColumnMetadata::getName)
-              .collect(Collectors.joining(","));
-
-      List<TokenRange> tokenRanges =
-          getTokenRanges(cluster, spec.keyspace().get(), spec.table().get());
-      final long estimatedSize = getEstimatedSizeBytesFromTokenRanges(tokenRanges) / splits.size();
-
-      List<BoundedSource<T>> sources = new ArrayList<>();
-      for (List<RingRange> split : splits) {
-        List<String> queries = new ArrayList<>();
-        for (RingRange range : split) {
-          if (range.isWrapping()) {
-            // A wrapping range is one that overlaps from the end of the partitioner range and its
-            // start (ie : when the start token of the split is greater than the end token)
-            // We need to generate two queries here : one that goes from the start token to the end
-            // of
-            // the partitioner range, and the other from the start of the partitioner range to the
-            // end token of the split.
-            queries.add(generateRangeQuery(spec, partitionKey, range.getStart(), null));
-            // Generation of the second query of the wrapping range
-            queries.add(generateRangeQuery(spec, partitionKey, null, range.getEnd()));
-          } else {
-            queries.add(generateRangeQuery(spec, partitionKey, range.getStart(), range.getEnd()));
-          }
-        }
-        sources.add(new CassandraIO.CassandraSource<>(spec, queries, estimatedSize));
-      }
-      return sources;
-    }
-
-    private static String generateRangeQuery(
-        Read spec, String partitionKey, BigInteger rangeStart, BigInteger rangeEnd) {
-      final String rangeFilter =
-          Joiner.on(" AND ")
-              .skipNulls()
-              .join(
-                  rangeStart == null
-                      ? null
-                      : String.format("(token(%s) >= %d)", partitionKey, rangeStart),
-                  rangeEnd == null
-                      ? null
-                      : String.format("(token(%s) < %d)", partitionKey, rangeEnd));
-      final String query =
-          (spec.query() == null)
-              ? buildQuery(spec) + " WHERE " + rangeFilter
-              : buildQuery(spec) + " AND " + rangeFilter;
-      LOG.debug("CassandraIO generated query : {}", query);
-      return query;
-    }
-
-    private static long getNumSplits(
-        long desiredBundleSizeBytes,
-        long estimatedSizeBytes,
-        @Nullable ValueProvider<Integer> minNumberOfSplits) {
-      long numSplits =
-          desiredBundleSizeBytes > 0 ? (estimatedSizeBytes / desiredBundleSizeBytes) : 1;
-      if (numSplits <= 0) {
-        LOG.warn("Number of splits is less than 0 ({}), fallback to 1", numSplits);
-        numSplits = 1;
-      }
-      return minNumberOfSplits != null ? Math.max(numSplits, minNumberOfSplits.get()) : numSplits;
-    }
-
-    /**
-     * Returns cached estimate for split or if missing calculate size for whole table. Highly
-     * innacurate if query is specified.
-     *
-     * @param pipelineOptions
-     * @return
-     */
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) {
-      if (estimatedSize != null) {
-        return estimatedSize;
-      } else {
-        try (Cluster cluster =
-            getCluster(
-                spec.hosts(),
-                spec.port(),
-                spec.username(),
-                spec.password(),
-                spec.localDc(),
-                spec.consistencyLevel(),
-                spec.connectTimeout(),
-                spec.readTimeout())) {
-          if (isMurmur3Partitioner(cluster)) {
-            try {
-              List<TokenRange> tokenRanges =
-                  getTokenRanges(cluster, spec.keyspace().get(), spec.table().get());
-              this.estimatedSize = getEstimatedSizeBytesFromTokenRanges(tokenRanges);
-              return this.estimatedSize;
-            } catch (Exception e) {
-              LOG.warn("Can't estimate the size", e);
-              return 0L;
-            }
-          } else {
-            LOG.warn("Only Murmur3 partitioner is supported, can't estimate the size");
-            return 0L;
-          }
-        }
-      }
-    }
-
-    @VisibleForTesting
-    static long getEstimatedSizeBytesFromTokenRanges(List<TokenRange> tokenRanges) {
-      long size = 0L;
-      for (TokenRange tokenRange : tokenRanges) {
-        size = size + tokenRange.meanPartitionSize * tokenRange.partitionCount;
-      }
-      return Math.round(size / getRingFraction(tokenRanges));
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      if (spec.hosts() != null) {
-        builder.add(DisplayData.item("hosts", spec.hosts().toString()));
-      }
-      if (spec.port() != null) {
-        builder.add(DisplayData.item("port", spec.port()));
-      }
-      builder.addIfNotNull(DisplayData.item("keyspace", spec.keyspace()));
-      builder.addIfNotNull(DisplayData.item("table", spec.table()));
-      builder.addIfNotNull(DisplayData.item("username", spec.username()));
-      builder.addIfNotNull(DisplayData.item("localDc", spec.localDc()));
-      builder.addIfNotNull(DisplayData.item("consistencyLevel", spec.consistencyLevel()));
-    }
-    // ------------- CASSANDRA SOURCE UTIL METHODS ---------------//
-
-    /**
-     * Gets the list of token ranges that a table occupies on a give Cassandra node.
-     *
-     * <p>NB: This method is compatible with Cassandra 2.1.5 and greater.
-     */
-    private static List<TokenRange> getTokenRanges(Cluster cluster, String keyspace, String table) {
-      try (Session session = cluster.newSession()) {
-        ResultSet resultSet =
-            session.execute(
-                "SELECT range_start, range_end, partitions_count, mean_partition_size FROM "
-                    + "system.size_estimates WHERE keyspace_name = ? AND table_name = ?",
-                keyspace,
-                table);
-
-        ArrayList<TokenRange> tokenRanges = new ArrayList<>();
-        for (Row row : resultSet) {
-          TokenRange tokenRange =
-              new TokenRange(
-                  row.getLong("partitions_count"),
-                  row.getLong("mean_partition_size"),
-                  new BigInteger(row.getString("range_start")),
-                  new BigInteger(row.getString("range_end")));
-          tokenRanges.add(tokenRange);
-        }
-        // The table may not contain the estimates yet
-        // or have partitions_count and mean_partition_size fields = 0
-        // if the data was just inserted and the amount of data in the table was small.
-        // This is very common situation during tests,
-        // when we insert a few rows and immediately query them.
-        // However, for tiny data sets the lack of size estimates is not a problem at all,
-        // because we don't want to split tiny data anyways.
-        // Therefore, we're not issuing a warning if the result set was empty
-        // or mean_partition_size and partitions_count = 0.
-        return tokenRanges;
-      }
-    }
-
-    /** Compute the percentage of token addressed compared with the whole tokens in the cluster. */
-    @VisibleForTesting
-    static double getRingFraction(List<TokenRange> tokenRanges) {
-      double ringFraction = 0;
-      for (TokenRange tokenRange : tokenRanges) {
-        ringFraction =
-            ringFraction
-                + (distance(tokenRange.rangeStart, tokenRange.rangeEnd).doubleValue()
-                    / SplitGenerator.getRangeSize(MURMUR3PARTITIONER).doubleValue());
-      }
-      return ringFraction;
-    }
-
-    /**
-     * Check if the current partitioner is the Murmur3 (default in Cassandra version newer than 2).
-     */
-    @VisibleForTesting
-    static boolean isMurmur3Partitioner(Cluster cluster) {
-      return MURMUR3PARTITIONER.equals(cluster.getMetadata().getPartitioner());
-    }
-
-    /** Measure distance between two tokens. */
-    @VisibleForTesting
-    static BigInteger distance(BigInteger left, BigInteger right) {
-      return (right.compareTo(left) > 0)
-          ? right.subtract(left)
-          : right.subtract(left).add(SplitGenerator.getRangeSize(MURMUR3PARTITIONER));
-    }
-
-    /**
-     * Represent a token range in Cassandra instance, wrapping the partition count, size and token
-     * range.
-     */
-    @VisibleForTesting
-    static class TokenRange {
-      private final long partitionCount;
-      private final long meanPartitionSize;
-      private final BigInteger rangeStart;
-      private final BigInteger rangeEnd;
-
-      TokenRange(
-          long partitionCount, long meanPartitionSize, BigInteger rangeStart, BigInteger rangeEnd) {
-        this.partitionCount = partitionCount;
-        this.meanPartitionSize = meanPartitionSize;
-        this.rangeStart = rangeStart;
-        this.rangeEnd = rangeEnd;
-      }
-    }
-
-    private class CassandraReader extends BoundedSource.BoundedReader<T> {
-      private final CassandraIO.CassandraSource<T> source;
-      private Cluster cluster;
-      private Session session;
-      private Iterator<T> iterator;
-      private T current;
-
-      CassandraReader(CassandraSource<T> source) {
-        this.source = source;
-      }
-
-      @Override
-      public boolean start() {
-        LOG.debug("Starting Cassandra reader");
-        cluster =
-            getCluster(
-                source.spec.hosts(),
-                source.spec.port(),
-                source.spec.username(),
-                source.spec.password(),
-                source.spec.localDc(),
-                source.spec.consistencyLevel(),
-                source.spec.connectTimeout(),
-                source.spec.readTimeout());
-        session = cluster.connect(source.spec.keyspace().get());
-        LOG.debug("Queries: " + source.splitQueries);
-        List<ResultSetFuture> futures = new ArrayList<>();
-        for (String query : source.splitQueries) {
-          futures.add(session.executeAsync(query));
-        }
-
-        final Mapper<T> mapper = getMapper(session, source.spec.entity());
-
-        for (ResultSetFuture result : futures) {
-          if (iterator == null) {
-            iterator = mapper.map(result.getUninterruptibly());
-          } else {
-            iterator = Iterators.concat(iterator, mapper.map(result.getUninterruptibly()));
-          }
-        }
-
-        return advance();
-      }
-
-      @Override
-      public boolean advance() {
-        if (iterator.hasNext()) {
-          current = iterator.next();
-          return true;
-        }
-        current = null;
-        return false;
-      }
-
-      @Override
-      public void close() {
-        LOG.debug("Closing Cassandra reader");
-        if (session != null) {
-          session.close();
-        }
-        if (cluster != null) {
-          cluster.close();
-        }
-      }
-
-      @Override
-      public T getCurrent() throws NoSuchElementException {
-        if (current == null) {
-          throw new NoSuchElementException();
-        }
-        return current;
-      }
-
-      @Override
-      public CassandraIO.CassandraSource<T> getCurrentSource() {
-        return source;
-      }
-
-      private Mapper<T> getMapper(Session session, Class<T> enitity) {
-        return source.spec.mapperFactoryFn().apply(session);
-      }
-    }
-  }
-
   /** Specify the mutation type: either write or delete. */
   public enum MutationType {
     WRITE,
     DELETE
   }
 
+  /**
+   * Check if the current partitioner is the Murmur3 (default in Cassandra version newer than 2).
+   */
+  @VisibleForTesting
+  static boolean isMurmur3Partitioner(Cluster cluster) {

Review comment:
       Can we move this one into `SplitFn` too, since it is not used in other places?

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -373,7 +419,86 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.readAll();
+
+      return input
+          .apply(Create.of(this))
+          .apply("Split", ParDo.of(new SplitFn()))

Review comment:
       Can you please move this Split into the `ReadAll` expand method? When RingRanges are not specified, as in the `read()` case, we should split them during the `ReadAll` expansion. Of course, if RingRanges are specified, we will probably skip recalculating them.
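
To make the intended behavior concrete, here is a minimal plain-Java sketch of the "split only when ring ranges are missing" rule. It deliberately avoids Beam and the Cassandra driver: `ReadSpec`, `computeRanges` and `splitIfNeeded` are hypothetical stand-ins (for `Read<T>`, the `SplitGenerator`-based computation, and the step that would run inside the `ReadAll` expansion), not names from the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

class SplitIfNeededSketch {

  // Hypothetical stand-in for CassandraIO.Read<T>; ringRanges == null means "not specified".
  static final class ReadSpec {
    final String table;
    final Set<String> ringRanges;

    ReadSpec(String table, Set<String> ringRanges) {
      this.table = table;
      this.ringRanges = ringRanges;
    }
  }

  // Placeholder for the cluster-backed computation; the real code would ask the cluster
  // for token ranges. Here each split just gets one synthetic range label.
  static List<Set<String>> computeRanges(int splitCount) {
    List<Set<String>> ranges = new ArrayList<>();
    for (int i = 0; i < splitCount; i++) {
      ranges.add(Collections.singleton("range-" + i));
    }
    return ranges;
  }

  // A read that already carries ring ranges is passed through untouched;
  // a read without them is expanded into one read per computed split.
  static List<ReadSpec> splitIfNeeded(ReadSpec read, int splitCount) {
    if (read.ringRanges != null) {
      return Collections.singletonList(read);
    }
    List<ReadSpec> out = new ArrayList<>();
    for (Set<String> ranges : computeRanges(splitCount)) {
      out.add(new ReadSpec(read.table, ranges));
    }
    return out;
  }

  public static void main(String[] args) {
    // No ranges specified: expanded into 3 reads.
    System.out.println(splitIfNeeded(new ReadSpec("t", null), 3).size());
    // Ranges already specified: passed through as a single read.
    System.out.println(splitIfNeeded(new ReadSpec("t", Collections.singleton("r")), 3).size());
  }
}
```

In the actual transform this decision would presumably live in a `DoFn` applied before the reshuffle, so that both `read()` and user-built `Read` elements go through the same path.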

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1281,4 +1032,44 @@ private void waitForFuturesToFinish() throws ExecutionException, InterruptedExce
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to read data from Apache Cassandra. See {@link CassandraIO} for more
+   * information on usage and configuration.
+   */
+  @AutoValue
+  public abstract static class ReadAll<T> extends PTransform<PCollection<Read<T>>, PCollection<T>> {
+
+    @Nullable
+    abstract Coder<T> coder();
+
+    abstract ReadAll.Builder<T> builder();

Review comment:
       Remove `ReadAll.`

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1281,4 +1032,44 @@ private void waitForFuturesToFinish() throws ExecutionException, InterruptedExce
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to read data from Apache Cassandra. See {@link CassandraIO} for more
+   * information on usage and configuration.
+   */
+  @AutoValue
+  public abstract static class ReadAll<T> extends PTransform<PCollection<Read<T>>, PCollection<T>> {
+
+    @Nullable
+    abstract Coder<T> coder();
+
+    abstract ReadAll.Builder<T> builder();
+
+    /** Specify the {@link Coder} used to serialize the entity in the {@link PCollection}. */
+    public ReadAll<T> withCoder(Coder<T> coder) {
+      checkArgument(coder != null, "coder can not be null");
+      return builder().setCoder(coder).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<Read<T>> input) {
+      checkArgument(coder() != null, "withCoder() is required");
+      return input
+          .apply("Reshuffle", Reshuffle.viaRandomKey())
+          .apply("Read", ParDo.of(new ReadFn<>()))
+          .setCoder(this.coder());
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {

Review comment:
       Move this up before the `withCoder` method

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -373,7 +419,86 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.readAll();
+
+      return input
+          .apply(Create.of(this))
+          .apply("Split", ParDo.of(new SplitFn()))
+          .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
+          .apply("ReadAll", readAll.withCoder(this.coder()));
+    }
+
+    private class SplitFn extends DoFn<Read<T>, Read<T>> {
+
+      @ProcessElement
+      public void process(
+          @Element CassandraIO.Read<T> read, OutputReceiver<Read<T>> outputReceiver) {
+        getRingRanges(read)
+            .forEach(
+                rr ->
+                    outputReceiver.output(
+                        CassandraIO.<T>read()
+                            .withRingRanges(new HashSet<>(rr))
+                            .withCoder(coder())
+                            .withConsistencyLevel(consistencyLevel())
+                            .withEntity(entity())
+                            .withHosts(hosts())
+                            .withKeyspace(keyspace())
+                            .withLocalDc(localDc())
+                            .withPort(port())
+                            .withPassword(password())
+                            .withQuery(query())
+                            .withTable(table())
+                            .withUsername(username())
+                            .withMapperFactoryFn(mapperFactoryFn())));
+      }
+
+      Stream<Set<RingRange>> getRingRanges(Read<T> read) {
+        if (read.ringRanges() == null || read.ringRanges().get() == null) {
+          try (Cluster cluster =
+              getCluster(
+                  read.hosts(),
+                  read.port(),
+                  read.username(),
+                  read.password(),
+                  read.localDc(),
+                  read.consistencyLevel())) {
+            if (isMurmur3Partitioner(cluster)) {
+              LOG.info("Murmur3Partitioner detected, splitting");
+              Integer splitCount;
+              if (read.minNumberOfSplits() != null && read.minNumberOfSplits().get() != null) {
+                splitCount = read.minNumberOfSplits().get();
+              } else {
+                splitCount = cluster.getMetadata().getAllHosts().size();
+              }
+              List<BigInteger> tokens =
+                  cluster.getMetadata().getTokenRanges().stream()
+                      .map(tokenRange -> new BigInteger(tokenRange.getEnd().getValue().toString()))
+                      .collect(Collectors.toList());
+              SplitGenerator splitGenerator =
+                  new SplitGenerator(cluster.getMetadata().getPartitioner());
+
+              return splitGenerator.generateSplits(splitCount, tokens).stream()
+                  .map(l -> new HashSet<>(l));
+
+            } else {
+              LOG.warn(
+                  "Only Murmur3Partitioner is supported for splitting, using an unique source for "
+                      + "the read");
+              String partitioner = cluster.getMetadata().getPartitioner();
+              RingRange totalRingRange =
+                  RingRange.of(
+                      SplitGenerator.getRangeMin(partitioner),
+                      SplitGenerator.getRangeMax(partitioner));
+              return Collections.<Set<RingRange>>singleton(

Review comment:
       If we change the type I suppose we can make this return simpler.
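
A rough illustration of what the simpler return could look like, under the assumption that "change the type" means returning a flat `Set` of ranges instead of a `Stream<Set<RingRange>>`. This is plain Java with a hypothetical `Range` class standing in for `RingRange`, and `ringRanges` is an invented helper name; passing `null` for the splits models the non-Murmur3 fallback.

```java
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

class RingRangeSetSketch {

  // Hypothetical stand-in for RingRange: a (start, end) token pair with value equality.
  static final class Range {
    final BigInteger start;
    final BigInteger end;

    Range(BigInteger start, BigInteger end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Range)) {
        return false;
      }
      Range other = (Range) o;
      return start.equals(other.start) && end.equals(other.end);
    }

    @Override
    public int hashCode() {
      return Objects.hash(start, end);
    }
  }

  // With a Set return type the fallback is a plain singleton of the whole ring,
  // and the split case is the union of all generated ranges.
  static Set<Range> ringRanges(List<List<Range>> splits, Range wholeRing) {
    if (splits == null) {
      return Collections.singleton(wholeRing);
    }
    Set<Range> all = new LinkedHashSet<>();
    for (List<Range> split : splits) {
      all.addAll(split);
    }
    return all;
  }

  public static void main(String[] args) {
    Range whole =
        new Range(BigInteger.valueOf(Long.MIN_VALUE), BigInteger.valueOf(Long.MAX_VALUE));
    Range a = new Range(BigInteger.ZERO, BigInteger.TEN);
    Range b = new Range(BigInteger.TEN, BigInteger.valueOf(20));
    System.out.println(ringRanges(null, whole).size());
    System.out.println(ringRanges(Arrays.asList(Arrays.asList(a), Arrays.asList(a, b)), whole).size());
  }
}
```

Note that flattening drops the grouping of ranges into splits, so the caller would then emit one read per range (or regroup them) rather than one read per generated split.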

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1170,4 +898,44 @@ private void waitForFuturesToFinish() throws ExecutionException, InterruptedExce
       }
     }
   }
+
+  /**
+   * A {@link PTransform} to read data from Apache Cassandra. See {@link CassandraIO} for more
+   * information on usage and configuration.
+   */
+  @AutoValue
+  public abstract static class ReadAll<T> extends PTransform<PCollection<Read<T>>, PCollection<T>> {
+
+    @Nullable
+    abstract Coder<T> coder();
+
+    abstract ReadAll.Builder<T> builder();
+
+    /** Specify the {@link Coder} used to serialize the entity in the {@link PCollection}. */
+    public ReadAll<T> withCoder(Coder<T> coder) {

Review comment:
       Right, not so beautiful, but we have to live with this :+1:

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Token;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.cassandra.CassandraIO.Read;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ReadFn<T> extends DoFn<Read<T>, T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFn.class);
+
+  private transient Cluster cluster;
+
+  private transient Session session;
+
+  private transient Read<T> lastRead;
+
+  @ProcessElement
+  public void processElement(@Element Read<T> read, OutputReceiver<T> receiver) {
+    Session session = getSession(read);
+    Mapper<T> mapper = read.mapperFactoryFn().apply(session);
+    String partitionKey =
+        cluster.getMetadata().getKeyspace(read.keyspace().get()).getTable(read.table().get())
+            .getPartitionKey().stream()
+            .map(ColumnMetadata::getName)
+            .collect(Collectors.joining(","));
+
+    String query = generateRangeQuery(read, partitionKey, read.ringRanges() != null);
+    PreparedStatement preparedStatement = session.prepare(query);
+    Set<RingRange> ringRanges =
+        read.ringRanges() == null ? Collections.<RingRange>emptySet() : read.ringRanges().get();
+
+    for (RingRange rr : ringRanges) {
+      Token startToken = cluster.getMetadata().newToken(rr.getStart().toString());
+      Token endToken = cluster.getMetadata().newToken(rr.getEnd().toString());
+      ResultSet rs =
+          session.execute(preparedStatement.bind().setToken(0, startToken).setToken(1, endToken));
+      Iterator<T> iter = mapper.map(rs);
+      while (iter.hasNext()) {
+        T n = iter.next();
+        receiver.output(n);
+      }
+    }
+
+    if (read.ringRanges() == null) {
+      ResultSet rs = session.execute(preparedStatement.bind());
+      Iterator<T> iter = mapper.map(rs);
+      while (iter.hasNext()) {
+        receiver.output(iter.next());
+      }
+    }
+  }
+
+  @Teardown
+  public void teardown() {
+    if (session != null) {
+      this.session.close();
+    }
+    if (cluster != null) {
+      this.cluster.close();
+    }
+  }
+
+  private Session getSession(Read<T> read) {
+    if (cluster == null || !reuseCluster(this.lastRead, read)) {
+      this.cluster =
+          CassandraIO.getCluster(
+              read.hosts(),
+              read.port(),
+              read.username(),
+              read.password(),
+              read.localDc(),
+              read.consistencyLevel());
+    }
+    if (session == null || !reuseSession(lastRead, read)) {
+      this.session = this.cluster.connect(read.keyspace().get());
+    }
+    this.lastRead = read;
+    return this.session;
+  }
+
+  private static <T> boolean reuseCluster(Read<T> readA, Read<T> readB) {
+    return readA != null

Review comment:
       This is still a bit messy. Don't you think we could get something similar by using connection pooling and delegating this complexity to the driver (where it should reside)?
   https://docs.datastax.com/en/developer/java-driver/2.1/manual/pooling/
   
   Note that I am not familiar with this, but after a quick look it seems worth investigating.
   

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -373,7 +419,86 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.readAll();
+
+      return input
+          .apply(Create.of(this))
+          .apply("Split", ParDo.of(new SplitFn()))
+          .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
+          .apply("ReadAll", readAll.withCoder(this.coder()));
+    }
+
+    private class SplitFn extends DoFn<Read<T>, Read<T>> {
+
+      @ProcessElement
+      public void process(
+          @Element CassandraIO.Read<T> read, OutputReceiver<Read<T>> outputReceiver) {
+        getRingRanges(read)

Review comment:
       I would prefer to collect the result of `getRingRanges` into a `Set` and assign it to a variable, to ease debugging in the future.

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/RingRange.java
##########
@@ -55,4 +58,9 @@ public boolean isWrapping() {
   public String toString() {
     return String.format("(%s,%s]", start.toString(), end.toString());
   }
+
+  public static RingRange fromEncodedKey(Metadata metadata, ByteBuffer... bb) {

Review comment:
       Any reason not to do this? It looks like a convenience method that, if you depend on it somewhere else (outside of Beam), you can easily move there.

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -373,7 +419,86 @@ private CassandraIO() {}
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<>(this, null)));
+      ReadAll<T> readAll = CassandraIO.readAll();
+
+      return input
+          .apply(Create.of(this))
+          .apply("Split", ParDo.of(new SplitFn()))
+          .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
+          .apply("ReadAll", readAll.withCoder(this.coder()));
+    }
+
+    private class SplitFn extends DoFn<Read<T>, Read<T>> {

Review comment:
       `private static class SplitFn<T>`
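
A minimal, self-contained sketch of why a static nested class is usually preferred for a `DoFn`. It uses plain Java serialization rather than Beam, and `EnclosingTransform`, `InnerFn`, `StaticFn` and `SerializedSizeDemo` are hypothetical names, not classes from this PR. The assumption is that the function object is shipped to workers in serialized form, so a non-static inner class drags its enclosing instance along with it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for an enclosing transform that carries some configuration.
class EnclosingTransform implements Serializable {
  private final byte[] config = new byte[10_000];

  // Inner (non-static) class: each instance holds a hidden reference to the
  // enclosing EnclosingTransform, which is serialized together with it.
  class InnerFn implements Serializable {
    int configSize() {
      return config.length; // uses the enclosing instance
    }
  }

  // Static nested class: no reference to an enclosing instance.
  static class StaticFn<T> implements Serializable {}

  InnerFn newInnerFn() {
    return new InnerFn();
  }
}

class SerializedSizeDemo {
  /** Returns the number of bytes produced by Java serialization of the object. */
  static int serializedSize(Serializable o) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(o);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.size();
  }

  public static void main(String[] args) {
    int inner = serializedSize(new EnclosingTransform().newInnerFn());
    int nested = serializedSize(new EnclosingTransform.StaticFn<String>());
    System.out.println("inner class:         " + inner + " bytes");
    System.out.println("static nested class: " + nested + " bytes");
  }
}
```

The inner class serializes the whole enclosing object (including its 10,000-byte `config`), while the static nested class serializes only itself. A static class also has to declare its own type parameter, hence `SplitFn<T>`.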

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -370,384 +488,16 @@ private CassandraIO() {}
         return autoBuild();
       }
     }
-  }
-
-  @VisibleForTesting
-  static class CassandraSource<T> extends BoundedSource<T> {
-    final Read<T> spec;
-    final List<String> splitQueries;
-    // split source ached size - can't be calculated when already split
-    Long estimatedSize;
-    private static final String MURMUR3PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner";
-
-    CassandraSource(Read<T> spec, List<String> splitQueries) {
-      this(spec, splitQueries, null);
-    }
-
-    private CassandraSource(Read<T> spec, List<String> splitQueries, Long estimatedSize) {
-      this.estimatedSize = estimatedSize;
-      this.spec = spec;
-      this.splitQueries = splitQueries;
-    }
-
-    @Override
-    public Coder<T> getOutputCoder() {
-      return spec.coder();
-    }
-
-    @Override
-    public BoundedReader<T> createReader(PipelineOptions pipelineOptions) {
-      return new CassandraReader(this);
-    }
-
-    @Override
-    public List<BoundedSource<T>> split(
-        long desiredBundleSizeBytes, PipelineOptions pipelineOptions) {
-      try (Cluster cluster =
-          getCluster(
-              spec.hosts(),
-              spec.port(),
-              spec.username(),
-              spec.password(),
-              spec.localDc(),
-              spec.consistencyLevel())) {
-        if (isMurmur3Partitioner(cluster)) {
-          LOG.info("Murmur3Partitioner detected, splitting");
-          return splitWithTokenRanges(
-              spec, desiredBundleSizeBytes, getEstimatedSizeBytes(pipelineOptions), cluster);
-        } else {
-          LOG.warn(
-              "Only Murmur3Partitioner is supported for splitting, using a unique source for "
-                  + "the read");
-          return Collections.singletonList(
-              new CassandraIO.CassandraSource<>(spec, Collections.singletonList(buildQuery(spec))));
-        }
-      }
-    }
-
-    private static String buildQuery(Read spec) {
-      return (spec.query() == null)
-          ? String.format("SELECT * FROM %s.%s", spec.keyspace().get(), spec.table().get())
-          : spec.query().get().toString();
-    }
-
-    /**
-     * Compute the number of splits based on the estimated size and the desired bundle size, and
-     * create several sources.
-     */
-    private List<BoundedSource<T>> splitWithTokenRanges(
-        CassandraIO.Read<T> spec,
-        long desiredBundleSizeBytes,
-        long estimatedSizeBytes,
-        Cluster cluster) {
-      long numSplits =
-          getNumSplits(desiredBundleSizeBytes, estimatedSizeBytes, spec.minNumberOfSplits());
-      LOG.info("Number of desired splits is {}", numSplits);
-
-      SplitGenerator splitGenerator = new SplitGenerator(cluster.getMetadata().getPartitioner());
-      List<BigInteger> tokens =
-          cluster.getMetadata().getTokenRanges().stream()
-              .map(tokenRange -> new BigInteger(tokenRange.getEnd().getValue().toString()))
-              .collect(Collectors.toList());
-      List<List<RingRange>> splits = splitGenerator.generateSplits(numSplits, tokens);
-      LOG.info("{} splits were actually generated", splits.size());
-
-      final String partitionKey =
-          cluster.getMetadata().getKeyspace(spec.keyspace().get()).getTable(spec.table().get())
-              .getPartitionKey().stream()
-              .map(ColumnMetadata::getName)
-              .collect(Collectors.joining(","));
-
-      List<TokenRange> tokenRanges =
-          getTokenRanges(cluster, spec.keyspace().get(), spec.table().get());
-      final long estimatedSize = getEstimatedSizeBytesFromTokenRanges(tokenRanges) / splits.size();
-
-      List<BoundedSource<T>> sources = new ArrayList<>();
-      for (List<RingRange> split : splits) {
-        List<String> queries = new ArrayList<>();
-        for (RingRange range : split) {
-          if (range.isWrapping()) {

Review comment:
       What is your take on this? More concretely, I would really like us to test the corner cases of splitting, as we did in the removed test methods such as `testEstimatedSizeBytesFromTokenRanges` and the others.
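
As an illustration of the kind of corner case meant here, the sketch below models a half-open token range `(start, end]`, matching the `toString` format of `RingRange`, and checks membership for wrapping and non-wrapping ranges. `TokenSpan` is a hypothetical stand-in, not the connector's `RingRange`, and the rule that a range wraps when `start >= end` is an assumption made for this example.

```java
import java.math.BigInteger;

// Hypothetical stand-in for a ring range; not the connector's RingRange class.
final class TokenSpan {
  final BigInteger start; // exclusive
  final BigInteger end;   // inclusive

  TokenSpan(long start, long end) {
    this.start = BigInteger.valueOf(start);
    this.end = BigInteger.valueOf(end);
  }

  // Assumption: a range wraps around the ring when its start is not below its end.
  boolean isWrapping() {
    return start.compareTo(end) >= 0;
  }

  // Membership in (start, end], taking wrap-around into account.
  boolean contains(long token) {
    BigInteger t = BigInteger.valueOf(token);
    boolean afterStart = t.compareTo(start) > 0;
    boolean upToEnd = t.compareTo(end) <= 0;
    // A wrapping range covers the tail of the ring OR its head;
    // a regular range covers tokens that satisfy both bounds.
    return isWrapping() ? (afterStart || upToEnd) : (afterStart && upToEnd);
  }

  @Override
  public String toString() {
    return String.format("(%s,%s]", start, end);
  }
}

class TokenSpanDemo {
  public static void main(String[] args) {
    TokenSpan plain = new TokenSpan(10, 20);
    TokenSpan wrapping = new TokenSpan(90, -90);
    TokenSpan fullRing = new TokenSpan(5, 5);

    System.out.println(plain + " wraps? " + plain.isWrapping());
    System.out.println(wrapping + " wraps? " + wrapping.isWrapping());
    System.out.println(wrapping + " contains 0? " + wrapping.contains(0));
    System.out.println(fullRing + " contains 0? " + fullRing.contains(0));
  }
}
```

Tests along these lines would cover the exclusive start, the inclusive end, a range that crosses the end of the ring, and the degenerate case where start equals end.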






Issue Time Tracking
-------------------

    Worklog Id:     (was: 500304)
    Time Spent: 14h 40m  (was: 14.5h)

> Add readAll() method to CassandraIO
> -----------------------------------
>
>                 Key: BEAM-9008
>                 URL: https://issues.apache.org/jira/browse/BEAM-9008
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-cassandra
>    Affects Versions: 2.16.0
>            Reporter: vincent marquez
>            Priority: P3
>          Time Spent: 14h 40m
>  Remaining Estimate: 0h
>
> When querying a large Cassandra database, it's often *much* more useful to programmatically generate the queries that need to be run rather than reading all partitions and attempting some filtering.  
> As an example:
> {code:java}
> public class Event { 
>    @PartitionKey(0) public UUID accountId;
>    @PartitionKey(1) public String yearMonthDay; 
>    @ClusteringKey public UUID eventId;  
>    //other data...
> }{code}
> If there are ten years' worth of data, you may want to query only one year's worth.  Here each token range would represent one 'token' but all events for the day. 
> {code:java}
> Set<UUID> accounts = getRelevantAccounts();
> Set<String> dateRange = generateDateRange("2018-01-01", "2019-01-01");
> PCollection<TokenRange> tokens = generateTokens(accounts, dateRange); 
> {code}
>  
>  I propose an additional _readAll()_ PTransform that can take a PCollection of token ranges and can return a PCollection<T> of what the query would return. 
> *Question: How much code should be in common between both methods?* 
> Currently the read connector already groups all partitions into a List of Token Ranges, so it would be simple to refactor the current read() based method to a 'ParDo' based one and have them both share the same function.  Reasons against sharing code between read and readAll:
>  * Not having the read based method return a BoundedSource connector would mean losing the ability to know the size of the data returned
>  * Currently the CassandraReader executes all the grouped TokenRange queries *asynchronously* which is (maybe?) fine when all that's happening is splitting up all the partition ranges but terrible for executing potentially millions of queries. 
>  Reasons _for_ sharing code would be a simplified code base and that both of the above issues would most likely have a negligible performance impact. 
>  
>  
>  


