You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/12/31 00:35:12 UTC
incubator-streams git commit: [STREAMS-478] streams persist
reader/writer for Apache Cassandra
Repository: incubator-streams
Updated Branches:
refs/heads/master cee5a231a -> e40e6287e
[STREAMS-478] streams persist reader/writer for Apache Cassandra
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e40e6287
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e40e6287
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e40e6287
Branch: refs/heads/master
Commit: e40e6287e5deeb51af9dcd4af9fb51e91d582373
Parents: cee5a23
Author: Subhobrata Dey <sb...@gmail.com>
Authored: Fri Dec 23 22:10:46 2016 -0500
Committer: Subhobrata Dey <sb...@gmail.com>
Committed: Wed Dec 28 19:58:33 2016 -0500
----------------------------------------------------------------------
streams-contrib/pom.xml | 1 +
.../streams-persist-cassandra/README.md | 8 +
.../streams-persist-cassandra/pom.xml | 231 +++++++++++++
.../cassandra/CassandraPersistReader.java | 316 ++++++++++++++++++
.../cassandra/CassandraPersistWriter.java | 321 +++++++++++++++++++
.../cassandra/CassandraConfiguration.json | 47 +++
.../src/main/resources/cassandra.conf | 25 ++
.../src/main/resources/components.dot | 50 +++
.../src/site/markdown/cassandra.md | 36 +++
.../src/site/markdown/index.md | 23 ++
.../streams-persist-cassandra/src/site/site.xml | 25 ++
.../cassandra/test/CassandraPersistIT.java | 106 ++++++
.../src/test/resources/CassandraPersistIT.conf | 27 ++
13 files changed, 1216 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 71b9a39..8408cef 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -38,6 +38,7 @@
<modules>
<module>streams-persist-console</module>
+ <module>streams-persist-cassandra</module>
<module>streams-persist-elasticsearch</module>
<module>streams-persist-filebuffer</module>
<module>streams-persist-hbase</module>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/README.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/README.md b/streams-contrib/streams-persist-cassandra/README.md
new file mode 100644
index 0000000..cd5ca39
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/README.md
@@ -0,0 +1,8 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+org.apache.streams:streams-persist-cassandra
+===========================================
+
+[README.md](src/site/markdown/index.md "README")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/pom.xml b/streams-contrib/streams-persist-cassandra/pom.xml
new file mode 100644
index 0000000..f29b9e5
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/pom.xml
@@ -0,0 +1,231 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>streams-contrib</artifactId>
+ <groupId>org.apache.streams</groupId>
+ <version>0.5-incubating-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>streams-persist-cassandra</artifactId>
+ <name>${project.artifactId}</name>
+
+ <description>Cassandra Module</description>
+
+ <properties>
+ <cassandra-driver.version>3.1.2</cassandra-driver.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-core</artifactId>
+ <version>${cassandra-driver.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-mapping</artifactId>
+ <version>${cassandra-driver.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-schema-activitystreams</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-testing</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.streams.plugins</groupId>
+ <artifactId>streams-plugin-pojo</artifactId>
+ <version>${project.version}</version>
+ <configuration>
+ <sourcePaths>
+ <sourcePath>${project.basedir}/src/main/jsonschema</sourcePath>
+ </sourcePaths>
+ <targetDirectory>${project.basedir}/target/generated-sources/pojo</targetDirectory>
+ <targetPackage>org.apache.streams.cassandra.pojo</targetPackage>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate-sources</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>resource-dependencies</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>unpack-dependencies</goal>
+ </goals>
+ <configuration>
+ <includeArtifactIds>streams-schema-activitystreams</includeArtifactIds>
+ <includes>**/*.json</includes>
+ <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>${failsafe.plugin.version}</version>
+ <configuration>
+ <!-- Run integration test suite rather than individual tests. -->
+ <excludes>
+ <exclude>**/*Test.java</exclude>
+ <exclude>**/*Tests.java</exclude>
+ </excludes>
+ <includes>
+ <include>**/*IT.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <includes>
+ <include>**/*.conf</include>
+ <include>**/*.json</include>
+ <include>**/*.class</include>
+ </includes>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>dockerITs</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ <property>
+ <name>skipITs</name>
+ <value>false</value>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <configuration combine.self="override">
+ <watchInterval>500</watchInterval>
+ <logDate>default</logDate>
+ <verbose>true</verbose>
+ <autoPull>on</autoPull>
+ <images>
+ <image>
+ <name>cassandra:3.9</name>
+ <alias>cassandra</alias>
+ <run>
+ <namingStrategy>none</namingStrategy>
+ <ports>
+ <port>${cassandra.tcp.host}:${cassandra.tcp.port}:9042</port>
+ </ports>
+ <portPropertyFile>cassandra.properties</portPropertyFile>
+ <log>
+ <enabled>true</enabled>
+ <date>default</date>
+ <color>cyan</color>
+ </log>
+ </run>
+ <watch>
+ <mode>none</mode>
+ </watch>
+ </image>
+ </images>
+ </configuration>
+
+ </plugin>
+
+ </plugins>
+ </build>
+
+ </profile>
+ </profiles>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java
new file mode 100644
index 0000000..aaa40fe
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistReader.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.cassandra;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.schemabuilder.SchemaBuilder;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Queues;
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * CassandraPersistReader reads documents from cassandra.
+ */
+public class CassandraPersistReader implements StreamsPersistReader {
+
+ public static final String STREAMS_ID = "CassandraPersistReader";
+
+ public static final Logger LOGGER = LoggerFactory.getLogger(CassandraPersistReader.class);
+
+ protected volatile Queue<StreamsDatum> persistQueue;
+
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ private ExecutorService executor;
+ private CompletableFuture<Boolean> readerTaskFuture = new CompletableFuture<>();
+
+ private CassandraConfiguration config;
+
+ protected Cluster cluster;
+ protected Session session;
+
+ protected String keyspace;
+ protected String table;
+ protected Iterator<Row> rowIterator;
+
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ /**
+ * CassandraPersistReader constructor - resolves CassandraConfiguration from JVM 'cassandra'.
+ */
+ public CassandraPersistReader() {
+ this.config = new ComponentConfigurator<>(CassandraConfiguration.class)
+ .detectConfiguration(StreamsConfigurator.getConfig().getConfig("cassandra"));
+ }
+
+ /**
+ * CassandraPersistReader constructor - uses supplied CassandraConfiguration.
+ * @param config config
+ */
+ public CassandraPersistReader(CassandraConfiguration config) {
+ this.config = config;
+ }
+
+ /**
+ * CassandraPersistReader constructor - uses supplied persistQueue.
+ * @param persistQueue persistQueue
+ */
+ public CassandraPersistReader(Queue<StreamsDatum> persistQueue) {
+ this.config = new ComponentConfigurator<>(CassandraConfiguration.class)
+ .detectConfiguration(StreamsConfigurator.getConfig().getConfig("cassandra"));
+ this.persistQueue = persistQueue;
+ }
+
+ public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+ this.persistQueue = persistQueue;
+ }
+
+ public Queue<StreamsDatum> getPersistQueue() {
+ return persistQueue;
+ }
+
+ public void stop() {
+ }
+
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ connectToCassandra();
+
+ String selectStatement = getSelectStatement();
+ ResultSet rs = session.execute(selectStatement);
+ rowIterator = rs.iterator();
+
+ if (!rowIterator.hasNext()) {
+ throw new RuntimeException("Table" + table + "is empty!");
+ }
+
+ persistQueue = constructQueue();
+
+ executor = Executors.newSingleThreadExecutor();
+ }
+
+ @Override
+ public void cleanUp() {
+ stop();
+ }
+
+ protected StreamsDatum prepareDatum(Row row) {
+ ObjectNode objectNode;
+
+ try {
+ byte[] value = row.getBytes(config.getColumn()).array();
+ objectNode = mapper.readValue(value, ObjectNode.class);
+ } catch (IOException ex) {
+ LOGGER.warn("document isn't valid JSON.");
+ return null;
+ }
+
+ return new StreamsDatum(objectNode);
+ }
+
+ private synchronized void connectToCassandra() {
+ Cluster.Builder clusterBuilder = Cluster.builder()
+ .addContactPoints(config.getHost().toArray(new String[config.getHost().size()]))
+ .withPort(config.getPort().intValue());
+
+ keyspace = config.getKeyspace();
+ table = config.getTable();
+
+ if (StringUtils.isNotEmpty(config.getUser()) && StringUtils.isNotEmpty(config.getPassword())) {
+ cluster = clusterBuilder.withCredentials(config.getUser(), config.getPassword()).build();
+ } else {
+ cluster = clusterBuilder.build();
+ }
+
+ Metadata metadata = cluster.getMetadata();
+ if (Objects.isNull(metadata.getKeyspace(keyspace))) {
+ LOGGER.info("Keyspace {} does not exist. Creating Keyspace", keyspace);
+ Map<String, Object> replication = new HashMap<>();
+ replication.put("class", "SimpleStrategy");
+ replication.put("replication_factor", 1);
+
+ String createKeyspaceStmt = SchemaBuilder.createKeyspace(keyspace).with()
+ .replication(replication).getQueryString();
+ cluster.connect().execute(createKeyspaceStmt);
+ }
+
+ session = cluster.connect(keyspace);
+
+ KeyspaceMetadata ks = metadata.getKeyspace(keyspace);
+ TableMetadata tableMetadata = ks.getTable(table);
+
+ if (Objects.isNull(tableMetadata)) {
+ LOGGER.info("Table {} does not exist in Keyspace {}. Creating Table", table, keyspace);
+ String createTableStmt = SchemaBuilder.createTable(table)
+ .addPartitionKey(config.getPartitionKeyColumn(), DataType.varchar())
+ .addColumn(config.getColumn(), DataType.blob()).getQueryString();
+
+ session.execute(createTableStmt);
+ }
+ }
+
+ @Override
+ public StreamsResultSet readAll() {
+ ResultSet rs = session.execute(getSelectStatement());
+ Iterator<Row> rowsIterator = rs.iterator();
+
+ while (rowsIterator.hasNext()) {
+ Row row = rowsIterator.next();
+ StreamsDatum datum = prepareDatum(row);
+ write(datum);
+ }
+
+ return readCurrent();
+ }
+
+ @Override
+ public void startStream() {
+ LOGGER.debug("startStream");
+ CassandraPersistReaderTask readerTask = new CassandraPersistReaderTask(this);
+
+ CompletableFuture.runAsync(readerTask, executor);
+
+ try {
+ if (readerTaskFuture.get()) {
+ executor.shutdown();
+ }
+ } catch (InterruptedException ex) {
+ LOGGER.trace("Interrupt", ex);
+ } catch (ExecutionException ex) {
+ LOGGER.trace("Execution exception", ex);
+ }
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+
+ StreamsResultSet current;
+
+ try {
+ lock.writeLock().lock();
+ current = new StreamsResultSet(persistQueue);
+ current.setCounter(new DatumStatusCounter());
+ persistQueue = constructQueue();
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ return current;
+ }
+
+ protected void write(StreamsDatum entry) {
+ boolean success;
+ do {
+ try {
+ lock.readLock().lock();
+ success = persistQueue.offer(entry);
+ Thread.yield();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+ while (!success);
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return null;
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return null;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return !executor.isTerminated() || !executor.isShutdown();
+ }
+
+ private Queue<StreamsDatum> constructQueue() {
+ return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
+ }
+
+ private String getSelectStatement() {
+ return QueryBuilder.select().all()
+ .from(table).getQueryString();
+ }
+
+ public class CassandraPersistReaderTask implements Runnable {
+
+ private CassandraPersistReader reader;
+
+ public CassandraPersistReaderTask(CassandraPersistReader reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (reader.rowIterator.hasNext()) {
+ Row row = reader.rowIterator.next();
+ StreamsDatum datum = reader.prepareDatum(row);
+ reader.write(datum);
+ }
+ } finally {
+ readerTaskFuture.complete(true);
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java
new file mode 100644
index 0000000..81c0e9e
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/CassandraPersistWriter.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.cassandra;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.GuidUtils;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.schemabuilder.SchemaBuilder;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class CassandraPersistWriter implements StreamsPersistWriter, Runnable, Flushable, Closeable {
+
+ public static final String STREAMS_ID = "CassandraPersistWriter";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CassandraPersistWriter.class);
+
+ private static final long MAX_WRITE_LATENCY = 1000;
+
+ protected volatile Queue<StreamsDatum> persistQueue;
+
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
+ private ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
+
+ private CassandraConfiguration config;
+
+ protected Cluster cluster;
+ protected Session session;
+
+ protected String keyspace;
+ protected String table;
+ protected PreparedStatement insertStatement;
+
+ protected List<BoundStatement> insertBatch = new ArrayList<>();
+
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ public CassandraPersistWriter() {
+ this(new ComponentConfigurator<>(CassandraConfiguration.class)
+ .detectConfiguration(StreamsConfigurator.getConfig().getConfig("cassandra")));
+ }
+
+ public CassandraPersistWriter(CassandraConfiguration config) {
+ this.config = config;
+ }
+
+ public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+ this.persistQueue = persistQueue;
+ }
+
+ public Queue<StreamsDatum> getPersistQueue() {
+ return persistQueue;
+ }
+
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
+
+ @Override
+ public void write(StreamsDatum streamsDatum) {
+
+ ObjectNode node;
+
+ if (streamsDatum.getDocument() instanceof String) {
+ try {
+ node = mapper.readValue((String) streamsDatum.getDocument(), ObjectNode.class);
+
+ byte[] value = node.toString().getBytes();
+
+ String key = GuidUtils.generateGuid(node.toString());
+ if(!Objects.isNull(streamsDatum.getMetadata().get("id"))) {
+ key = streamsDatum.getMetadata().get("id").toString();
+ }
+
+ BoundStatement statement = insertStatement.bind(key, ByteBuffer.wrap(value));
+ insertBatch.add(statement);
+ } catch (IOException ex) {
+ LOGGER.warn("Failure adding object: {}", streamsDatum.getDocument().toString());
+ return;
+ }
+ } else {
+ try {
+ node = mapper.valueToTree(streamsDatum.getDocument());
+
+ byte[] value = node.toString().getBytes();
+
+ String key = GuidUtils.generateGuid(node.toString());
+ if(!Objects.isNull(streamsDatum.getMetadata().get("id"))) {
+ key = streamsDatum.getMetadata().get("id").toString();
+ }
+
+ BoundStatement statement = insertStatement.bind(key, ByteBuffer.wrap(value));
+ insertBatch.add(statement);
+ } catch (Exception ex) {
+ LOGGER.warn("Failure adding object: {}", streamsDatum.getDocument().toString());
+ return;
+ }
+ }
+
+ flushIfNecessary();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ try {
+ LOGGER.debug("Attempting to flush {} items to cassandra", insertBatch.size());
+ lock.writeLock().lock();
+
+ BatchStatement batchStatement = new BatchStatement();
+ batchStatement.addAll(insertBatch);
+ session.execute(batchStatement);
+
+ lastWrite.set(System.currentTimeMillis());
+ insertBatch = new ArrayList<>();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ session.close();
+ cluster.close();
+ backgroundFlushTask.shutdownNow();
+ }
+
+ /**
+ * start write thread.
+ */
+ public void start() {
+ connectToCassandra();
+ backgroundFlushTask.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ flushIfNecessary();
+ }
+ }, 0, MAX_WRITE_LATENCY * 2, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * stop.
+ */
+ public void stop() {
+ try {
+ flush();
+ } catch (IOException ex) {
+ LOGGER.error("Error flushing", ex);
+ }
+
+ try {
+ close();
+ } catch (IOException ex) {
+ LOGGER.error("Error closing", ex);
+ }
+
+ try {
+ backgroundFlushTask.shutdown();
+ // Wait a while for existing tasks to terminate
+ if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
+ backgroundFlushTask.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
+ LOGGER.error("Stream did not terminate");
+ }
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ backgroundFlushTask.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ if (persistQueue.peek() != null) {
+ try {
+ StreamsDatum entry = persistQueue.remove();
+ write(entry);
+ } catch (Exception ex) {
+ LOGGER.warn("Failure writing entry from Queue: {}", ex.getMessage());
+ }
+ }
+ try {
+ Thread.sleep(new Random().nextInt(1));
+ } catch (InterruptedException interrupt) {
+ LOGGER.trace("Interrupt", interrupt);
+ }
+ }
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ this.persistQueue = new ConcurrentLinkedQueue<>();
+ start();
+ }
+
+ @Override
+ public void cleanUp() {
+ stop();
+ }
+
+ protected void flushIfNecessary() {
+ long lastLatency = System.currentTimeMillis() - lastWrite.get();
+ //Flush iff the size > 0 AND the size is divisible by 100 or the time between now and the last flush is greater
+ //than the maximum desired latency
+ if (insertBatch.size() > 0 && (insertBatch.size() % 100 == 0 || lastLatency > MAX_WRITE_LATENCY)) {
+ try {
+ flush();
+ } catch (IOException ex) {
+ LOGGER.error("Error writing to Cassandra", ex);
+ }
+ }
+ }
+
+ private synchronized void connectToCassandra() {
+ Cluster.Builder clusterBuilder = Cluster.builder()
+ .addContactPoints(config.getHost().toArray(new String[config.getHost().size()]))
+ .withPort(config.getPort().intValue());
+
+ keyspace = config.getKeyspace();
+ table = config.getTable();
+
+ if (StringUtils.isNotEmpty(config.getUser()) && StringUtils.isNotEmpty(config.getPassword())) {
+ cluster = clusterBuilder.withCredentials(config.getUser(), config.getPassword()).build();
+ } else {
+ cluster = clusterBuilder.build();
+ }
+
+ Metadata metadata = cluster.getMetadata();
+ if (Objects.isNull(metadata.getKeyspace(keyspace))) {
+ LOGGER.info("Keyspace {} does not exist. Creating Keyspace", keyspace);
+ Map<String, Object> replication = new HashMap<>();
+ replication.put("class", "SimpleStrategy");
+ replication.put("replication_factor", 1);
+
+ String createKeyspaceStmt = SchemaBuilder.createKeyspace(keyspace).with()
+ .replication(replication).getQueryString();
+ cluster.connect().execute(createKeyspaceStmt);
+ }
+
+ session = cluster.connect(keyspace);
+
+ KeyspaceMetadata ks = metadata.getKeyspace(keyspace);
+ TableMetadata tableMetadata = ks.getTable(table);
+
+ if (Objects.isNull(tableMetadata)) {
+ LOGGER.info("Table {} does not exist in Keyspace {}. Creating Table", table, keyspace);
+ String createTableStmt = SchemaBuilder.createTable(table)
+ .addPartitionKey(config.getPartitionKeyColumn(), DataType.varchar())
+ .addColumn(config.getColumn(), DataType.blob()).getQueryString();
+
+ session.execute(createTableStmt);
+ }
+
+ createInsertStatement();
+ }
+
+ private void createInsertStatement() {
+ Insert insertBuilder = QueryBuilder.insertInto(table);
+ insertBuilder.value(config.getPartitionKeyColumn(), new Object());
+ insertBuilder.value(config.getColumn(), new Object());
+ insertStatement = session.prepare(insertBuilder.getQueryString());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json b/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json
new file mode 100644
index 0000000..4b4cf1f
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/main/jsonschema/org/apache/streams/cassandra/CassandraConfiguration.json
@@ -0,0 +1,47 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "type": "object",
+ "javaType": "org.apache.streams.cassandra.CassandraConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "host": {
+ "type": "array",
+ "items": {
+ "type": "string"
+ },
+ "description": "Cassandra host"
+ },
+ "port": {
+ "type": "integer",
+ "description": "Cassandra port"
+ },
+ "user": {
+ "type": "string",
+ "description": "User"
+ },
+ "password": {
+ "type": "string",
+ "description": "Password"
+ },
+ "keyspace": {
+ "type": "string",
+ "description": "Keyspace"
+ },
+ "table": {
+ "type": "string",
+ "description": "Table"
+ },
+ "partitionKeyColumn": {
+ "type": "string",
+ "description": "Partition Key column name"
+ },
+ "column": {
+ "type": "string",
+ "description": "Column name"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/main/resources/cassandra.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/resources/cassandra.conf b/streams-contrib/streams-persist-cassandra/src/main/resources/cassandra.conf
new file mode 100644
index 0000000..8576e9a
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/main/resources/cassandra.conf
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cassandra {
+ "host": ["127.0.0.1"],
+ "port": 9042,
+ "keyspace": "test_keyspace",
+ "table": "test_table",
+ "partitionKeyColumn": "key",
+ "column": "value"
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/main/resources/components.dot
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/resources/components.dot b/streams-contrib/streams-persist-cassandra/src/main/resources/components.dot
new file mode 100644
index 0000000..916d8c0
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/main/resources/components.dot
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+digraph g {
+
+ graph [compound = true];
+
+ //presentation
+ splines = true;
+ overlap = false;
+ rankdir = TB;
+
+ generators [label="generators", shape="circle"];
+ providers [label="providers", shape="circle"];
+ processors [label="processors", shape="circle"];
+
+ subgraph cluster_persisters {
+ label="persisters";
+ persisters_cassandra_reader [label="CassandraPersistReader"]
+ persisters_cassandra_writer [label="CassandraPersistWriter"]
+ }
+
+ subgraph cluster_dbs {
+ label="dbs";
+ cassandra [label="cassandra", shape="cylinder"]
+ }
+
+ generators -> providers
+ providers -> processors
+ processors -> persisters_cassandra_writer [label="StreamsDatum"]
+ persisters_cassandra_reader -> processors [label="StreamsDatum[String]"]
+ cassandra -> persisters_cassandra_reader
+ persisters_cassandra_writer -> cassandra
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/site/markdown/cassandra.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/site/markdown/cassandra.md b/streams-contrib/streams-persist-cassandra/src/site/markdown/cassandra.md
new file mode 100644
index 0000000..9d01112
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/site/markdown/cassandra.md
@@ -0,0 +1,36 @@
+## Cassandra
+
+Start cassandra via docker with the docker maven plugin:
+
+ docker -PdockerITs docker:start
+
+Confirm that cassandra is running:
+
+ docker ps
+
+Confirm that host and post(s) are in property file:
+
+ cat cassandra.properties
+
+Create a local file `cassandra.conf` with cluster details:
+
+ cassandra {
+ host = ${cassandra.tcp.host}
+ port = ${cassandra.tcp.port}
+ }
+
+When configuring a stream, include these files:
+
+ include "cassandra.properties"
+ include "cassandra.conf"
+
+Supply application-specific configuration as well:
+
+ cassandra {
+ keyspace = test_keyspace1
+ table = test_table1
+ partitionKeyColumn = key
+ column = value
+ }
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/site/markdown/index.md b/streams-contrib/streams-persist-cassandra/src/site/markdown/index.md
new file mode 100644
index 0000000..353f1c4
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/site/markdown/index.md
@@ -0,0 +1,23 @@
+streams-persist-cassandra
+=====================
+
+Read/write to/from Cassandra
+
+## Configuration
+
+| Schema |
+|--------|
+| [CassandraConfiguration.json](../../../org/apache/streams/cassandra/CassandraConfiguration.json "CassandraConfiguration.json") [CassandraConfiguration.html](apidocs/org/apache/streams/cassandra/CassandraConfiguration.html "javadoc") |
+
+## Components
+
+![components](components.dot.svg "Components")
+
+| Class |
+|-------|
+| CassandraPersistReader [CassandraPersistReader.html](apidocs/org/apache/streams/cassandra/CassandraPersistReader.html "javadoc")
+| CassandraPersistWriter [CassandraPersistWriter.html](apidocs/org/apache/streams/cassandra/CassandraPersistWriter.html "javadoc")
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/site/site.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/site/site.xml b/streams-contrib/streams-persist-cassandra/src/site/site.xml
new file mode 100644
index 0000000..f82300a
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/site/site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project>
+ <body>
+ <links name="Help">
+ <item name="Cassandra" href="cassandra.html"/>
+ </links>
+ </body>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java
new file mode 100644
index 0000000..ca675b9
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/test/CassandraPersistIT.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.cassandra.test;
+
+import org.apache.streams.cassandra.CassandraConfiguration;
+import org.apache.streams.cassandra.CassandraPersistReader;
+import org.apache.streams.cassandra.CassandraPersistWriter;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * Test writing documents
+ */
+public class CassandraPersistIT {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CassandraPersistIT.class);
+
+ private ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+ private CassandraConfiguration testConfiguration;
+
+ private int count = 0;
+
+ @BeforeClass
+ public void setup() throws Exception {
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/CassandraPersistIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ testConfiguration = new ComponentConfigurator<>(CassandraConfiguration.class).detectConfiguration(typesafe, "cassandra");
+ }
+
+ @Test
+ public void testCassandraPersist() throws Exception {
+ CassandraPersistWriter writer = new CassandraPersistWriter(testConfiguration);
+
+ writer.prepare(null);
+
+ InputStream testActivityFolderStream = CassandraPersistIT.class.getClassLoader()
+ .getResourceAsStream("activities");
+ List<String> files = IOUtils.readLines(testActivityFolderStream, StandardCharsets.UTF_8);
+
+ for (String file: files) {
+ LOGGER.info("File: " + file );
+ InputStream testActivityFileStream = CassandraPersistIT.class.getClassLoader()
+ .getResourceAsStream("activities/" + file);
+ Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+ activity.getAdditionalProperties().remove("$license");
+ StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+ writer.write(datum);
+
+ LOGGER.info("Wrote: " + activity.getVerb() );
+ count++;
+ }
+
+ LOGGER.info("Total Written: {}", count );
+ Assert.assertEquals(89, count);
+
+ writer.cleanUp();
+
+ CassandraPersistReader reader = new CassandraPersistReader(testConfiguration);
+
+ reader.prepare(null);
+
+ StreamsResultSet resultSet = reader.readAll();
+
+ LOGGER.info("Total Read: {}", resultSet.size() );
+ Assert.assertEquals(89, resultSet.size());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e40e6287/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf b/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf
new file mode 100644
index 0000000..62fec7d
--- /dev/null
+++ b/streams-contrib/streams-persist-cassandra/src/test/resources/CassandraPersistIT.conf
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cassandra {
+ host = [${cassandra.tcp.host}]
+ port = ${cassandra.tcp.port}
+ user = cassandra
+ password = cassandra
+ keyspace = test_keyspace
+ table = test_table
+ partitionKeyColumn = key
+ column = value
+}
\ No newline at end of file