You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/06/07 06:05:22 UTC

[1/2] beam git commit: [BEAM-245] Add CassandraIO

Repository: beam
Updated Branches:
  refs/heads/master 3cc4ff6d7 -> c189d5c0e


[BEAM-245] Add CassandraIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b0bb3dc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b0bb3dc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b0bb3dc

Branch: refs/heads/master
Commit: 0b0bb3dc8d18b7d78780dbd39705e16a8aae028e
Parents: 3cc4ff6
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Tue Mar 28 16:46:37 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Jun 7 07:40:05 2017 +0200

----------------------------------------------------------------------
 sdks/java/io/cassandra/pom.xml                  | 113 ++++
 .../beam/sdk/io/cassandra/CassandraIO.java      | 510 +++++++++++++++++++
 .../beam/sdk/io/cassandra/CassandraService.java |  66 +++
 .../sdk/io/cassandra/CassandraServiceImpl.java  | 398 +++++++++++++++
 .../beam/sdk/io/cassandra/package-info.java     |  22 +
 .../beam/sdk/io/cassandra/CassandraIOIT.java    | 254 +++++++++
 .../beam/sdk/io/cassandra/CassandraIOTest.java  | 279 ++++++++++
 .../io/cassandra/CassandraServiceImplTest.java  | 138 +++++
 .../sdk/io/cassandra/CassandraTestDataSet.java  | 153 ++++++
 .../sdk/io/common/IOTestPipelineOptions.java    |  10 +
 sdks/java/io/pom.xml                            |   1 +
 11 files changed, 1944 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/pom.xml b/sdks/java/io/cassandra/pom.xml
new file mode 100644
index 0000000..8249f57
--- /dev/null
+++ b/sdks/java/io/cassandra/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-parent</artifactId>
+    <version>2.1.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-io-cassandra</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: Cassandra</name>
+  <description>IO to read and write with Apache Cassandra database</description>
+
+  <properties>
+    <cassandra.driver.version>3.2.0</cassandra.driver.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.datastax.cassandra</groupId>
+      <artifactId>cassandra-driver-mapping</artifactId>
+      <version>${cassandra.driver.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.datastax.cassandra</groupId>
+      <artifactId>cassandra-driver-core</artifactId>
+      <version>${cassandra.driver.version}</version>
+    </dependency>
+
+    <!-- compile dependencies -->
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-common</artifactId>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
new file mode 100644
index 0000000..b6f4ef6
--- /dev/null
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cassandra;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An IO to read from and write to Apache Cassandra.
+ *
+ * <h3>Reading from Apache Cassandra</h3>
+ *
+ * <p>{@code CassandraIO} provides a source to read a table, returning a bounded collection of
+ * entities as {@code PCollection<Entity>}. An entity is built by the Cassandra mapper
+ * ({@code com.datastax.driver.mapping.EntityMapper}) based on a
+ * POJO containing annotations (as described in
+ * http://docs.datastax.com/en/developer/java-driver/2.1/manual/object_mapper/creating/).
+ *
+ * <p>The following example illustrates various options for configuring the IO:
+ *
+ * <pre>{@code
+ * pipeline.apply(CassandraIO.<Person>read()
+ *     .withHosts(Arrays.asList("host1", "host2"))
+ *     .withPort(9042)
+ *     .withKeyspace("beam")
+ *     .withTable("Person")
+ *     .withEntity(Person.class)
+ *     .withCoder(SerializableCoder.of(Person.class)));
+ * // above options are the minimum set, returns PCollection<Person>
+ *
+ * }</pre>
+ *
+ * <h3>Writing to Apache Cassandra</h3>
+ *
+ * <p>{@code CassandraIO} provides a sink to write a collection of entities to Apache Cassandra.
+ *
+ * <p>The following example illustrates various options for configuring the IO write:
+ *
+ * <pre>{@code
+ * pipeline
+ *    .apply(...) // provides a PCollection<Person> where Person is an entity
+ *    .apply(CassandraIO.<Person>write()
+ *        .withHosts(Arrays.asList("host1", "host2"))
+ *        .withPort(9042)
+ *        .withKeyspace("beam")
+ *        .withEntity(Person.class));
+ * }</pre>
+ */
+@Experimental
+public class CassandraIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CassandraIO.class);
+
+  private CassandraIO() {}
+
+  /**
+   * Provide a {@link Read} {@link PTransform} to read data from a Cassandra database.
+   */
+  public static <T> Read<T> read() {
+    return new AutoValue_CassandraIO_Read.Builder<T>().build();
+  }
+
+  /**
+   * Provide a {@link Write} {@link PTransform} to write data to a Cassandra database.
+   */
+  public static <T> Write<T> write() {
+    return new AutoValue_CassandraIO_Write.Builder<T>().build();
+  }
+
+  /**
+   * A {@link PTransform} to read data from Apache Cassandra. See {@link CassandraIO} for more
+   * information on usage and configuration.
+   */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+
+    @Nullable abstract List<String> hosts();
+    @Nullable abstract Integer port();
+    @Nullable abstract String keyspace();
+    @Nullable abstract String table();
+    @Nullable abstract Class<T> entity();
+    @Nullable abstract Coder<T> coder();
+    @Nullable abstract String username();
+    @Nullable abstract String password();
+    @Nullable abstract String localDc();
+    @Nullable abstract String consistencyLevel();
+    @Nullable abstract CassandraService<T> cassandraService();
+    abstract Builder<T> builder();
+
+    /**
+     * Specify the hosts of the Apache Cassandra instances.
+     */
+    public Read<T> withHosts(List<String> hosts) {
+      checkArgument(hosts != null, "CassandraIO.read().withHosts(hosts) called with null hosts");
+      checkArgument(!hosts.isEmpty(), "CassandraIO.read().withHosts(hosts) called with empty "
+          + "hosts list");
+      return builder().setHosts(hosts).build();
+    }
+
+    /**
+     * Specify the port number of the Apache Cassandra instances.
+     */
+    public Read<T> withPort(int port) {
+      checkArgument(port > 0, "CassandraIO.read().withPort(port) called with invalid port "
+          + "number (%s)", port);
+      return builder().setPort(port).build();
+    }
+
+    /**
+     * Specify the Cassandra keyspace where to read data.
+     */
+    public Read<T> withKeyspace(String keyspace) {
+      checkArgument(keyspace != null, "CassandraIO.read().withKeyspace(keyspace) called with "
+          + "null keyspace");
+      return builder().setKeyspace(keyspace).build();
+    }
+
+    /**
+     * Specify the Cassandra table where to read data.
+     */
+    public Read<T> withTable(String table) {
+      checkArgument(table != null, "CassandraIO.read().withTable(table) called with null table");
+      return builder().setTable(table).build();
+    }
+
+    /**
+     * Specify the entity class (annotated POJO). The {@link CassandraIO} will read the data and
+     * convert the data as entity instances. The {@link PCollection} resulting from the read will
+     * contain entity elements.
+     */
+    public Read<T> withEntity(Class<T> entity) {
+      checkArgument(entity != null, "CassandraIO.read().withEntity(entity) called with null "
+          + "entity");
+      return builder().setEntity(entity).build();
+    }
+
+    /**
+     * Specify the {@link Coder} used to serialize the entity in the {@link PCollection}.
+     */
+    public Read<T> withCoder(Coder<T> coder) {
+      checkArgument(coder != null, "CassandraIO.read().withCoder(coder) called with null coder");
+      return builder().setCoder(coder).build();
+    }
+
+    /**
+     * Specify the username for authentication.
+     */
+    public Read<T> withUsername(String username) {
+      checkArgument(username != null, "CassandraIO.read().withUsername(username) called with "
+          + "null username");
+      return builder().setUsername(username).build();
+    }
+
+    /**
+     * Specify the password for authentication.
+     */
+    public Read<T> withPassword(String password) {
+      checkArgument(password != null, "CassandraIO.read().withPassword(password) called with "
+          + "null password");
+      return builder().setPassword(password).build();
+    }
+
+    /**
+     * Specify the local DC used for the load balancing.
+     */
+    public Read<T> withLocalDc(String localDc) {
+      checkArgument(localDc != null, "CassandraIO.read().withLocalDc(localDc) called with null "
+          + "localDc");
+      return builder().setLocalDc(localDc).build();
+    }
+
+    /** Specify the consistency level for the request (e.g. ONE, LOCAL_QUORUM). */
+    public Read<T> withConsistencyLevel(String consistencyLevel) {
+      checkArgument(consistencyLevel != null, "CassandraIO.read().withConsistencyLevel"
+          + "(consistencyLevel) called with null consistencyLevel");
+      return builder().setConsistencyLevel(consistencyLevel).build();
+    }
+
+    /**
+     * Specify an instance of {@link CassandraService} used to connect and read from Cassandra
+     * database.
+     */
+    public Read<T> withCassandraService(CassandraService<T> cassandraService) {
+      checkArgument(cassandraService != null, "CassandraIO.read().withCassandraService(service)"
+          + " called with null service");
+      return builder().setCassandraService(cassandraService).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<T>(this, null)));
+    }
+
+    @Override
+    public void validate(PipelineOptions pipelineOptions) {
+      checkState(hosts() != null || cassandraService() != null,
+          "CassandraIO.read() requires a list of hosts to be set via withHosts(hosts) or a "
+              + "Cassandra service to be set via withCassandraService(service)");
+      checkState(port() != null || cassandraService() != null, "CassandraIO.read() requires a "
+          + "valid port number to be set via withPort(port) or a Cassandra service to be set via "
+          + "withCassandraService(service)");
+      checkState(keyspace() != null, "CassandraIO.read() requires a keyspace to be set via "
+          + "withKeyspace(keyspace)");
+      checkState(table() != null, "CassandraIO.read() requires a table to be set via "
+          + "withTable(table)");
+      checkState(entity() != null, "CassandraIO.read() requires an entity to be set via "
+          + "withEntity(entity)");
+      checkState(coder() != null, "CassandraIO.read() requires a coder to be set via "
+          + "withCoder(coder)");
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setHosts(List<String> hosts);
+      abstract Builder<T> setPort(Integer port);
+      abstract Builder<T> setKeyspace(String keyspace);
+      abstract Builder<T> setTable(String table);
+      abstract Builder<T> setEntity(Class<T> entity);
+      abstract Builder<T> setCoder(Coder<T> coder);
+      abstract Builder<T> setUsername(String username);
+      abstract Builder<T> setPassword(String password);
+      abstract Builder<T> setLocalDc(String localDc);
+      abstract Builder<T> setConsistencyLevel(String consistencyLevel);
+      abstract Builder<T> setCassandraService(CassandraService<T> cassandraService);
+      abstract Read<T> build();
+    }
+
+    /**
+     * Helper function to either get a fake/mock Cassandra service provided by
+     * {@link #withCassandraService(CassandraService)} or create and return an implementation
+     * of a concrete Cassandra service dealing with a Cassandra instance.
+     */
+    @VisibleForTesting
+    CassandraService<T> getCassandraService() {
+      if (cassandraService() != null) {
+        return cassandraService();
+      }
+      return new CassandraServiceImpl<>();
+    }
+
+  }
+
+  @VisibleForTesting
+  static class CassandraSource<T> extends BoundedSource<T> {
+
+    protected final Read<T> spec;
+    protected final String splitQuery;
+
+    CassandraSource(Read<T> spec, String splitQuery) {
+      this.spec = spec;
+      this.splitQuery = splitQuery;
+    }
+
+    @Override
+    public Coder<T> getDefaultOutputCoder() {
+      return spec.coder();
+    }
+
+    @Override
+    public void validate() {
+      spec.validate(null);
+    }
+
+    @Override
+    public BoundedReader<T> createReader(PipelineOptions pipelineOptions) {
+      return spec.getCassandraService().createReader(this);
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
+      return spec.getCassandraService().getEstimatedSizeBytes(spec);
+    }
+
+    @Override
+    public List<BoundedSource<T>> split(long desiredBundleSizeBytes,
+        PipelineOptions pipelineOptions) {
+      return spec.getCassandraService().split(spec, desiredBundleSizeBytes);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      if (spec.hosts() != null) {
+        builder.add(DisplayData.item("hosts", spec.hosts().toString()));
+      }
+      if (spec.port() != null) {
+        builder.add(DisplayData.item("port", spec.port()));
+      }
+      builder.addIfNotNull(DisplayData.item("keyspace", spec.keyspace()));
+      builder.addIfNotNull(DisplayData.item("table", spec.table()));
+      builder.addIfNotNull(DisplayData.item("username", spec.username()));
+      builder.addIfNotNull(DisplayData.item("localDc", spec.localDc()));
+      builder.addIfNotNull(DisplayData.item("consistencyLevel", spec.consistencyLevel()));
+    }
+  }
+
+  /**
+   * A {@link PTransform} to write into Apache Cassandra. See {@link CassandraIO} for details on
+   * usage and configuration.
+   */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+
+    @Nullable abstract List<String> hosts();
+    @Nullable abstract Integer port();
+    @Nullable abstract String keyspace();
+    @Nullable abstract Class<T> entity();
+    @Nullable abstract String username();
+    @Nullable abstract String password();
+    @Nullable abstract String localDc();
+    @Nullable abstract String consistencyLevel();
+    @Nullable abstract CassandraService<T> cassandraService();
+    abstract Builder<T> builder();
+
+    /**
+     * Specify the Cassandra instance hosts where to write data.
+     */
+    public Write<T> withHosts(List<String> hosts) {
+      checkArgument(hosts != null, "CassandraIO.write().withHosts(hosts) called with null hosts");
+      checkArgument(!hosts.isEmpty(), "CassandraIO.write().withHosts(hosts) called with empty "
+          + "hosts list");
+      return builder().setHosts(hosts).build();
+    }
+
+    /**
+     * Specify the Cassandra instance port number where to write data.
+     */
+    public Write<T> withPort(int port) {
+      checkArgument(port > 0, "CassandraIO.write().withPort(port) called with invalid port "
+          + "number (%s)", port);
+      return builder().setPort(port).build();
+    }
+
+    /**
+     * Specify the Cassandra keyspace where to write data.
+     */
+    public Write<T> withKeyspace(String keyspace) {
+      checkArgument(keyspace != null, "CassandraIO.write().withKeyspace(keyspace) called with "
+          + "null keyspace");
+      return builder().setKeyspace(keyspace).build();
+    }
+
+    /**
+     * Specify the entity class in the input {@link PCollection}. The {@link CassandraIO} will
+     * map this entity to the Cassandra table thanks to the annotations.
+     */
+    public Write<T> withEntity(Class<T> entity) {
+      checkArgument(entity != null, "CassandraIO.write().withEntity(entity) called with null "
+          + "entity");
+      return builder().setEntity(entity).build();
+    }
+
+    /**
+     * Specify the username used for authentication.
+     */
+    public Write<T> withUsername(String username) {
+      checkArgument(username != null, "CassandraIO.write().withUsername(username) called with "
+          + "null username");
+      return builder().setUsername(username).build();
+    }
+
+    /**
+     * Specify the password used for authentication.
+     */
+    public Write<T> withPassword(String password) {
+      checkArgument(password != null, "CassandraIO.write().withPassword(password) called with "
+          + "null password");
+      return builder().setPassword(password).build();
+    }
+
+    /**
+     * Specify the local DC used by the load balancing policy.
+     */
+    public Write<T> withLocalDc(String localDc) {
+      checkArgument(localDc != null, "CassandraIO.write().withLocalDc(localDc) called with null"
+          + " localDc");
+      return builder().setLocalDc(localDc).build();
+    }
+
+    /** Specify the consistency level for the request (e.g. ONE, LOCAL_QUORUM). */
+    public Write<T> withConsistencyLevel(String consistencyLevel) {
+      checkArgument(consistencyLevel != null, "CassandraIO.write().withConsistencyLevel"
+          + "(consistencyLevel) called with null consistencyLevel");
+      return builder().setConsistencyLevel(consistencyLevel).build();
+    }
+
+    /**
+     * Specify the {@link CassandraService} used to connect and write into the Cassandra database.
+     */
+    public Write<T> withCassandraService(CassandraService<T> cassandraService) {
+      checkArgument(cassandraService != null, "CassandraIO.write().withCassandraService"
+          + "(service) called with null service");
+      return builder().setCassandraService(cassandraService).build();
+    }
+
+    @Override
+    public void validate(PipelineOptions pipelineOptions) {
+      checkState(hosts() != null || cassandraService() != null,
+          "CassandraIO.write() requires a list of hosts to be set via withHosts(hosts) or a "
+              + "Cassandra service to be set via withCassandraService(service)");
+      checkState(port() != null || cassandraService() != null, "CassandraIO.write() requires a "
+          + "valid port number to be set via withPort(port) or a Cassandra service to be set via "
+          + "withCassandraService(service)");
+      checkState(keyspace() != null, "CassandraIO.write() requires a keyspace to be set via "
+          + "withKeyspace(keyspace)");
+      checkState(entity() != null, "CassandraIO.write() requires an entity to be set via "
+          + "withEntity(entity)");
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      input.apply(ParDo.of(new WriteFn<T>(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setHosts(List<String> hosts);
+      abstract Builder<T> setPort(Integer port);
+      abstract Builder<T> setKeyspace(String keyspace);
+      abstract Builder<T> setEntity(Class<T> entity);
+      abstract Builder<T> setUsername(String username);
+      abstract Builder<T> setPassword(String password);
+      abstract Builder<T> setLocalDc(String localDc);
+      abstract Builder<T> setConsistencyLevel(String consistencyLevel);
+      abstract Builder<T> setCassandraService(CassandraService<T> cassandraService);
+      abstract Write<T> build();
+    }
+
+    /**
+     * Helper function to either get a fake/mock Cassandra service provided by
+     * {@link #withCassandraService(CassandraService)} or create and return an implementation
+     * of a concrete Cassandra service dealing with a Cassandra instance.
+     */
+    @VisibleForTesting
+    CassandraService<T> getCassandraService() {
+      if (cassandraService() != null) {
+        return cassandraService();
+      }
+      return new CassandraServiceImpl<>();
+    }
+
+  }
+
+  /** Writes each input element to Cassandra through a {@link CassandraService.Writer}. */
+  private static class WriteFn<T> extends DoFn<T, Void> {
+
+    private final Write<T> spec;
+    private CassandraService.Writer<T> writer;
+
+    public WriteFn(Write<T> spec) {
+      this.spec = spec;
+    }
+
+    @Setup
+    public void setup() throws Exception {
+      writer = spec.getCassandraService().createWriter(spec);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext processContext) {
+      writer.write(processContext.element());
+    }
+
+    @Teardown
+    public void teardown() throws Exception {
+      if (writer != null) {
+        writer.close();
+        writer = null;
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
new file mode 100644
index 0000000..5071762
--- /dev/null
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cassandra;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.beam.sdk.io.BoundedSource;
+
+/**
+ * An interface for real or fake Cassandra services used by {@link CassandraIO}.
+ */
+public interface CassandraService<T> extends Serializable {
+
+  /**
+   * Returns a {@link org.apache.beam.sdk.io.BoundedSource.BoundedReader} that will read from
+   * Cassandra using the spec from
+   * {@link org.apache.beam.sdk.io.cassandra.CassandraIO.CassandraSource}.
+   */
+  BoundedSource.BoundedReader<T> createReader(CassandraIO.CassandraSource<T> source);
+
+  /**
+   * Returns an estimation, in bytes, of the size of the data the given spec would read.
+   */
+  long getEstimatedSizeBytes(CassandraIO.Read<T> spec);
+
+  /**
+   * Splits a table read into several sources, given a desired bundle size in bytes.
+   */
+  List<BoundedSource<T>> split(CassandraIO.Read<T> spec,
+      long desiredBundleSizeBytes);
+
+  /**
+   * Create a {@link Writer} that writes entities of type {@code T} into the Cassandra instance.
+   */
+  Writer<T> createWriter(CassandraIO.Write<T> spec) throws Exception;
+
+  /**
+   * Writer for entities of type {@code T}.
+   */
+  interface Writer<T> extends AutoCloseable {
+
+    /**
+     * This method should be synchronous: the entity must be fully stored (and committed) into
+     * the Cassandra instance by the time this method returns.
+     */
+    void write(T entity);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
new file mode 100644
index 0000000..63c8ef4
--- /dev/null
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.PlainTextAuthProvider;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Select;
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingManager;
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.beam.sdk.io.BoundedSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the {@link CassandraService} that actually uses a Cassandra instance.
+ */
+public class CassandraServiceImpl<T> implements CassandraService<T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CassandraServiceImpl.class);
+
+  private static final long MIN_TOKEN = Long.MIN_VALUE;
+  private static final long MAX_TOKEN = Long.MAX_VALUE;
+  private static final BigInteger TOTAL_TOKEN_COUNT =
+      BigInteger.valueOf(MAX_TOKEN).subtract(BigInteger.valueOf(MIN_TOKEN));
+
+  /**
+   * Partition key used by {@link #split(CassandraIO.Read, long, long)}, which has no cluster to
+   * look the real partition key up from. {@link #split(CassandraIO.Read, long)} always builds its
+   * queries with the actual partition key columns of the table.
+   */
+  private static final String PARTITION_KEY_PLACEHOLDER = "$pk";
+
+  /**
+   * Reader executing the split query of a {@link CassandraIO.CassandraSource} and mapping each
+   * returned row to an entity with the driver object mapper.
+   */
+  private class CassandraReaderImpl<T> extends BoundedSource.BoundedReader<T> {
+
+    private final CassandraIO.CassandraSource<T> source;
+
+    private Cluster cluster;
+    private Session session;
+    private ResultSet resultSet;
+    private Iterator<T> iterator;
+    private T current;
+
+    public CassandraReaderImpl(CassandraIO.CassandraSource<T> source) {
+      this.source = source;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      LOG.debug("Starting Cassandra reader");
+      cluster = getCluster(source.spec.hosts(), source.spec.port(), source.spec.username(),
+          source.spec.password(), source.spec.localDc(), source.spec.consistencyLevel());
+      session = cluster.connect();
+      LOG.debug("Query: {}", source.splitQuery);
+      resultSet = session.execute(source.splitQuery);
+
+      final MappingManager mappingManager = new MappingManager(session);
+      Mapper mapper = mappingManager.mapper(source.spec.entity());
+      iterator = mapper.map(resultSet).iterator();
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (iterator.hasNext()) {
+        current = iterator.next();
+        return true;
+      }
+      current = null;
+      return false;
+    }
+
+    @Override
+    public void close() {
+      LOG.debug("Closing Cassandra reader");
+      if (session != null) {
+        session.close();
+      }
+      if (cluster != null) {
+        cluster.close();
+      }
+    }
+
+    @Override
+    public T getCurrent() throws NoSuchElementException {
+      if (current == null) {
+        throw new NoSuchElementException();
+      }
+      return current;
+    }
+
+    @Override
+    public CassandraIO.CassandraSource<T> getCurrentSource() {
+      return source;
+    }
+
+  }
+
+  @Override
+  public CassandraReaderImpl<T> createReader(CassandraIO.CassandraSource<T> source) {
+    return new CassandraReaderImpl<>(source);
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(CassandraIO.Read<T> spec) {
+    try (Cluster cluster = getCluster(spec.hosts(), spec.port(), spec.username(), spec.password(),
+        spec.localDc(), spec.consistencyLevel())) {
+      return estimateSizeBytes(cluster, spec);
+    }
+  }
+
+  /**
+   * Estimates the size of the read described by {@code spec} with an already opened
+   * {@code cluster}. Returns 0 when the partitioner is not Murmur3 or when the estimation fails.
+   */
+  private long estimateSizeBytes(Cluster cluster, CassandraIO.Read<T> spec) {
+    if (!isMurmur3Partitioner(cluster)) {
+      LOG.warn("Only Murmur3 partitioner is supported, can't estimate the size");
+      return 0L;
+    }
+    try {
+      List<TokenRange> tokenRanges = getTokenRanges(cluster, spec.keyspace(), spec.table());
+      return getEstimatedSizeBytes(tokenRanges);
+    } catch (Exception e) {
+      LOG.warn("Can't estimate the size", e);
+      return 0L;
+    }
+  }
+
+  /**
+   * Actually estimate the size of the data to read on the cluster, based on the given token
+   * ranges to address.
+   */
+  @VisibleForTesting
+  protected static long getEstimatedSizeBytes(List<TokenRange> tokenRanges) {
+    long size = 0L;
+    for (TokenRange tokenRange : tokenRanges) {
+      size = size + tokenRange.meanPartitionSize * tokenRange.partitionCount;
+    }
+    return Math.round(size / getRingFraction(tokenRanges));
+  }
+
+  /**
+   * Splits the read into sources covering consecutive token ranges. Splitting requires the
+   * Murmur3 partitioner and the partition key of the table. When either is unavailable, a single
+   * source reading the whole table is returned.
+   */
+  @Override
+  public List<BoundedSource<T>> split(CassandraIO.Read<T> spec,
+      long desiredBundleSizeBytes) {
+    try (Cluster cluster = getCluster(spec.hosts(), spec.port(), spec.username(), spec.password(),
+        spec.localDc(), spec.consistencyLevel())) {
+      if (!isMurmur3Partitioner(cluster)) {
+        LOG.warn("Only Murmur3Partitioner is supported for splitting, using an unique source for "
+            + "the read");
+        return singleSource(spec);
+      }
+      String partitionKey = getPartitionKey(cluster, spec.keyspace(), spec.table());
+      if (partitionKey == null) {
+        LOG.warn("Can't find the partition key of {}.{}, using a single source for the read",
+            spec.keyspace(), spec.table());
+        return singleSource(spec);
+      }
+      LOG.info("Murmur3Partitioner detected, splitting");
+      // Reuse the already opened cluster instead of connecting a second time to estimate the size.
+      return split(spec, desiredBundleSizeBytes, estimateSizeBytes(cluster, spec), partitionKey);
+    }
+  }
+
+  /**
+   * Compute the number of splits based on the estimated size and the desired bundle size, and
+   * create several sources. No cluster is available here to look the real partition key up, so
+   * the split queries use the {@code $pk} placeholder in their {@code token(...)} restrictions.
+   */
+  @VisibleForTesting
+  protected List<BoundedSource<T>> split(CassandraIO.Read<T> spec,
+                                                long desiredBundleSizeBytes,
+                                                long estimatedSizeBytes) {
+    return split(spec, desiredBundleSizeBytes, estimatedSizeBytes, PARTITION_KEY_PLACEHOLDER);
+  }
+
+  /**
+   * Compute the number of splits based on the estimated size and the desired bundle size. Then
+   * create one source per token range, the ranges evenly dividing the whole Murmur3 token ring.
+   *
+   * @param partitionKey comma-separated partition key columns, used inside the
+   *     {@code token(...)} restrictions of the split queries
+   */
+  private List<BoundedSource<T>> split(CassandraIO.Read<T> spec,
+                                       long desiredBundleSizeBytes,
+                                       long estimatedSizeBytes,
+                                       String partitionKey) {
+    long numSplits = 1;
+    if (desiredBundleSizeBytes > 0) {
+      numSplits = estimatedSizeBytes / desiredBundleSizeBytes;
+    }
+    if (numSplits <= 0) {
+      LOG.warn("Number of splits is less than or equal to 0 ({}), fallback to 1", numSplits);
+      numSplits = 1;
+    }
+
+    LOG.info("Number of splits is {}", numSplits);
+
+    if (numSplits == 1) {
+      // we have an unique split
+      return singleSource(spec);
+    }
+
+    // Token arithmetic is done with BigInteger. The ring spans 2^64 - 1 tokens, which a long
+    // cannot hold without overflow and a double cannot hold without precision loss. The bounds
+    // themselves are longs, matching the bigint type of Murmur3 tokens.
+    String token = "token(" + partitionKey + ")";
+    BigInteger increment = TOTAL_TOKEN_COUNT.divide(BigInteger.valueOf(numSplits));
+    List<BoundedSource<T>> sourceList = new ArrayList<>();
+    for (long i = 0; i < numSplits; i++) {
+      Select.Where builder = QueryBuilder.select().from(spec.keyspace(), spec.table()).where();
+      // The first split has no lower bound and the last one has no upper bound, so the splits
+      // together cover the whole ring.
+      if (i > 0) {
+        builder = builder.and(QueryBuilder.gte(token, tokenAt(increment, i)));
+      }
+      if (i < numSplits - 1) {
+        builder = builder.and(QueryBuilder.lt(token, tokenAt(increment, i + 1)));
+      }
+      sourceList.add(new CassandraIO.CassandraSource<T>(spec, builder.toString()));
+    }
+    return sourceList;
+  }
+
+  /**
+   * Returns a list holding a single source that reads the whole table.
+   */
+  private List<BoundedSource<T>> singleSource(CassandraIO.Read<T> spec) {
+    String splitQuery = QueryBuilder.select().from(spec.keyspace(), spec.table()).toString();
+    List<BoundedSource<T>> sources = new ArrayList<>();
+    sources.add(new CassandraIO.CassandraSource<T>(spec, splitQuery));
+    return sources;
+  }
+
+  /**
+   * Returns the token located {@code index} increments after {@link #MIN_TOKEN}.
+   */
+  private static long tokenAt(BigInteger increment, long index) {
+    return BigInteger.valueOf(MIN_TOKEN)
+        .add(increment.multiply(BigInteger.valueOf(index)))
+        .longValue();
+  }
+
+  /**
+   * Returns the partition key columns of the given table, each one double-quoted and separated
+   * by commas. Returns {@code null} when the keyspace or table metadata is not available or the
+   * table has no partition key column.
+   */
+  private static String getPartitionKey(Cluster cluster, String keyspace, String table) {
+    KeyspaceMetadata keyspaceMetadata = cluster.getMetadata().getKeyspace(keyspace);
+    if (keyspaceMetadata == null) {
+      return null;
+    }
+    TableMetadata tableMetadata = keyspaceMetadata.getTable(table);
+    if (tableMetadata == null) {
+      return null;
+    }
+    StringBuilder partitionKey = new StringBuilder();
+    for (ColumnMetadata column : tableMetadata.getPartitionKey()) {
+      if (partitionKey.length() > 0) {
+        partitionKey.append(',');
+      }
+      // Quote the name so that case-sensitive column names keep their exact case in the query.
+      partitionKey.append('"').append(column.getName().replace("\"", "\"\"")).append('"');
+    }
+    return partitionKey.length() > 0 ? partitionKey.toString() : null;
+  }
+
+  /**
+   * Get a Cassandra cluster using hosts and port.
+   */
+  private Cluster getCluster(List<String> hosts, int port, String username, String password,
+                             String localDc, String consistencyLevel) {
+    Cluster.Builder builder = Cluster.builder()
+        .addContactPoints(hosts.toArray(new String[0]))
+        .withPort(port);
+
+    if (username != null) {
+      builder.withAuthProvider(new PlainTextAuthProvider(username, password));
+    }
+
+    if (localDc != null) {
+      builder.withLoadBalancingPolicy(
+          new TokenAwarePolicy(new DCAwareRoundRobinPolicy.Builder().withLocalDc(localDc).build()));
+    } else {
+      builder.withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()));
+    }
+
+    if (consistencyLevel != null) {
+      builder.withQueryOptions(
+          new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel)));
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Gets the list of token ranges that a table occupies on a give Cassandra node.
+   *
+   * <p>NB: This method is compatible with Cassandra 2.1.5 and greater.
+   */
+  private static List<TokenRange> getTokenRanges(Cluster cluster, String keyspace, String table) {
+    try (Session session = cluster.newSession()) {
+      ResultSet resultSet =
+          session.execute(
+              "SELECT range_start, range_end, partitions_count, mean_partition_size FROM "
+                  + "system.size_estimates WHERE keyspace_name = ? AND table_name = ?",
+              keyspace,
+              table);
+
+      ArrayList<TokenRange> tokenRanges = new ArrayList<>();
+      for (Row row : resultSet) {
+        TokenRange tokenRange =
+            new TokenRange(
+                row.getLong("partitions_count"),
+                row.getLong("mean_partition_size"),
+                row.getLong("range_start"),
+                row.getLong("range_end"));
+        tokenRanges.add(tokenRange);
+      }
+      // The table may not contain the estimates yet
+      // or have partitions_count and mean_partition_size fields = 0
+      // if the data was just inserted and the amount of data in the table was small.
+      // This is very common situation during tests,
+      // when we insert a few rows and immediately query them.
+      // However, for tiny data sets the lack of size estimates is not a problem at all,
+      // because we don't want to split tiny data anyways.
+      // Therefore, we're not issuing a warning if the result set was empty
+      // or mean_partition_size and partitions_count = 0.
+      return tokenRanges;
+    }
+  }
+
+  /**
+   * Compute the percentage of token addressed compared with the whole tokens in the cluster.
+   */
+  @VisibleForTesting
+  protected static double getRingFraction(List<TokenRange> tokenRanges) {
+    double ringFraction = 0;
+    for (TokenRange tokenRange : tokenRanges) {
+      ringFraction = ringFraction + (distance(tokenRange.rangeStart, tokenRange.rangeEnd)
+          .doubleValue() / TOTAL_TOKEN_COUNT.doubleValue());
+    }
+    return ringFraction;
+  }
+
+  /**
+   * Measure distance between two tokens.
+   */
+  @VisibleForTesting
+  protected static BigInteger distance(long left, long right) {
+    if (right > left) {
+      return BigInteger.valueOf(right).subtract(BigInteger.valueOf(left));
+    } else {
+      return BigInteger.valueOf(right).subtract(BigInteger.valueOf(left)).add(TOTAL_TOKEN_COUNT);
+    }
+  }
+
+  /**
+   * Check if the current partitioner is the Murmur3 (default in Cassandra version newer than 2).
+   */
+  @VisibleForTesting
+  protected static boolean isMurmur3Partitioner(Cluster cluster) {
+    return cluster.getMetadata().getPartitioner()
+        .equals("org.apache.cassandra.dht.Murmur3Partitioner");
+  }
+
+  /**
+   * Represent a token range in Cassandra instance, wrapping the partition count, size and token
+   * range.
+   */
+  @VisibleForTesting
+  protected static class TokenRange {
+    private final long partitionCount;
+    private final long meanPartitionSize;
+    private final long rangeStart;
+    private final long rangeEnd;
+
+    public TokenRange(
+        long partitionCount, long meanPartitionSize, long rangeStart, long
+        rangeEnd) {
+      this.partitionCount = partitionCount;
+      this.meanPartitionSize = meanPartitionSize;
+      this.rangeStart = rangeStart;
+      this.rangeEnd = rangeEnd;
+    }
+  }
+
+  /**
+   * Writer storing an entity into Apache Cassandra database.
+   */
+  protected class WriterImpl<T> implements Writer<T> {
+
+    private final CassandraIO.Write<T> spec;
+
+    private final Cluster cluster;
+    private final Session session;
+    private final MappingManager mappingManager;
+
+    public WriterImpl(CassandraIO.Write<T> spec) {
+      this.spec = spec;
+      this.cluster = getCluster(spec.hosts(), spec.port(), spec.username(), spec.password(),
+          spec.localDc(), spec.consistencyLevel());
+      try {
+        this.session = cluster.connect(spec.keyspace());
+        this.mappingManager = new MappingManager(session);
+      } catch (RuntimeException e) {
+        // Don't leak the cluster (and its connection pools) when the writer can't be created.
+        cluster.close();
+        throw e;
+      }
+    }
+
+    /**
+     * Write the entity to the Cassandra instance, using {@link Mapper} obtained with the
+     * {@link MappingManager}. This method use {@link Mapper#save(Object)} method, which is
+     * synchronous. It means the entity is guaranteed to be reliably committed to Cassandra.
+     */
+    @Override
+    public void write(T entity) {
+      Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(entity.getClass());
+      mapper.save(entity);
+    }
+
+    @Override
+    public void close() {
+      if (session != null) {
+        session.close();
+      }
+      if (cluster != null) {
+        cluster.close();
+      }
+    }
+
+  }
+
+  @Override
+  public Writer createWriter(CassandraIO.Write<T> spec) {
+    return new WriterImpl<>(spec);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/package-info.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/package-info.java
new file mode 100644
index 0000000..6659b62
--- /dev/null
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading from and writing to an Apache Cassandra database.
+ */
+package org.apache.beam.sdk.io.cassandra;

http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
new file mode 100644
index 0000000..e67d305
--- /dev/null
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cassandra;
+
+import static org.junit.Assert.assertEquals;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.driver.mapping.annotations.PartitionKey;
+import com.datastax.driver.mapping.annotations.Table;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SerializableMatcher;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * A test of {@link CassandraIO} on a concrete and independent Cassandra instance.
+ *
+ * <p>This test requires a running Cassandra instance, and the test dataset must exists.
+ *
+ * <p>You can run this test directly using Maven with:
+ *
+ * <pre>{@code
+ * mvn -e -Pio-it verify -pl sdks/java/io/cassandra -DintegrationTestPipelineOptions='[
+ * "--cassandraHost=1.2.3.4",
+ * "--cassandraPort=9042"]'
+ * }</pre>
+ */
+@RunWith(JUnit4.class)
+public class CassandraIOIT implements Serializable {
+
+  private static IOTestPipelineOptions options;
+
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
+    options = TestPipeline.testingPipelineOptions()
+        .as(IOTestPipelineOptions.class);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    // cleanup the write table
+    CassandraTestDataSet.cleanUpDataTable(options);
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    PCollection<Scientist> output = pipeline.apply(CassandraIO.<Scientist>read()
+        .withHosts(Collections.singletonList(options.getCassandraHost()))
+        .withPort(options.getCassandraPort())
+        .withKeyspace(CassandraTestDataSet.KEYSPACE)
+        .withTable(CassandraTestDataSet.TABLE_READ_NAME)
+        .withEntity(Scientist.class)
+        .withCoder(SerializableCoder.of(Scientist.class)));
+
+    PAssert.thatSingleton(output.apply("Count scientist", Count.<Scientist>globally()))
+        .isEqualTo(1000L);
+
+    PCollection<KV<String, Integer>> mapped =
+        output.apply(
+            MapElements.via(
+                new SimpleFunction<Scientist, KV<String, Integer>>() {
+                  public KV<String, Integer> apply(Scientist scientist) {
+                    KV<String, Integer> kv = KV.of(scientist.name, scientist.id);
+                    return kv;
+                  }
+                }
+            )
+        );
+    PAssert.that(mapped.apply("Count occurrences per scientist", Count.<String, Integer>perKey()))
+        .satisfies(
+            new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
+              @Override
+              public Void apply(Iterable<KV<String, Long>> input) {
+                for (KV<String, Long> element : input) {
+                  assertEquals(element.getKey(), 1000 / 10, element.getValue().longValue());
+                }
+                return null;
+              }
+            }
+        );
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testWrite() throws Exception {
+    IOTestPipelineOptions options =
+        TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class);
+
+    options.setOnSuccessMatcher(
+        new CassandraMatcher(
+            CassandraTestDataSet.getCluster(options),
+            CassandraTestDataSet.TABLE_WRITE_NAME));
+
+    TestPipeline.convertToArgs(options);
+
+    ArrayList<ScientistForWrite> data = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      ScientistForWrite scientist = new ScientistForWrite();
+      scientist.id = i;
+      scientist.name = "Name " + i;
+      data.add(scientist);
+    }
+
+    pipeline
+        .apply(Create.of(data))
+        .apply(CassandraIO.<ScientistForWrite>write()
+            .withHosts(Collections.singletonList(options.getCassandraHost()))
+            .withPort(options.getCassandraPort())
+            .withKeyspace(CassandraTestDataSet.KEYSPACE)
+            .withEntity(ScientistForWrite.class));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Simple matcher.
+   */
+  public class CassandraMatcher extends TypeSafeMatcher<PipelineResult>
+      implements SerializableMatcher<PipelineResult> {
+
+    private String tableName;
+    private Cluster cluster;
+
+    public CassandraMatcher(Cluster cluster, String tableName) {
+      this.cluster = cluster;
+      this.tableName = tableName;
+    }
+
+    @Override
+    protected boolean matchesSafely(PipelineResult pipelineResult) {
+      pipelineResult.waitUntilFinish();
+      Session session = cluster.connect();
+      ResultSet result = session.execute("select id,name from " + CassandraTestDataSet.KEYSPACE
+          + "." + tableName);
+      List<Row> rows = result.all();
+      if (rows.size() != 1000) {
+        return false;
+      }
+      for (Row row : rows) {
+        if (!row.getString("name").matches("Name.*")) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText("Expected Cassandra record pattern is (Name.*)");
+    }
+  }
+
+  /**
+   * Simple Cassandra entity representing a scientist. Used for read test.
+   */
+  @Table(name = CassandraTestDataSet.TABLE_READ_NAME, keyspace = CassandraTestDataSet.KEYSPACE)
+  public static class Scientist implements Serializable {
+
+    @PartitionKey
+    @Column(name = "id")
+    private final int id;
+
+    @Column(name = "name")
+    private final String name;
+
+    public Scientist() {
+      this(0, "");
+    }
+
+    public Scientist(int id) {
+      this(0, "");
+    }
+
+    public Scientist(int id, String name) {
+      this.id = id;
+      this.name = name;
+    }
+
+    public int getId() {
+      return id;
+    }
+
+    public String getName() {
+      return name;
+    }
+  }
+
+  /**
+   * Simple Cassandra entity representing a scientist, used for write test.
+   */
+  @Table(name = CassandraTestDataSet.TABLE_WRITE_NAME, keyspace = CassandraTestDataSet.KEYSPACE)
+  public class ScientistForWrite implements Serializable {
+
+    @PartitionKey
+    @Column(name = "id")
+    public Integer id;
+
+    @Column(name = "name")
+    public String name;
+
+    public String toString() {
+      return id + ":" + name;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
new file mode 100644
index 0000000..cfd78d2
--- /dev/null
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cassandra;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.driver.mapping.annotations.Table;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Tests of {@link CassandraIO}. */
+public class CassandraIOTest implements Serializable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CassandraIOTest.class);
+
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void testEstimatedSizeBytes() throws Exception {
+    final FakeCassandraService service = new FakeCassandraService();
+    service.load();
+
+    PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
+    CassandraIO.Read spec = CassandraIO.<Scientist>read().withCassandraService(service);
+    CassandraIO.CassandraSource source = new CassandraIO.CassandraSource(
+        spec,
+        null);
+    long estimatedSizeBytes = source.getEstimatedSizeBytes(pipelineOptions);
+    // the size is the sum of the bytes size of the String representation of a scientist in the map
+    assertEquals(113890, estimatedSizeBytes);
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    FakeCassandraService service = new FakeCassandraService();
+    service.load();
+
+    PCollection<Scientist> output = pipeline.apply(CassandraIO
+        .<Scientist>read()
+        .withCassandraService(service)
+        .withKeyspace("beam")
+        .withTable("scientist")
+        .withCoder(SerializableCoder.of(Scientist.class))
+        .withEntity(Scientist.class)
+    );
+
+    PAssert.thatSingleton(output.apply("Count", Count.<Scientist>globally()))
+        .isEqualTo(10000L);
+
+    PCollection<KV<String, Integer>> mapped =
+        output.apply(
+            MapElements.via(
+                new SimpleFunction<Scientist, KV<String, Integer>>() {
+                  @Override
+                  public KV<String, Integer> apply(Scientist scientist) {
+                    return KV.of(scientist.name, scientist.id);
+                  }
+                }));
+    PAssert.that(mapped.apply("Count occurrences per scientist", Count.<String, Integer>perKey()))
+        .satisfies(
+            new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
+              @Override
+              public Void apply(Iterable<KV<String, Long>> input) {
+                for (KV<String, Long> element : input) {
+                  assertEquals(element.getKey(), 1000, element.getValue().longValue());
+                }
+                return null;
+              }
+            });
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testWrite() throws Exception {
+    FakeCassandraService service = new FakeCassandraService();
+    // The fake table is static and shared with the other tests (testRead loads 10000 entities
+    // into it), so start from an empty table to make the size assertion order-independent.
+    service.getTable().clear();
+
+    ArrayList<Scientist> data = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      Scientist scientist = new Scientist();
+      scientist.id = i;
+      scientist.name = "Name " + i;
+      data.add(scientist);
+    }
+
+    pipeline
+        .apply(Create.of(data))
+        .apply(CassandraIO.<Scientist>write().withCassandraService(service)
+            .withKeyspace("beam")
+            .withEntity(Scientist.class));
+    pipeline.run().waitUntilFinish();
+
+    assertEquals(1000, service.getTable().size());
+    for (Scientist scientist : service.getTable().values()) {
+      assertTrue(scientist.name.matches("Name (\\d*)"));
+    }
+  }
+
+  /**
+   * A {@link CassandraService} implementation that stores the entity in memory.
+   */
+  private static class FakeCassandraService implements CassandraService<Scientist> {
+
+    private static final Map<Integer, Scientist> table = new ConcurrentHashMap<>();
+
+    public void load() {
+      table.clear();
+      String[] scientists = {
+          "Lovelace",
+          "Franklin",
+          "Meitner",
+          "Hopper",
+          "Curie",
+          "Faraday",
+          "Newton",
+          "Bohr",
+          "Galilei",
+          "Maxwell"
+      };
+      for (int i = 0; i < 10000; i++) {
+        int index = i % scientists.length;
+        Scientist scientist = new Scientist();
+        scientist.id = i;
+        scientist.name = scientists[index];
+        table.put(scientist.id, scientist);
+      }
+    }
+
+    public Map<Integer, Scientist> getTable() {
+      return table;
+    }
+
+    @Override
+    public FakeCassandraReader createReader(CassandraIO.CassandraSource source) {
+      return new FakeCassandraReader(source);
+    }
+
+    static class FakeCassandraReader extends BoundedSource.BoundedReader {
+
+      private final CassandraIO.CassandraSource source;
+
+      private Iterator<Scientist> iterator;
+      private Scientist current;
+
+      public FakeCassandraReader(CassandraIO.CassandraSource source) {
+        this.source = source;
+      }
+
+      @Override
+      public boolean start() throws IOException {
+        iterator = table.values().iterator();
+        return advance();
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        if (iterator.hasNext()) {
+          current = iterator.next();
+          return true;
+        }
+        current = null;
+        return false;
+      }
+
+      @Override
+      public void close() {
+        iterator = null;
+        current = null;
+      }
+
+      @Override
+      public Scientist getCurrent() throws NoSuchElementException {
+        if (current == null) {
+          throw new NoSuchElementException();
+        }
+        return current;
+      }
+
+      @Override
+      public CassandraIO.CassandraSource getCurrentSource() {
+        return this.source;
+      }
+
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(CassandraIO.Read spec) {
+      long size = 0L;
+      for (Scientist scientist : table.values()) {
+        // explicit charset so the estimate doesn't depend on the platform default encoding
+        size = size + scientist.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
+      }
+      return size;
+    }
+
+    @Override
+    public List<BoundedSource<Scientist>> split(CassandraIO.Read spec,
+                                                           long desiredBundleSizeBytes) {
+      List<BoundedSource<Scientist>> sources = new ArrayList<>();
+      sources.add(new CassandraIO.CassandraSource<Scientist>(spec, null));
+      return sources;
+    }
+
+    static class FakeCassandraWriter implements Writer<Scientist> {
+
+      @Override
+      public void write(Scientist scientist) {
+        table.put(scientist.id, scientist);
+      }
+
+      @Override
+      public void close() {
+        // nothing to do
+      }
+
+    }
+
+    @Override
+    public FakeCassandraWriter createWriter(CassandraIO.Write<Scientist> spec) {
+      return new FakeCassandraWriter();
+    }
+
+  }
+
+  /** Simple Cassandra entity used in test. */
+  @Table(name = "scientist", keyspace = "beam")
+  public static class Scientist implements Serializable {
+
+    @Column(name = "person_name")
+    public String name;
+
+    @Column(name = "person_id")
+    public int id;
+
+    @Override
+    public String toString() {
+      return id + ":" + name;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImplTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImplTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImplTest.java
new file mode 100644
index 0000000..6a68e90
--- /dev/null
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImplTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cassandra;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Metadata;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests on {@link CassandraServiceImplTest}.
+ */
+public class CassandraServiceImplTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CassandraServiceImplTest.class);
+
+  private static final String MURMUR3_PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner";
+
+  private Cluster createClusterMock() {
+    Metadata metadata = Mockito.mock(Metadata.class);
+    Mockito.when(metadata.getPartitioner()).thenReturn(MURMUR3_PARTITIONER);
+    Cluster cluster = Mockito.mock(Cluster.class);
+    Mockito.when(cluster.getMetadata()).thenReturn(metadata);
+    return cluster;
+  }
+
+  @Test
+  public void testValidPartitioner() throws Exception {
+    assertTrue(CassandraServiceImpl.isMurmur3Partitioner(createClusterMock()));
+  }
+
+  @Test
+  public void testDistance() throws Exception {
+    BigInteger distance = CassandraServiceImpl.distance(10L, 100L);
+    assertEquals(BigInteger.valueOf(90), distance);
+
+    distance = CassandraServiceImpl.distance(100L, 10L);
+    assertEquals(new BigInteger("18446744073709551525"), distance);
+  }
+
+  @Test
+  public void testRingFraction() throws Exception {
+    // simulate a first range taking "half" of the available tokens
+    List<CassandraServiceImpl.TokenRange> tokenRanges = new ArrayList<>();
+    tokenRanges.add(new CassandraServiceImpl.TokenRange(1, 1, Long.MIN_VALUE, 0));
+    assertEquals(0.5, CassandraServiceImpl.getRingFraction(tokenRanges), 0);
+
+    // add a second range to cover all tokens available
+    tokenRanges.add(new CassandraServiceImpl.TokenRange(1, 1, 0, Long.MAX_VALUE));
+    assertEquals(1.0, CassandraServiceImpl.getRingFraction(tokenRanges), 0);
+  }
+
+  @Test
+  public void testEstimatedSizeBytes() throws Exception {
+    List<CassandraServiceImpl.TokenRange> tokenRanges = new ArrayList<>();
+    // one partition containing all tokens, the size is actually the size of the partition
+    tokenRanges.add(new CassandraServiceImpl.TokenRange(1, 1000, Long.MIN_VALUE, Long.MAX_VALUE));
+    assertEquals(1000, CassandraServiceImpl.getEstimatedSizeBytes(tokenRanges));
+
+    // one partition with half of the tokens, we estimate the size to the double of this partition
+    tokenRanges = new ArrayList<>();
+    tokenRanges.add(new CassandraServiceImpl.TokenRange(1, 1000, Long.MIN_VALUE, 0));
+    assertEquals(2000, CassandraServiceImpl.getEstimatedSizeBytes(tokenRanges));
+
+    // we have three partitions covering all tokens, the size is the sum of partition size *
+    // partition count
+    tokenRanges = new ArrayList<>();
+    tokenRanges.add(new CassandraServiceImpl.TokenRange(1, 1000, Long.MIN_VALUE, -3));
+    tokenRanges.add(new CassandraServiceImpl.TokenRange(1, 1000, -2, 10000));
+    tokenRanges.add(new CassandraServiceImpl.TokenRange(2, 3000, 10001, Long.MAX_VALUE));
+    assertEquals(8000, CassandraServiceImpl.getEstimatedSizeBytes(tokenRanges));
+  }
+
+  @Test
+  public void testThreeSplits() throws Exception {
+    CassandraServiceImpl service = new CassandraServiceImpl();
+    CassandraIO.Read spec = CassandraIO.read().withKeyspace("beam").withTable("test");
+    List<CassandraIO.CassandraSource> sources = service.split(spec, 50, 150);
+    assertEquals(3, sources.size());
+    assertTrue(sources.get(0).splitQuery.matches("SELECT \\* FROM beam.test WHERE token\\"
+        + "(\\$pk\\)<(.*)"));
+    assertTrue(sources.get(1).splitQuery.matches("SELECT \\* FROM beam.test WHERE token\\"
+        + "(\\$pk\\)>=(.*) AND token\\(\\$pk\\)<(.*)"));
+    assertTrue(sources.get(2).splitQuery.matches("SELECT \\* FROM beam.test WHERE token\\"
+        + "(\\$pk\\)>=(.*)"));
+  }
+
+  @Test
+  public void testTwoSplits() throws Exception {
+    CassandraServiceImpl service = new CassandraServiceImpl();
+    CassandraIO.Read spec = CassandraIO.read().withKeyspace("beam").withTable("test");
+    List<CassandraIO.CassandraSource> sources = service.split(spec, 50, 100);
+    assertEquals(2, sources.size());
+    LOG.info("TOKEN: " + ((double) Long.MAX_VALUE / 2));
+    LOG.info(sources.get(0).splitQuery);
+    LOG.info(sources.get(1).splitQuery);
+    assertEquals("SELECT * FROM beam.test WHERE token($pk)<" + ((double) Long.MAX_VALUE / 2) + ";",
+        sources.get(0).splitQuery);
+    assertEquals("SELECT * FROM beam.test WHERE token($pk)>=" + ((double) Long.MAX_VALUE / 2)
+            + ";",
+        sources.get(1).splitQuery);
+  }
+
+  @Test
+  public void testUniqueSplit() throws Exception {
+    CassandraServiceImpl service = new CassandraServiceImpl();
+    CassandraIO.Read spec = CassandraIO.read().withKeyspace("beam").withTable("test");
+    List<CassandraIO.CassandraSource> sources = service.split(spec, 100, 100);
+    assertEquals(1, sources.size());
+    assertEquals("SELECT * FROM beam.test;", sources.get(0).splitQuery);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraTestDataSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraTestDataSet.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraTestDataSet.java
new file mode 100644
index 0000000..461f5ea
--- /dev/null
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraTestDataSet.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manipulates test data used by the {@link CassandraIO} tests.
+ *
+ * <p>This is independent from the tests so that for read tests it can be run separately after
+ * data store creation rather than every time (which can be more fragile).
+ */
+public class CassandraTestDataSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CassandraTestDataSet.class);
+
+  public static final String KEYSPACE = "BEAM";
+  public static final String TABLE_READ_NAME = "BEAM_READ_TEST";
+  public static final String TABLE_WRITE_NAME = "BEAM_WRITE_TEST";
+
+  /** Names cycled through when populating the read table. */
+  private static final String[] SCIENTISTS = {
+      "Lovelace",
+      "Franklin",
+      "Meitner",
+      "Hopper",
+      "Curie",
+      "Faraday",
+      "Newton",
+      "Bohr",
+      "Galilei",
+      "Maxwell"
+  };
+
+  /** Number of rows inserted into the read table (ids {@code 0..NUM_ROWS-1}). */
+  private static final int NUM_ROWS = 1000;
+
+  /**
+   * Use this to create the read tables before IT read tests.
+   *
+   * <p>To invoke this class, you can use this command line (run from the cassandra root
+   * directory):
+   *
+   * <pre>{@code
+   * mvn test-compile exec:java \
+   *   -Dexec.mainClass=org.apache.beam.sdk.io.cassandra.CassandraTestDataSet \
+   *   -Dexec.args="--cassandraHost=localhost --cassandraPort=9042" \
+   *   -Dexec.classpathScope=test
+   * }</pre>
+   *
+   * @param args Please pass options from IOTestPipelineOptions used for connection to Cassandra as
+   *     shown above.
+   */
+  public static void main(String[] args) {
+    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
+    IOTestPipelineOptions options =
+        PipelineOptionsFactory.fromArgs(args).as(IOTestPipelineOptions.class);
+
+    createDataTable(options);
+  }
+
+  /** Creates and populates the read table, and creates the (empty) write table. */
+  public static void createDataTable(IOTestPipelineOptions options) {
+    createTable(options, TABLE_READ_NAME);
+    insertTestData(options, TABLE_READ_NAME);
+    createTable(options, TABLE_WRITE_NAME);
+  }
+
+  /**
+   * Builds a {@link Cluster} using the host and port from {@code options}. The caller owns the
+   * returned cluster and must close it.
+   */
+  public static Cluster getCluster(IOTestPipelineOptions options) {
+    return Cluster.builder()
+        .addContactPoint(options.getCassandraHost())
+        .withPort(options.getCassandraPort())
+        .build();
+  }
+
+  /** Creates {@link #KEYSPACE} and then {@code tableName} inside it, if they do not exist yet. */
+  private static void createTable(IOTestPipelineOptions options, String tableName) {
+    try (Cluster cluster = getCluster(options); Session session = cluster.connect()) {
+      LOG.info("Create {} keyspace if not exists", KEYSPACE);
+      session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH REPLICATION = "
+          + "{'class':'SimpleStrategy', 'replication_factor':3};");
+
+      session.execute("USE " + KEYSPACE);
+
+      LOG.info("Create {} table if not exists", tableName);
+      session.execute("CREATE TABLE IF NOT EXISTS " + tableName + "(id int, name text, PRIMARY "
+          + "KEY(id))");
+    }
+  }
+
+  /**
+   * Inserts {@link #NUM_ROWS} rows into {@code tableName}, cycling through {@link #SCIENTISTS}
+   * for the {@code name} column.
+   */
+  private static void insertTestData(IOTestPipelineOptions options, String tableName) {
+    try (Cluster cluster = getCluster(options); Session session = cluster.connect()) {
+      LOG.info("Insert test dataset");
+      // Values are bound rather than concatenated into the CQL text.
+      String insert = "INSERT INTO " + KEYSPACE + "." + tableName + "(id, name) values(?, ?);";
+      for (int i = 0; i < NUM_ROWS; i++) {
+        session.execute(insert, i, SCIENTISTS[i % SCIENTISTS.length]);
+      }
+    }
+  }
+
+  /** Truncates the write table so each write test starts from an empty table. */
+  public static void cleanUpDataTable(IOTestPipelineOptions options) {
+    try (Cluster cluster = getCluster(options); Session session = cluster.connect()) {
+      session.execute("TRUNCATE TABLE " + KEYSPACE + "." + TABLE_WRITE_NAME);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index d3915c9..387fd22 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -76,4 +76,14 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {
   Integer getElasticsearchTcpPort();
   void setElasticsearchTcpPort(Integer value);
 
+  /** Host name or IP address used as the contact point of the Cassandra cluster. */
+  @Description("Host for Cassandra server (host name/ip address)")
+  @Default.String("cassandra-host")
+  String getCassandraHost();
+  void setCassandraHost(String value);
+
+  /**
+   * Port the DataStax driver connects to (passed to {@code Cluster.Builder.withPort}). This must
+   * be the CQL native transport port, 9042 by default in Cassandra; 7001 is the TLS inter-node
+   * storage port and does not speak the client protocol.
+   */
+  @Description("Port for Cassandra server")
+  @Default.Integer(9042)
+  Integer getCassandraPort();
+  void setCassandraPort(Integer value);
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 9657612..94fc6a7 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -78,6 +78,7 @@
     <module>mongodb</module>
     <module>mqtt</module>
     <module>xml</module>
+    <module>cassandra</module>
   </modules>
 
   <profiles>


[2/2] beam git commit: [BEAM-245] This closes #592

Posted by jb...@apache.org.
[BEAM-245] This closes #592


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c189d5c0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c189d5c0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c189d5c0

Branch: refs/heads/master
Commit: c189d5c0e4582b446564db9cbf1ae06970f6079d
Parents: 3cc4ff6 0b0bb3d
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Jun 7 08:05:15 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Jun 7 08:05:15 2017 +0200

----------------------------------------------------------------------
 sdks/java/io/cassandra/pom.xml                  | 113 ++++
 .../beam/sdk/io/cassandra/CassandraIO.java      | 510 +++++++++++++++++++
 .../beam/sdk/io/cassandra/CassandraService.java |  66 +++
 .../sdk/io/cassandra/CassandraServiceImpl.java  | 398 +++++++++++++++
 .../beam/sdk/io/cassandra/package-info.java     |  22 +
 .../beam/sdk/io/cassandra/CassandraIOIT.java    | 254 +++++++++
 .../beam/sdk/io/cassandra/CassandraIOTest.java  | 279 ++++++++++
 .../io/cassandra/CassandraServiceImplTest.java  | 138 +++++
 .../sdk/io/cassandra/CassandraTestDataSet.java  | 153 ++++++
 .../sdk/io/common/IOTestPipelineOptions.java    |  10 +
 sdks/java/io/pom.xml                            |   1 +
 11 files changed, 1944 insertions(+)
----------------------------------------------------------------------