You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/06/08 01:35:25 UTC
[36/50] beam git commit: [BEAM-245] Add CassandraIO
[BEAM-245] Add CassandraIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b0bb3dc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b0bb3dc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b0bb3dc
Branch: refs/heads/DSL_SQL
Commit: 0b0bb3dc8d18b7d78780dbd39705e16a8aae028e
Parents: 3cc4ff6
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Tue Mar 28 16:46:37 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Jun 7 07:40:05 2017 +0200
----------------------------------------------------------------------
sdks/java/io/cassandra/pom.xml | 113 ++++
.../beam/sdk/io/cassandra/CassandraIO.java | 510 +++++++++++++++++++
.../beam/sdk/io/cassandra/CassandraService.java | 66 +++
.../sdk/io/cassandra/CassandraServiceImpl.java | 398 +++++++++++++++
.../beam/sdk/io/cassandra/package-info.java | 22 +
.../beam/sdk/io/cassandra/CassandraIOIT.java | 254 +++++++++
.../beam/sdk/io/cassandra/CassandraIOTest.java | 279 ++++++++++
.../io/cassandra/CassandraServiceImplTest.java | 138 +++++
.../sdk/io/cassandra/CassandraTestDataSet.java | 153 ++++++
.../sdk/io/common/IOTestPipelineOptions.java | 10 +
sdks/java/io/pom.xml | 1 +
11 files changed, 1944 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/pom.xml b/sdks/java/io/cassandra/pom.xml
new file mode 100644
index 0000000..8249f57
--- /dev/null
+++ b/sdks/java/io/cassandra/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-parent</artifactId>
+ <version>2.1.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-sdks-java-io-cassandra</artifactId>
+ <name>Apache Beam :: SDKs :: Java :: IO :: Cassandra</name>
+ <description>IO to read and write with Apache Cassandra database</description>
+
+ <properties>
+ <!-- Single version shared by both DataStax driver artifacts (core and mapping) declared below. -->
+ <cassandra.driver.version>3.2.0</cassandra.driver.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
+
+ <!-- DataStax Java driver: the core client plus the object mapper used to map annotated entity POJOs. -->
+ <dependency>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-mapping</artifactId>
+ <version>${cassandra.driver.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-core</artifactId>
+ <version>${cassandra.driver.version}</version>
+ </dependency>
+
+ <!-- provided dependencies: the AutoValue annotation processor, needed only at compile time -->
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <!-- NOTE(review): hamcrest-all also bundles the hamcrest-core classes; confirm both artifacts are needed. -->
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <!-- test-jar (classifier "tests") of beam-sdks-java-io-common with shared IO test utilities -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-common</artifactId>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
new file mode 100644
index 0000000..b6f4ef6
--- /dev/null
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cassandra;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An IO to read from and write to Apache Cassandra.
+ *
+ * <h3>Reading from Apache Cassandra</h3>
+ *
+ * <p>{@code CassandraIO} provides a source to read from Cassandra; it returns a bounded
+ * collection of entities as {@code PCollection<Entity>}. An entity is built by the Cassandra
+ * mapper ({@code com.datastax.driver.mapping.EntityMapper}) based on a POJO containing
+ * annotations, as described in the
+ * <a href="http://docs.datastax.com/en/developer/java-driver/3.2/manual/object_mapper/creating/">
+ * DataStax object mapper documentation</a>.
+ *
+ * <p>The following example illustrates various options for configuring the IO:
+ *
+ * <pre>{@code
+ * pipeline.apply(CassandraIO.<Person>read()
+ *     .withHosts(Arrays.asList("host1", "host2"))
+ *     .withPort(9042)
+ *     .withKeyspace("beam")
+ *     .withTable("Person")
+ *     .withEntity(Person.class)
+ *     .withCoder(SerializableCoder.of(Person.class)));
+ * // above options are the minimum set, returns PCollection<Person>
+ * }</pre>
+ *
+ * <p>Instead of {@code withHosts()} and {@code withPort()}, a {@link CassandraService} can be
+ * supplied via {@code withCassandraService()} (for instance a fake service in tests).
+ *
+ * <h3>Writing to Apache Cassandra</h3>
+ *
+ * <p>{@code CassandraIO} provides a sink to write a collection of entities to Apache Cassandra.
+ * Entities are handed one by one to a {@link CassandraService.Writer}, which is created once per
+ * {@link DoFn} instance.
+ *
+ * <p>The following example illustrates various options for configuring the IO write:
+ *
+ * <pre>{@code
+ * pipeline
+ *    .apply(...) // provides a PCollection<Person> where Person is an entity
+ *    .apply(CassandraIO.<Person>write()
+ *        .withHosts(Arrays.asList("host1", "host2"))
+ *        .withPort(9042)
+ *        .withKeyspace("beam")
+ *        .withEntity(Person.class));
+ * }</pre>
+ */
+@Experimental
+public class CassandraIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CassandraIO.class);
+
+  private CassandraIO() {}
+
+  /**
+   * Provide a {@link Read} {@link PTransform} to read data from a Cassandra database.
+   */
+  public static <T> Read<T> read() {
+    return new AutoValue_CassandraIO_Read.Builder<T>().build();
+  }
+
+  /**
+   * Provide a {@link Write} {@link PTransform} to write data to a Cassandra database.
+   */
+  public static <T> Write<T> write() {
+    return new AutoValue_CassandraIO_Write.Builder<T>().build();
+  }
+
+  /**
+   * A {@link PTransform} to read data from Apache Cassandra. See {@link CassandraIO} for more
+   * information on usage and configuration.
+   */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+
+    @Nullable abstract List<String> hosts();
+    @Nullable abstract Integer port();
+    @Nullable abstract String keyspace();
+    @Nullable abstract String table();
+    @Nullable abstract Class<T> entity();
+    @Nullable abstract Coder<T> coder();
+    @Nullable abstract String username();
+    @Nullable abstract String password();
+    @Nullable abstract String localDc();
+    @Nullable abstract String consistencyLevel();
+    @Nullable abstract CassandraService<T> cassandraService();
+    abstract Builder<T> builder();
+
+    /**
+     * Specify the hosts of the Apache Cassandra instances.
+     */
+    public Read<T> withHosts(List<String> hosts) {
+      checkArgument(hosts != null, "CassandraIO.read().withHosts(hosts) called with null hosts");
+      checkArgument(!hosts.isEmpty(), "CassandraIO.read().withHosts(hosts) called with empty "
+          + "hosts list");
+      return builder().setHosts(hosts).build();
+    }
+
+    /**
+     * Specify the port number of the Apache Cassandra instances. Must be strictly positive.
+     */
+    public Read<T> withPort(int port) {
+      // Guava's Preconditions only substitutes %s placeholders (not %d).
+      checkArgument(port > 0, "CassandraIO.read().withPort(port) called with invalid port "
+          + "number (%s)", port);
+      return builder().setPort(port).build();
+    }
+
+    /**
+     * Specify the Cassandra keyspace where to read data.
+     */
+    public Read<T> withKeyspace(String keyspace) {
+      checkArgument(keyspace != null, "CassandraIO.read().withKeyspace(keyspace) called with "
+          + "null keyspace");
+      return builder().setKeyspace(keyspace).build();
+    }
+
+    /**
+     * Specify the Cassandra table where to read data.
+     */
+    public Read<T> withTable(String table) {
+      checkArgument(table != null, "CassandraIO.read().withTable(table) called with null table");
+      return builder().setTable(table).build();
+    }
+
+    /**
+     * Specify the entity class (annotated POJO). The {@link CassandraIO} will read the data and
+     * convert it into entity instances. The {@link PCollection} resulting from the read will
+     * contain entity elements.
+     */
+    public Read<T> withEntity(Class<T> entity) {
+      checkArgument(entity != null, "CassandraIO.read().withEntity(entity) called with null "
+          + "entity");
+      return builder().setEntity(entity).build();
+    }
+
+    /**
+     * Specify the {@link Coder} used to serialize the entity in the {@link PCollection}.
+     */
+    public Read<T> withCoder(Coder<T> coder) {
+      checkArgument(coder != null, "CassandraIO.read().withCoder(coder) called with null coder");
+      return builder().setCoder(coder).build();
+    }
+
+    /**
+     * Specify the username for authentication.
+     */
+    public Read<T> withUsername(String username) {
+      checkArgument(username != null, "CassandraIO.read().withUsername(username) called with "
+          + "null username");
+      return builder().setUsername(username).build();
+    }
+
+    /**
+     * Specify the password for authentication.
+     */
+    public Read<T> withPassword(String password) {
+      checkArgument(password != null, "CassandraIO.read().withPassword(password) called with "
+          + "null password");
+      return builder().setPassword(password).build();
+    }
+
+    /**
+     * Specify the local DC used for the load balancing.
+     */
+    public Read<T> withLocalDc(String localDc) {
+      checkArgument(localDc != null, "CassandraIO.read().withLocalDc(localDc) called with null "
+          + "localDc");
+      return builder().setLocalDc(localDc).build();
+    }
+
+    /**
+     * Specify the consistency level used for the read requests. The value is expected to be the
+     * name of a Cassandra consistency level (for instance {@code ONE}, {@code LOCAL_QUORUM}).
+     */
+    public Read<T> withConsistencyLevel(String consistencyLevel) {
+      checkArgument(consistencyLevel != null, "CassandraIO.read().withConsistencyLevel"
+          + "(consistencyLevel) called with null consistencyLevel");
+      return builder().setConsistencyLevel(consistencyLevel).build();
+    }
+
+    /**
+     * Specify an instance of {@link CassandraService} used to connect and read from Cassandra
+     * database.
+     */
+    public Read<T> withCassandraService(CassandraService<T> cassandraService) {
+      checkArgument(cassandraService != null, "CassandraIO.read().withCassandraService(service)"
+          + " called with null service");
+      return builder().setCassandraService(cassandraService).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      // The initial, unsplit source carries no split query.
+      return input.apply(org.apache.beam.sdk.io.Read.from(
+          new CassandraSource<T>(this, null)));
+    }
+
+    @Override
+    public void validate(PipelineOptions pipelineOptions) {
+      // Hosts and port are only required when no custom CassandraService is supplied.
+      checkState(hosts() != null || cassandraService() != null,
+          "CassandraIO.read() requires a list of hosts to be set via withHosts(hosts) or a "
+              + "Cassandra service to be set via withCassandraService(service)");
+      checkState(port() != null || cassandraService() != null, "CassandraIO.read() requires a "
+          + "valid port number to be set via withPort(port) or a Cassandra service to be set via "
+          + "withCassandraService(service)");
+      checkState(keyspace() != null, "CassandraIO.read() requires a keyspace to be set via "
+          + "withKeyspace(keyspace)");
+      checkState(table() != null, "CassandraIO.read() requires a table to be set via "
+          + "withTable(table)");
+      checkState(entity() != null, "CassandraIO.read() requires an entity to be set via "
+          + "withEntity(entity)");
+      checkState(coder() != null, "CassandraIO.read() requires a coder to be set via "
+          + "withCoder(coder)");
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setHosts(List<String> hosts);
+      abstract Builder<T> setPort(Integer port);
+      abstract Builder<T> setKeyspace(String keyspace);
+      abstract Builder<T> setTable(String table);
+      abstract Builder<T> setEntity(Class<T> entity);
+      abstract Builder<T> setCoder(Coder<T> coder);
+      abstract Builder<T> setUsername(String username);
+      abstract Builder<T> setPassword(String password);
+      abstract Builder<T> setLocalDc(String localDc);
+      abstract Builder<T> setConsistencyLevel(String consistencyLevel);
+      abstract Builder<T> setCassandraService(CassandraService<T> cassandraService);
+      abstract Read<T> build();
+    }
+
+    /**
+     * Helper function to either get a fake/mock Cassandra service provided by
+     * {@link #withCassandraService(CassandraService)} or creates and returns an implementation
+     * of a concrete Cassandra service dealing with a Cassandra instance.
+     */
+    @VisibleForTesting
+    CassandraService<T> getCassandraService() {
+      if (cassandraService() != null) {
+        return cassandraService();
+      }
+      return new CassandraServiceImpl<>();
+    }
+
+  }
+
+  /**
+   * A {@link BoundedSource} reading from Cassandra according to a {@link Read} spec. Reading,
+   * size estimation and splitting are all delegated to the spec's {@link CassandraService}.
+   * {@code splitQuery} is {@code null} for the source created by {@link Read#expand(PBegin)}.
+   */
+  @VisibleForTesting
+  static class CassandraSource<T> extends BoundedSource<T> {
+
+    protected final Read<T> spec;
+    protected final String splitQuery;
+
+    CassandraSource(Read<T> spec,
+                    String splitQuery) {
+      this.spec = spec;
+      this.splitQuery = splitQuery;
+    }
+
+    @Override
+    public Coder<T> getDefaultOutputCoder() {
+      return spec.coder();
+    }
+
+    @Override
+    public void validate() {
+      spec.validate(null);
+    }
+
+    @Override
+    public BoundedReader<T> createReader(PipelineOptions pipelineOptions) {
+      return spec.getCassandraService().createReader(this);
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
+      return spec.getCassandraService().getEstimatedSizeBytes(spec);
+    }
+
+    @Override
+    public List<BoundedSource<T>> split(long desiredBundleSizeBytes,
+                                                   PipelineOptions pipelineOptions) {
+      return spec.getCassandraService()
+          .split(spec, desiredBundleSizeBytes);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      if (spec.hosts() != null) {
+        builder.add(DisplayData.item("hosts", spec.hosts().toString()));
+      }
+      if (spec.port() != null) {
+        builder.add(DisplayData.item("port", spec.port()));
+      }
+      builder.addIfNotNull(DisplayData.item("keyspace", spec.keyspace()));
+      builder.addIfNotNull(DisplayData.item("table", spec.table()));
+      // The password is intentionally not exposed in display data.
+      builder.addIfNotNull(DisplayData.item("username", spec.username()));
+      builder.addIfNotNull(DisplayData.item("localDc", spec.localDc()));
+      builder.addIfNotNull(DisplayData.item("consistencyLevel", spec.consistencyLevel()));
+    }
+  }
+
+  /**
+   * A {@link PTransform} to write into Apache Cassandra. See {@link CassandraIO} for details on
+   * usage and configuration.
+   */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+
+    @Nullable abstract List<String> hosts();
+    @Nullable abstract Integer port();
+    @Nullable abstract String keyspace();
+    @Nullable abstract Class<T> entity();
+    @Nullable abstract String username();
+    @Nullable abstract String password();
+    @Nullable abstract String localDc();
+    @Nullable abstract String consistencyLevel();
+    @Nullable abstract CassandraService<T> cassandraService();
+    abstract Builder<T> builder();
+
+    /**
+     * Specify the Cassandra instance hosts where to write data.
+     */
+    public Write<T> withHosts(List<String> hosts) {
+      checkArgument(hosts != null, "CassandraIO.write().withHosts(hosts) called with null hosts");
+      checkArgument(!hosts.isEmpty(), "CassandraIO.write().withHosts(hosts) called with empty "
+          + "hosts list");
+      return builder().setHosts(hosts).build();
+    }
+
+    /**
+     * Specify the Cassandra instance port number where to write data. Must be strictly positive.
+     */
+    public Write<T> withPort(int port) {
+      // Guava's Preconditions only substitutes %s placeholders (not %d).
+      checkArgument(port > 0, "CassandraIO.write().withPort(port) called with invalid port "
+          + "number (%s)", port);
+      return builder().setPort(port).build();
+    }
+
+    /**
+     * Specify the Cassandra keyspace where to write data.
+     */
+    public Write<T> withKeyspace(String keyspace) {
+      checkArgument(keyspace != null, "CassandraIO.write().withKeyspace(keyspace) called with "
+          + "null keyspace");
+      return builder().setKeyspace(keyspace).build();
+    }
+
+    /**
+     * Specify the entity class in the input {@link PCollection}. The {@link CassandraIO} will
+     * map this entity to the Cassandra table thanks to the annotations.
+     */
+    public Write<T> withEntity(Class<T> entity) {
+      checkArgument(entity != null, "CassandraIO.write().withEntity(entity) called with null "
+          + "entity");
+      return builder().setEntity(entity).build();
+    }
+
+    /**
+     * Specify the username used for authentication.
+     */
+    public Write<T> withUsername(String username) {
+      checkArgument(username != null, "CassandraIO.write().withUsername(username) called with "
+          + "null username");
+      return builder().setUsername(username).build();
+    }
+
+    /**
+     * Specify the password used for authentication.
+     */
+    public Write<T> withPassword(String password) {
+      checkArgument(password != null, "CassandraIO.write().withPassword(password) called with "
+          + "null password");
+      return builder().setPassword(password).build();
+    }
+
+    /**
+     * Specify the local DC used by the load balancing policy.
+     */
+    public Write<T> withLocalDc(String localDc) {
+      checkArgument(localDc != null, "CassandraIO.write().withLocalDc(localDc) called with null"
+          + " localDc");
+      return builder().setLocalDc(localDc).build();
+    }
+
+    /**
+     * Specify the consistency level used for the write requests. The value is expected to be the
+     * name of a Cassandra consistency level (for instance {@code ONE}, {@code LOCAL_QUORUM}).
+     */
+    public Write<T> withConsistencyLevel(String consistencyLevel) {
+      checkArgument(consistencyLevel != null, "CassandraIO.write().withConsistencyLevel"
+          + "(consistencyLevel) called with null consistencyLevel");
+      return builder().setConsistencyLevel(consistencyLevel).build();
+    }
+
+    /**
+     * Specify the {@link CassandraService} used to connect and write into the Cassandra database.
+     */
+    public Write<T> withCassandraService(CassandraService<T> cassandraService) {
+      checkArgument(cassandraService != null, "CassandraIO.write().withCassandraService"
+          + "(service) called with null service");
+      return builder().setCassandraService(cassandraService).build();
+    }
+
+    @Override
+    public void validate(PipelineOptions pipelineOptions) {
+      // Hosts and port are only required when no custom CassandraService is supplied.
+      checkState(hosts() != null || cassandraService() != null,
+          "CassandraIO.write() requires a list of hosts to be set via withHosts(hosts) or a "
+              + "Cassandra service to be set via withCassandraService(service)");
+      checkState(port() != null || cassandraService() != null, "CassandraIO.write() requires a "
+          + "valid port number to be set via withPort(port) or a Cassandra service to be set via "
+          + "withCassandraService(service)");
+      checkState(keyspace() != null, "CassandraIO.write() requires a keyspace to be set via "
+          + "withKeyspace(keyspace)");
+      checkState(entity() != null, "CassandraIO.write() requires an entity to be set via "
+          + "withEntity(entity)");
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      input.apply(ParDo.of(new WriteFn<T>(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setHosts(List<String> hosts);
+      abstract Builder<T> setPort(Integer port);
+      abstract Builder<T> setKeyspace(String keyspace);
+      abstract Builder<T> setEntity(Class<T> entity);
+      abstract Builder<T> setUsername(String username);
+      abstract Builder<T> setPassword(String password);
+      abstract Builder<T> setLocalDc(String localDc);
+      abstract Builder<T> setConsistencyLevel(String consistencyLevel);
+      abstract Builder<T> setCassandraService(CassandraService<T> cassandraService);
+      abstract Write<T> build();
+    }
+
+    /**
+     * Helper function to either get a fake/mock Cassandra service provided by
+     * {@link #withCassandraService(CassandraService)} or creates and returns an implementation
+     * of a concrete Cassandra service dealing with a Cassandra instance.
+     */
+    @VisibleForTesting
+    CassandraService<T> getCassandraService() {
+      if (cassandraService() != null) {
+        return cassandraService();
+      }
+      return new CassandraServiceImpl<>();
+    }
+
+  }
+
+  /**
+   * {@link DoFn} writing each input element through a {@link CassandraService.Writer}. The writer
+   * is opened in {@link Setup} and closed in {@link Teardown}.
+   */
+  private static class WriteFn<T> extends DoFn<T, Void> {
+
+    private final Write<T> spec;
+    private CassandraService.Writer<T> writer;
+
+    public WriteFn(Write<T> spec) {
+      this.spec = spec;
+    }
+
+    @Setup
+    public void setup() throws Exception {
+      writer = spec.getCassandraService().createWriter(spec);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext processContext) {
+      T entity = processContext.element();
+      writer.write(entity);
+    }
+
+    @Teardown
+    public void teardown() throws Exception {
+      // setup() may have failed before a writer was created; do not mask that failure with a
+      // NullPointerException. Always drop the reference, even if close() throws.
+      if (writer != null) {
+        try {
+          writer.close();
+        } finally {
+          writer = null;
+        }
+      }
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
new file mode 100644
index 0000000..5071762
--- /dev/null
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cassandra;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.beam.sdk.io.BoundedSource;
+
+/**
+ * An interface for real or fake implementations of Cassandra.
+ *
+ * <p>An instance can be supplied to {@link CassandraIO.Read} and {@link CassandraIO.Write} via
+ * {@code withCassandraService()}, where it is stored as part of the transform configuration;
+ * hence it must be {@link Serializable}.
+ *
+ * @param <T> the entity type read or written
+ */
+public interface CassandraService<T> extends Serializable {
+
+  /**
+   * Returns a {@link org.apache.beam.sdk.io.BoundedSource.BoundedReader} that will read from
+   * Cassandra using the spec from
+   * {@link org.apache.beam.sdk.io.cassandra.CassandraIO.CassandraSource}.
+   */
+  BoundedSource.BoundedReader<T> createReader(CassandraIO.CassandraSource<T> source);
+
+  /**
+   * Returns an estimation of the size, in bytes, that could be read for the given spec.
+   */
+  long getEstimatedSizeBytes(CassandraIO.Read<T> spec);
+
+  /**
+   * Splits a table read into several sources.
+   *
+   * @param spec the read configuration
+   * @param desiredBundleSizeBytes the bundle size requested by the runner through
+   *     {@link BoundedSource#split}
+   */
+  List<BoundedSource<T>> split(CassandraIO.Read<T> spec,
+                                          long desiredBundleSizeBytes);
+
+  /**
+   * Create a {@link Writer} that writes entities into the Cassandra instance.
+   *
+   * <p>Returns a parameterized {@code Writer<T>}; implementations still declaring a raw
+   * {@code Writer} return type remain source-compatible (with an unchecked warning).
+   */
+  Writer<T> createWriter(CassandraIO.Write<T> spec) throws Exception;
+
+  /**
+   * Writer for an entity. It is {@link AutoCloseable}, so callers are expected to close it once
+   * done writing.
+   *
+   * @param <T> the entity type; member interfaces are implicitly static, so this type parameter
+   *     is independent of the enclosing interface's {@code T}
+   */
+  interface Writer<T> extends AutoCloseable {
+
+    /**
+     * This method should be synchronous. It means you have to be sure that the entity is fully
+     * stored (and committed) into the Cassandra instance when you exit from this method.
+     */
+    void write(T entity);
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
new file mode 100644
index 0000000..63c8ef4
--- /dev/null
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.PlainTextAuthProvider;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Select;
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingManager;
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.beam.sdk.io.BoundedSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the {@link CassandraService} that actually uses a Cassandra instance.
+ */
+public class CassandraServiceImpl<T> implements CassandraService<T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CassandraServiceImpl.class);
+
+  private static final String MURMUR3_PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner";
+
+  private static final long MIN_TOKEN = Long.MIN_VALUE;
+  private static final long MAX_TOKEN = Long.MAX_VALUE;
+  private static final BigInteger TOTAL_TOKEN_COUNT =
+      BigInteger.valueOf(MAX_TOKEN).subtract(BigInteger.valueOf(MIN_TOKEN));
+
+  /**
+   * Token expression emitted by {@link #split(CassandraIO.Read, long, long)}. That overload has
+   * no cluster connection and therefore cannot resolve the table's partition key, so this
+   * placeholder is NOT valid CQL. {@link #split(CassandraIO.Read, long)} resolves the real
+   * partition key columns instead.
+   */
+  private static final String PLACEHOLDER_TOKEN_EXPRESSION = "token($pk)";
+
+  /**
+   * Reader executing the query of a {@link CassandraIO.CassandraSource} and mapping each row to
+   * an entity with a {@link Mapper}.
+   */
+  private class CassandraReaderImpl<T> extends BoundedSource.BoundedReader<T> {
+
+    private final CassandraIO.CassandraSource<T> source;
+
+    private Cluster cluster;
+    private Session session;
+    private ResultSet resultSet;
+    private Iterator<T> iterator;
+    private T current;
+
+    public CassandraReaderImpl(CassandraIO.CassandraSource<T> source) {
+      this.source = source;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      LOG.debug("Starting Cassandra reader");
+      cluster = getCluster(source.spec.hosts(), source.spec.port(), source.spec.username(),
+          source.spec.password(), source.spec.localDc(), source.spec.consistencyLevel());
+      session = cluster.connect();
+      LOG.debug("Query: {}", source.splitQuery);
+      resultSet = session.execute(source.splitQuery);
+
+      final MappingManager mappingManager = new MappingManager(session);
+      Mapper mapper = mappingManager.mapper(source.spec.entity());
+      iterator = mapper.map(resultSet).iterator();
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (iterator.hasNext()) {
+        current = iterator.next();
+        return true;
+      }
+      current = null;
+      return false;
+    }
+
+    @Override
+    public void close() {
+      LOG.debug("Closing Cassandra reader");
+      if (session != null) {
+        session.close();
+      }
+      if (cluster != null) {
+        cluster.close();
+      }
+    }
+
+    @Override
+    public T getCurrent() throws NoSuchElementException {
+      if (current == null) {
+        throw new NoSuchElementException();
+      }
+      return current;
+    }
+
+    @Override
+    public CassandraIO.CassandraSource<T> getCurrentSource() {
+      return source;
+    }
+
+  }
+
+  @Override
+  public CassandraReaderImpl<T> createReader(CassandraIO.CassandraSource<T> source) {
+    return new CassandraReaderImpl<>(source);
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(CassandraIO.Read<T> spec) {
+    try (Cluster cluster = getCluster(spec.hosts(), spec.port(), spec.username(), spec.password(),
+        spec.localDc(), spec.consistencyLevel())) {
+      return estimateSizeBytes(cluster, spec);
+    }
+  }
+
+  /**
+   * Estimates the size of the table targeted by {@code spec} using an already opened
+   * {@code cluster}. Returns 0 when the partitioner is not Murmur3 or when the size estimates
+   * can't be read.
+   */
+  private long estimateSizeBytes(Cluster cluster, CassandraIO.Read<T> spec) {
+    if (!isMurmur3Partitioner(cluster)) {
+      LOG.warn("Only Murmur3 partitioner is supported, can't estimate the size");
+      return 0L;
+    }
+    try {
+      List<TokenRange> tokenRanges = getTokenRanges(cluster, spec.keyspace(), spec.table());
+      return getEstimatedSizeBytes(tokenRanges);
+    } catch (Exception e) {
+      // Best effort: a missing estimate only disables size-based splitting.
+      LOG.warn("Can't estimate the size", e);
+      return 0L;
+    }
+  }
+
+  /**
+   * Actually estimate the size of the data to read on the cluster, based on the given token
+   * ranges to address: the summed size of the ranges is extrapolated to the whole ring.
+   *
+   * @return the estimated size in bytes, or the unscaled sum (0 for an empty list) when the
+   *     ranges cover no fraction of the ring
+   */
+  @VisibleForTesting
+  protected static long getEstimatedSizeBytes(List<TokenRange> tokenRanges) {
+    long size = 0L;
+    for (TokenRange tokenRange : tokenRanges) {
+      size = size + tokenRange.meanPartitionSize * tokenRange.partitionCount;
+    }
+    double ringFraction = getRingFraction(tokenRanges);
+    if (ringFraction <= 0) {
+      // Nothing to extrapolate from (e.g. no size estimates yet): avoid dividing by zero.
+      return size;
+    }
+    return Math.round(size / ringFraction);
+  }
+
+  @Override
+  public List<BoundedSource<T>> split(CassandraIO.Read<T> spec,
+      long desiredBundleSizeBytes) {
+    try (Cluster cluster = getCluster(spec.hosts(), spec.port(), spec.username(), spec.password(),
+        spec.localDc(), spec.consistencyLevel())) {
+      if (isMurmur3Partitioner(cluster)) {
+        String tokenExpression =
+            getPartitionKeyTokenExpression(cluster, spec.keyspace(), spec.table());
+        if (tokenExpression != null) {
+          LOG.info("Murmur3Partitioner detected, splitting");
+          return split(spec, desiredBundleSizeBytes, estimateSizeBytes(cluster, spec),
+              tokenExpression);
+        }
+        LOG.warn("Unable to find the partition key of {}.{}, using an unique source for the read",
+            spec.keyspace(), spec.table());
+      } else {
+        LOG.warn("Only Murmur3Partitioner is supported for splitting, using an unique source for "
+            + "the read");
+      }
+      return singleSource(spec);
+    }
+  }
+
+  /**
+   * Compute the number of splits based on the estimated size and the desired bundle size, and
+   * create several sources.
+   *
+   * <p>Without a cluster connection the partition key is unknown, so the generated queries use a
+   * {@code token($pk)} placeholder; only the number and shape of the splits are meaningful.
+   */
+  @VisibleForTesting
+  protected List<BoundedSource<T>> split(CassandraIO.Read<T> spec,
+      long desiredBundleSizeBytes,
+      long estimatedSizeBytes) {
+    return split(spec, desiredBundleSizeBytes, estimatedSizeBytes, PLACEHOLDER_TOKEN_EXPRESSION);
+  }
+
+  /**
+   * Splits the Murmur3 token ring into {@code estimatedSizeBytes / desiredBundleSizeBytes}
+   * contiguous, equally sized ranges (at least one) and creates one source per range, bounding
+   * each query on {@code tokenExpression}.
+   */
+  private List<BoundedSource<T>> split(CassandraIO.Read<T> spec,
+      long desiredBundleSizeBytes,
+      long estimatedSizeBytes,
+      String tokenExpression) {
+    long numSplits = 1;
+    if (desiredBundleSizeBytes > 0) {
+      numSplits = estimatedSizeBytes / desiredBundleSizeBytes;
+    }
+    if (numSplits <= 0) {
+      LOG.warn("Number of splits is less than 1 ({}), fallback to 1", numSplits);
+      numSplits = 1;
+    }
+
+    LOG.info("Number of splits is {}", numSplits);
+
+    if (numSplits == 1) {
+      // we have an unique split
+      return singleSource(spec);
+    }
+
+    // we have more than one split
+    List<BoundedSource<T>> sourceList = new ArrayList<>();
+    BigInteger splitCount = BigInteger.valueOf(numSplits);
+    long startToken = MIN_TOKEN;
+    for (long i = 0; i < numSplits; i++) {
+      // Derive each bound from the split index in exact integer arithmetic: tokens are CQL
+      // bigints, and accumulating a floating-point increment would drift and render as floats.
+      long endToken = BigInteger.valueOf(MIN_TOKEN)
+          .add(TOTAL_TOKEN_COUNT.multiply(BigInteger.valueOf(i + 1)).divide(splitCount))
+          .longValue();
+      Select.Where builder = QueryBuilder.select().from(spec.keyspace(), spec.table()).where();
+      if (i > 0) {
+        // the first split is open on the low side
+        builder = builder.and(QueryBuilder.gte(tokenExpression, startToken));
+      }
+      if (i < numSplits - 1) {
+        // the last split is open on the high side
+        builder = builder.and(QueryBuilder.lt(tokenExpression, endToken));
+      }
+      sourceList.add(new CassandraIO.CassandraSource<T>(spec, builder.toString()));
+      startToken = endToken;
+    }
+    return sourceList;
+  }
+
+  /** Returns a single source reading the whole table. */
+  private List<BoundedSource<T>> singleSource(CassandraIO.Read<T> spec) {
+    String splitQuery = QueryBuilder.select().from(spec.keyspace(), spec.table()).toString();
+    List<BoundedSource<T>> sources = new ArrayList<>();
+    sources.add(new CassandraIO.CassandraSource<T>(spec, splitQuery));
+    return sources;
+  }
+
+  /**
+   * Builds the CQL {@code token(...)} expression over the (quoted) partition key columns of
+   * {@code keyspace.table}, or returns {@code null} when the table metadata can't be found.
+   */
+  private static String getPartitionKeyTokenExpression(Cluster cluster, String keyspace,
+      String table) {
+    com.datastax.driver.core.KeyspaceMetadata keyspaceMetadata =
+        cluster.getMetadata().getKeyspace(keyspace);
+    if (keyspaceMetadata == null) {
+      return null;
+    }
+    com.datastax.driver.core.TableMetadata tableMetadata = keyspaceMetadata.getTable(table);
+    if (tableMetadata == null) {
+      return null;
+    }
+    List<com.datastax.driver.core.ColumnMetadata> partitionKey = tableMetadata.getPartitionKey();
+    if (partitionKey.isEmpty()) {
+      return null;
+    }
+    String[] columnNames = new String[partitionKey.size()];
+    for (int i = 0; i < columnNames.length; i++) {
+      // quote to preserve case-sensitive column names
+      columnNames[i] = com.datastax.driver.core.Metadata.quote(partitionKey.get(i).getName());
+    }
+    return QueryBuilder.token(columnNames);
+  }
+
+  /**
+   * Get a Cassandra cluster using hosts and port, with optional credentials, local datacenter
+   * and consistency level.
+   */
+  private Cluster getCluster(List<String> hosts, int port, String username, String password,
+      String localDc, String consistencyLevel) {
+    Cluster.Builder builder = Cluster.builder()
+        .addContactPoints(hosts.toArray(new String[0]))
+        .withPort(port);
+
+    if (username != null) {
+      builder.withAuthProvider(new PlainTextAuthProvider(username, password));
+    }
+
+    if (localDc != null) {
+      builder.withLoadBalancingPolicy(
+          new TokenAwarePolicy(new DCAwareRoundRobinPolicy.Builder().withLocalDc(localDc).build()));
+    } else {
+      builder.withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()));
+    }
+
+    if (consistencyLevel != null) {
+      builder.withQueryOptions(
+          new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel)));
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Gets the list of token ranges that a table occupies on a given Cassandra node.
+   *
+   * <p>NB: This method is compatible with Cassandra 2.1.5 and greater.
+   */
+  private static List<TokenRange> getTokenRanges(Cluster cluster, String keyspace, String table) {
+    try (Session session = cluster.newSession()) {
+      ResultSet resultSet =
+          session.execute(
+              "SELECT range_start, range_end, partitions_count, mean_partition_size FROM "
+                  + "system.size_estimates WHERE keyspace_name = ? AND table_name = ?",
+              keyspace,
+              table);
+
+      ArrayList<TokenRange> tokenRanges = new ArrayList<>();
+      for (Row row : resultSet) {
+        TokenRange tokenRange =
+            new TokenRange(
+                row.getLong("partitions_count"),
+                row.getLong("mean_partition_size"),
+                row.getLong("range_start"),
+                row.getLong("range_end"));
+        tokenRanges.add(tokenRange);
+      }
+      // The table may not contain the estimates yet
+      // or have partitions_count and mean_partition_size fields = 0
+      // if the data was just inserted and the amount of data in the table was small.
+      // This is very common situation during tests,
+      // when we insert a few rows and immediately query them.
+      // However, for tiny data sets the lack of size estimates is not a problem at all,
+      // because we don't want to split tiny data anyways.
+      // Therefore, we're not issuing a warning if the result set was empty
+      // or mean_partition_size and partitions_count = 0.
+      return tokenRanges;
+    }
+  }
+
+  /**
+   * Compute the fraction of the token ring covered by the given token ranges.
+   */
+  @VisibleForTesting
+  protected static double getRingFraction(List<TokenRange> tokenRanges) {
+    double ringFraction = 0;
+    for (TokenRange tokenRange : tokenRanges) {
+      ringFraction = ringFraction + (distance(tokenRange.rangeStart, tokenRange.rangeEnd)
+          .doubleValue() / TOTAL_TOKEN_COUNT.doubleValue());
+    }
+    return ringFraction;
+  }
+
+  /**
+   * Measure distance between two tokens, wrapping around the ring when {@code right} is not
+   * greater than {@code left}.
+   */
+  @VisibleForTesting
+  protected static BigInteger distance(long left, long right) {
+    if (right > left) {
+      return BigInteger.valueOf(right).subtract(BigInteger.valueOf(left));
+    } else {
+      return BigInteger.valueOf(right).subtract(BigInteger.valueOf(left)).add(TOTAL_TOKEN_COUNT);
+    }
+  }
+
+  /**
+   * Check if the current partitioner is the Murmur3 (default in Cassandra version newer than 2).
+   */
+  @VisibleForTesting
+  protected static boolean isMurmur3Partitioner(Cluster cluster) {
+    // constant-first comparison: the partitioner may be null if metadata isn't available
+    return MURMUR3_PARTITIONER.equals(cluster.getMetadata().getPartitioner());
+  }
+
+  /**
+   * Represent a token range in Cassandra instance, wrapping the partition count, size and token
+   * range.
+   */
+  @VisibleForTesting
+  protected static class TokenRange {
+    private final long partitionCount;
+    private final long meanPartitionSize;
+    private final long rangeStart;
+    private final long rangeEnd;
+
+    public TokenRange(
+        long partitionCount, long meanPartitionSize, long rangeStart, long
+        rangeEnd) {
+      this.partitionCount = partitionCount;
+      this.meanPartitionSize = meanPartitionSize;
+      this.rangeStart = rangeStart;
+      this.rangeEnd = rangeEnd;
+    }
+  }
+
+  /**
+   * Writer storing an entity into Apache Cassandra database.
+   */
+  protected class WriterImpl<T> implements Writer<T> {
+
+    private final CassandraIO.Write<T> spec;
+
+    private final Cluster cluster;
+    private final Session session;
+    private final MappingManager mappingManager;
+
+    public WriterImpl(CassandraIO.Write<T> spec) {
+      this.spec = spec;
+      this.cluster = getCluster(spec.hosts(), spec.port(), spec.username(), spec.password(),
+          spec.localDc(), spec.consistencyLevel());
+      try {
+        this.session = cluster.connect(spec.keyspace());
+        this.mappingManager = new MappingManager(session);
+      } catch (RuntimeException e) {
+        // Don't leak the cluster (and any session it opened) when setup fails.
+        cluster.close();
+        throw e;
+      }
+    }
+
+    /**
+     * Write the entity to the Cassandra instance, using {@link Mapper} obtained with the
+     * {@link MappingManager}. This method uses the {@link Mapper#save(Object)} method, which is
+     * synchronous: the entity is committed to Cassandra when this method returns.
+     */
+    @Override
+    public void write(T entity) {
+      @SuppressWarnings("unchecked") // the mapper is looked up from the entity's own class
+      Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(entity.getClass());
+      mapper.save(entity);
+    }
+
+    @Override
+    public void close() {
+      if (session != null) {
+        session.close();
+      }
+      if (cluster != null) {
+        cluster.close();
+      }
+    }
+
+  }
+
+  @Override
+  public Writer<T> createWriter(CassandraIO.Write<T> spec) {
+    return new WriterImpl<>(spec);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/package-info.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/package-info.java
new file mode 100644
index 0000000..6659b62
--- /dev/null
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading and writing from Apache Cassandra database.
+ */
+package org.apache.beam.sdk.io.cassandra;
http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
new file mode 100644
index 0000000..e67d305
--- /dev/null
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cassandra;
+
+import static org.junit.Assert.assertEquals;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.driver.mapping.annotations.PartitionKey;
+import com.datastax.driver.mapping.annotations.Table;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SerializableMatcher;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * A test of {@link CassandraIO} on a concrete and independent Cassandra instance.
+ *
+ * <p>This test requires a running Cassandra instance, and the test dataset must exists.
+ *
+ * <p>You can run this test directly using Maven with:
+ *
+ * <pre>{@code
+ * mvn -e -Pio-it verify -pl sdks/java/io/cassandra -DintegrationTestPipelineOptions='[
+ * "--cassandraHost=1.2.3.4",
+ * "--cassandraPort=9042"]'
+ * }</pre>
+ */
+@RunWith(JUnit4.class)
+public class CassandraIOIT implements Serializable {
+
+ private static IOTestPipelineOptions options;
+
+ @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ PipelineOptionsFactory.register(IOTestPipelineOptions.class);
+ options = TestPipeline.testingPipelineOptions()
+ .as(IOTestPipelineOptions.class);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ // cleanup the write table
+ CassandraTestDataSet.cleanUpDataTable(options);
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ PCollection<Scientist> output = pipeline.apply(CassandraIO.<Scientist>read()
+ .withHosts(Collections.singletonList(options.getCassandraHost()))
+ .withPort(options.getCassandraPort())
+ .withKeyspace(CassandraTestDataSet.KEYSPACE)
+ .withTable(CassandraTestDataSet.TABLE_READ_NAME)
+ .withEntity(Scientist.class)
+ .withCoder(SerializableCoder.of(Scientist.class)));
+
+ PAssert.thatSingleton(output.apply("Count scientist", Count.<Scientist>globally()))
+ .isEqualTo(1000L);
+
+ PCollection<KV<String, Integer>> mapped =
+ output.apply(
+ MapElements.via(
+ new SimpleFunction<Scientist, KV<String, Integer>>() {
+ public KV<String, Integer> apply(Scientist scientist) {
+ KV<String, Integer> kv = KV.of(scientist.name, scientist.id);
+ return kv;
+ }
+ }
+ )
+ );
+ PAssert.that(mapped.apply("Count occurrences per scientist", Count.<String, Integer>perKey()))
+ .satisfies(
+ new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
+ @Override
+ public Void apply(Iterable<KV<String, Long>> input) {
+ for (KV<String, Long> element : input) {
+ assertEquals(element.getKey(), 1000 / 10, element.getValue().longValue());
+ }
+ return null;
+ }
+ }
+ );
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testWrite() throws Exception {
+ IOTestPipelineOptions options =
+ TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class);
+
+ options.setOnSuccessMatcher(
+ new CassandraMatcher(
+ CassandraTestDataSet.getCluster(options),
+ CassandraTestDataSet.TABLE_WRITE_NAME));
+
+ TestPipeline.convertToArgs(options);
+
+ ArrayList<ScientistForWrite> data = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ ScientistForWrite scientist = new ScientistForWrite();
+ scientist.id = i;
+ scientist.name = "Name " + i;
+ data.add(scientist);
+ }
+
+ pipeline
+ .apply(Create.of(data))
+ .apply(CassandraIO.<ScientistForWrite>write()
+ .withHosts(Collections.singletonList(options.getCassandraHost()))
+ .withPort(options.getCassandraPort())
+ .withKeyspace(CassandraTestDataSet.KEYSPACE)
+ .withEntity(ScientistForWrite.class));
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ /**
+ * Simple matcher.
+ */
+ public class CassandraMatcher extends TypeSafeMatcher<PipelineResult>
+ implements SerializableMatcher<PipelineResult> {
+
+ private String tableName;
+ private Cluster cluster;
+
+ public CassandraMatcher(Cluster cluster, String tableName) {
+ this.cluster = cluster;
+ this.tableName = tableName;
+ }
+
+ @Override
+ protected boolean matchesSafely(PipelineResult pipelineResult) {
+ pipelineResult.waitUntilFinish();
+ Session session = cluster.connect();
+ ResultSet result = session.execute("select id,name from " + CassandraTestDataSet.KEYSPACE
+ + "." + tableName);
+ List<Row> rows = result.all();
+ if (rows.size() != 1000) {
+ return false;
+ }
+ for (Row row : rows) {
+ if (!row.getString("name").matches("Name.*")) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("Expected Cassandra record pattern is (Name.*)");
+ }
+ }
+
+ /**
+ * Simple Cassandra entity representing a scientist. Used for read test.
+ */
+ @Table(name = CassandraTestDataSet.TABLE_READ_NAME, keyspace = CassandraTestDataSet.KEYSPACE)
+ public static class Scientist implements Serializable {
+
+ @PartitionKey
+ @Column(name = "id")
+ private final int id;
+
+ @Column(name = "name")
+ private final String name;
+
+ public Scientist() {
+ this(0, "");
+ }
+
+ public Scientist(int id) {
+ this(0, "");
+ }
+
+ public Scientist(int id, String name) {
+ this.id = id;
+ this.name = name;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getName() {
+ return name;
+ }
+ }
+
+ /**
+ * Simple Cassandra entity representing a scientist, used for write test.
+ */
+ @Table(name = CassandraTestDataSet.TABLE_WRITE_NAME, keyspace = CassandraTestDataSet.KEYSPACE)
+ public class ScientistForWrite implements Serializable {
+
+ @PartitionKey
+ @Column(name = "id")
+ public Integer id;
+
+ @Column(name = "name")
+ public String name;
+
+ public String toString() {
+ return id + ":" + name;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
new file mode 100644
index 0000000..cfd78d2
--- /dev/null
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cassandra;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.driver.mapping.annotations.Table;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Tests of {@link CassandraIO}. */
+public class CassandraIOTest implements Serializable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CassandraIOTest.class);
+
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void testEstimatedSizeBytes() throws Exception {
+    final FakeCassandraService service = new FakeCassandraService();
+    service.load();
+
+    PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
+    CassandraIO.Read spec = CassandraIO.<Scientist>read().withCassandraService(service);
+    CassandraIO.CassandraSource source = new CassandraIO.CassandraSource(
+        spec,
+        null);
+    long estimatedSizeBytes = source.getEstimatedSizeBytes(pipelineOptions);
+    // the size is the sum of the bytes size of the String representation of a scientist in the map
+    assertEquals(113890, estimatedSizeBytes);
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    FakeCassandraService service = new FakeCassandraService();
+    service.load();
+
+    PCollection<Scientist> output = pipeline.apply(CassandraIO
+        .<Scientist>read()
+        .withCassandraService(service)
+        .withKeyspace("beam")
+        .withTable("scientist")
+        .withCoder(SerializableCoder.of(Scientist.class))
+        .withEntity(Scientist.class)
+    );
+
+    PAssert.thatSingleton(output.apply("Count", Count.<Scientist>globally()))
+        .isEqualTo(10000L);
+
+    PCollection<KV<String, Integer>> mapped =
+        output.apply(
+            MapElements.via(
+                new SimpleFunction<Scientist, KV<String, Integer>>() {
+                  @Override
+                  public KV<String, Integer> apply(Scientist scientist) {
+                    return KV.of(scientist.name, scientist.id);
+                  }
+                }));
+    PAssert.that(mapped.apply("Count occurrences per scientist", Count.<String, Integer>perKey()))
+        .satisfies(
+            new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
+              @Override
+              public Void apply(Iterable<KV<String, Long>> input) {
+                for (KV<String, Long> element : input) {
+                  assertEquals(element.getKey(), 1000, element.getValue().longValue());
+                }
+                return null;
+              }
+            });
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testWrite() throws Exception {
+    FakeCassandraService service = new FakeCassandraService();
+    // The fake table is static (it must be shared with the deserialized copies of the service
+    // used by the pipeline), so start from an empty table whatever test ran before.
+    service.getTable().clear();
+
+    ArrayList<Scientist> data = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      Scientist scientist = new Scientist();
+      scientist.id = i;
+      scientist.name = "Name " + i;
+      data.add(scientist);
+    }
+
+    pipeline
+        .apply(Create.of(data))
+        .apply(CassandraIO.<Scientist>write().withCassandraService(service)
+            .withKeyspace("beam")
+            .withEntity(Scientist.class));
+    pipeline.run().waitUntilFinish();
+
+    assertEquals(1000, service.getTable().size());
+    for (Scientist scientist : service.getTable().values()) {
+      assertTrue(scientist.name.matches("Name (\\d*)"));
+    }
+  }
+
+  /**
+   * A {@link CassandraService} implementation that stores the entity in memory.
+   *
+   * <p>The backing map is static so that every (de)serialized copy of the service sees the same
+   * data.
+   */
+  private static class FakeCassandraService implements CassandraService<Scientist> {
+
+    private static final Map<Integer, Scientist> table = new ConcurrentHashMap<>();
+
+    /** Replaces the table content with 10000 scientists, 1000 per name. */
+    public void load() {
+      table.clear();
+      String[] scientists = {
+          "Lovelace",
+          "Franklin",
+          "Meitner",
+          "Hopper",
+          "Curie",
+          "Faraday",
+          "Newton",
+          "Bohr",
+          "Galilei",
+          "Maxwell"
+      };
+      for (int i = 0; i < 10000; i++) {
+        int index = i % scientists.length;
+        Scientist scientist = new Scientist();
+        scientist.id = i;
+        scientist.name = scientists[index];
+        table.put(scientist.id, scientist);
+      }
+    }
+
+    public Map<Integer, Scientist> getTable() {
+      return table;
+    }
+
+    @Override
+    public FakeCassandraReader createReader(CassandraIO.CassandraSource source) {
+      return new FakeCassandraReader(source);
+    }
+
+    static class FakeCassandraReader extends BoundedSource.BoundedReader {
+
+      private final CassandraIO.CassandraSource source;
+
+      private Iterator<Scientist> iterator;
+      private Scientist current;
+
+      public FakeCassandraReader(CassandraIO.CassandraSource source) {
+        this.source = source;
+      }
+
+      @Override
+      public boolean start() throws IOException {
+        iterator = table.values().iterator();
+        return advance();
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        if (iterator.hasNext()) {
+          current = iterator.next();
+          return true;
+        }
+        current = null;
+        return false;
+      }
+
+      @Override
+      public void close() {
+        iterator = null;
+        current = null;
+      }
+
+      @Override
+      public Scientist getCurrent() throws NoSuchElementException {
+        if (current == null) {
+          throw new NoSuchElementException();
+        }
+        return current;
+      }
+
+      @Override
+      public CassandraIO.CassandraSource getCurrentSource() {
+        return this.source;
+      }
+
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(CassandraIO.Read spec) {
+      long size = 0L;
+      for (Scientist scientist : table.values()) {
+        size = size + scientist.toString().getBytes().length;
+      }
+      return size;
+    }
+
+    @Override
+    public List<BoundedSource<Scientist>> split(CassandraIO.Read spec,
+        long desiredBundleSizeBytes) {
+      List<BoundedSource<Scientist>> sources = new ArrayList<>();
+      sources.add(new CassandraIO.CassandraSource<Scientist>(spec, null));
+      return sources;
+    }
+
+    static class FakeCassandraWriter implements Writer<Scientist> {
+
+      @Override
+      public void write(Scientist scientist) {
+        table.put(scientist.id, scientist);
+      }
+
+      @Override
+      public void close() {
+        // nothing to do
+      }
+
+    }
+
+    @Override
+    public FakeCassandraWriter createWriter(CassandraIO.Write<Scientist> spec) {
+      return new FakeCassandraWriter();
+    }
+
+  }
+
+  /** Simple Cassandra entity used in test. */
+  @Table(name = "scientist", keyspace = "beam")
+  public static class Scientist implements Serializable {
+
+    @Column(name = "person_name")
+    public String name;
+
+    @Column(name = "person_id")
+    public int id;
+
+    @Override
+    public String toString() {
+      return id + ":" + name;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImplTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImplTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImplTest.java
new file mode 100644
index 0000000..6a68e90
--- /dev/null
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImplTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cassandra;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Metadata;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests on {@link CassandraServiceImplTest}.
+ */
+public class CassandraServiceImplTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraServiceImplTest.class);
+
+ private static final String MURMUR3_PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner";
+
+ private Cluster createClusterMock() {
+ Metadata metadata = Mockito.mock(Metadata.class);
+ Mockito.when(metadata.getPartitioner()).thenReturn(MURMUR3_PARTITIONER);
+ Cluster cluster = Mockito.mock(Cluster.class);
+ Mockito.when(cluster.getMetadata()).thenReturn(metadata);
+ return cluster;
+ }
+
+ @Test
+ public void testValidPartitioner() throws Exception {
+ assertTrue(CassandraServiceImpl.isMurmur3Partitioner(createClusterMock()));
+ }
+
+ @Test
+ public void testDistance() throws Exception {
+ BigInteger distance = CassandraServiceImpl.distance(10L, 100L);
+ assertEquals(BigInteger.valueOf(90), distance);
+
+ distance = CassandraServiceImpl.distance(100L, 10L);
+ assertEquals(new BigInteger("18446744073709551525"), distance);
+ }
+
+ @Test
+ public void testRingFraction() throws Exception {
+ // simulate a first range taking "half" of the available tokens
+ List<CassandraServiceImpl.TokenRange> tokenRanges = new ArrayList<>();
+ tokenRanges.add(new CassandraServiceImpl.TokenRange(1, 1, Long.MIN_VALUE, 0));
+ assertEquals(0.5, CassandraServiceImpl.getRingFraction(tokenRanges), 0);
+
+ // add a second range to cover all tokens available
+ tokenRanges.add(new CassandraServiceImpl.TokenRange(1, 1, 0, Long.MAX_VALUE));
+ assertEquals(1.0, CassandraServiceImpl.getRingFraction(tokenRanges), 0);
+ }
+
+ @Test
+ public void testEstimatedSizeBytes() throws Exception {
+ List<CassandraServiceImpl.TokenRange> tokenRanges = new ArrayList<>();
+ // one partition containing all tokens, the size is actually the size of the partition
+ tokenRanges.add(new CassandraServiceImpl.TokenRange(1, 1000, Long.MIN_VALUE, Long.MAX_VALUE));
+ assertEquals(1000, CassandraServiceImpl.getEstimatedSizeBytes(tokenRanges));
+
+ // one partition with half of the tokens, we estimate the size to the double of this partition
+ tokenRanges = new ArrayList<>();
+ tokenRanges.add(new CassandraServiceImpl.TokenRange(1, 1000, Long.MIN_VALUE, 0));
+ assertEquals(2000, CassandraServiceImpl.getEstimatedSizeBytes(tokenRanges));
+
+ // we have three partitions covering all tokens, the size is the sum of partition size *
+ // partition count
+ tokenRanges = new ArrayList<>();
+ tokenRanges.add(new CassandraServiceImpl.TokenRange(1, 1000, Long.MIN_VALUE, -3));
+ tokenRanges.add(new CassandraServiceImpl.TokenRange(1, 1000, -2, 10000));
+ tokenRanges.add(new CassandraServiceImpl.TokenRange(2, 3000, 10001, Long.MAX_VALUE));
+ assertEquals(8000, CassandraServiceImpl.getEstimatedSizeBytes(tokenRanges));
+ }
+
+ @Test
+ public void testThreeSplits() throws Exception {
+ CassandraServiceImpl service = new CassandraServiceImpl();
+ CassandraIO.Read spec = CassandraIO.read().withKeyspace("beam").withTable("test");
+ List<CassandraIO.CassandraSource> sources = service.split(spec, 50, 150);
+ assertEquals(3, sources.size());
+ assertTrue(sources.get(0).splitQuery.matches("SELECT \\* FROM beam.test WHERE token\\"
+ + "(\\$pk\\)<(.*)"));
+ assertTrue(sources.get(1).splitQuery.matches("SELECT \\* FROM beam.test WHERE token\\"
+ + "(\\$pk\\)>=(.*) AND token\\(\\$pk\\)<(.*)"));
+ assertTrue(sources.get(2).splitQuery.matches("SELECT \\* FROM beam.test WHERE token\\"
+ + "(\\$pk\\)>=(.*)"));
+ }
+
+ @Test
+ public void testTwoSplits() throws Exception {
+ CassandraServiceImpl service = new CassandraServiceImpl();
+ CassandraIO.Read spec = CassandraIO.read().withKeyspace("beam").withTable("test");
+ List<CassandraIO.CassandraSource> sources = service.split(spec, 50, 100);
+ assertEquals(2, sources.size());
+ LOG.info("TOKEN: " + ((double) Long.MAX_VALUE / 2));
+ LOG.info(sources.get(0).splitQuery);
+ LOG.info(sources.get(1).splitQuery);
+ assertEquals("SELECT * FROM beam.test WHERE token($pk)<" + ((double) Long.MAX_VALUE / 2) + ";",
+ sources.get(0).splitQuery);
+ assertEquals("SELECT * FROM beam.test WHERE token($pk)>=" + ((double) Long.MAX_VALUE / 2)
+ + ";",
+ sources.get(1).splitQuery);
+ }
+
+ @Test
+ public void testUniqueSplit() throws Exception {
+ CassandraServiceImpl service = new CassandraServiceImpl();
+ CassandraIO.Read spec = CassandraIO.read().withKeyspace("beam").withTable("test");
+ List<CassandraIO.CassandraSource> sources = service.split(spec, 100, 100);
+ assertEquals(1, sources.size());
+ assertEquals("SELECT * FROM beam.test;", sources.get(0).splitQuery);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraTestDataSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraTestDataSet.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraTestDataSet.java
new file mode 100644
index 0000000..461f5ea
--- /dev/null
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraTestDataSet.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manipulates test data used by the {@link CassandraIO} tests.
+ *
+ * <p>This is independent from the tests so that for read tests it can be run separately after
+ * data store creation rather than every time (which can be more fragile).
+ */
+public class CassandraTestDataSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CassandraTestDataSet.class);
+
+  public static final String KEYSPACE = "BEAM";
+  public static final String TABLE_READ_NAME = "BEAM_READ_TEST";
+  public static final String TABLE_WRITE_NAME = "BEAM_WRITE_TEST";
+
+  /** Number of rows inserted into the read table. */
+  private static final int NUM_READ_ROWS = 1000;
+
+  /**
+   * Use this to create the read tables before IT read tests.
+   *
+   * <p>To invoke this class, you can use this command line (run from the cassandra module
+   * directory):
+   * <pre>
+   * mvn test-compile exec:java \
+   *   -Dexec.mainClass=org.apache.beam.sdk.io.cassandra.CassandraTestDataSet \
+   *   -Dexec.args="--cassandraHost=localhost --cassandraPort=9042" \
+   *   -Dexec.classpathScope=test
+   * </pre>
+   *
+   * @param args Please pass options from IOTestPipelineOptions used for connection to Cassandra
+   *     as shown above.
+   */
+  public static void main(String[] args) {
+    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
+    IOTestPipelineOptions options =
+        PipelineOptionsFactory.fromArgs(args).as(IOTestPipelineOptions.class);
+
+    createDataTable(options);
+  }
+
+  /**
+   * Creates {@link #KEYSPACE} and the read and write tables if needed, then fills the read table.
+   * All statements go through a single cluster connection.
+   */
+  public static void createDataTable(IOTestPipelineOptions options) {
+    try (Cluster cluster = getCluster(options); Session session = cluster.connect()) {
+      createKeyspace(session);
+      createTable(session, TABLE_READ_NAME);
+      insertTestData(session, TABLE_READ_NAME);
+      createTable(session, TABLE_WRITE_NAME);
+    }
+  }
+
+  /** Builds a {@link Cluster} using the host and port from {@code options} as contact point. */
+  public static Cluster getCluster(IOTestPipelineOptions options) {
+    return Cluster.builder()
+        .addContactPoint(options.getCassandraHost())
+        .withPort(options.getCassandraPort())
+        .build();
+  }
+
+  /** Creates {@link #KEYSPACE} (SimpleStrategy, replication factor 3) if it does not exist. */
+  private static void createKeyspace(Session session) {
+    LOG.info("Create {} keyspace if not exists", KEYSPACE);
+    session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH REPLICATION = "
+        + "{'class':'SimpleStrategy', 'replication_factor':3};");
+  }
+
+  /** Creates table {@code tableName} in {@link #KEYSPACE} if it does not exist. */
+  private static void createTable(Session session, String tableName) {
+    LOG.info("Create {} table if not exists", tableName);
+    session.execute("CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + tableName
+        + "(id int, name text, PRIMARY KEY(id))");
+  }
+
+  /** Inserts {@link #NUM_READ_ROWS} rows, cycling through a fixed list of scientist names. */
+  private static void insertTestData(Session session, String tableName) {
+    LOG.info("Insert test dataset");
+    String[] scientists = {
+        "Lovelace",
+        "Franklin",
+        "Meitner",
+        "Hopper",
+        "Curie",
+        "Faraday",
+        "Newton",
+        "Bohr",
+        "Galilei",
+        "Maxwell"
+    };
+    // Bind markers keep the row values out of the CQL text.
+    String insert = "INSERT INTO " + KEYSPACE + "." + tableName + "(id, name) values(?, ?);";
+    for (int i = 0; i < NUM_READ_ROWS; i++) {
+      session.execute(insert, i, scientists[i % scientists.length]);
+    }
+  }
+
+  /** Truncates the write table {@link #TABLE_WRITE_NAME}. */
+  public static void cleanUpDataTable(IOTestPipelineOptions options) {
+    try (Cluster cluster = getCluster(options); Session session = cluster.connect()) {
+      session.execute("TRUNCATE TABLE " + KEYSPACE + "." + TABLE_WRITE_NAME);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index d3915c9..387fd22 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -76,4 +76,14 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {
Integer getElasticsearchTcpPort();
void setElasticsearchTcpPort(Integer value);
+  @Default.String("cassandra-host")
+  @Description("Host for Cassandra server (host name/ip address)")
+  String getCassandraHost(); // used as the driver contact point by the Cassandra IO tests
+  void setCassandraHost(String host);
+
+  @Description("Port for Cassandra server")
+  @Default.Integer(9042) // CQL native transport port; 7001 is the TLS inter-node storage port
+  Integer getCassandraPort();
+  void setCassandraPort(Integer port);
+
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 9657612..94fc6a7 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -78,6 +78,7 @@
<module>mongodb</module>
<module>mqtt</module>
<module>xml</module>
+ <module>cassandra</module>
</modules>
<profiles>