You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2017/01/27 15:48:26 UTC

incubator-streams git commit: STREAMS-482: streams-persist-riak resolves #352

Repository: incubator-streams
Updated Branches:
  refs/heads/master 1f5e22b11 -> cc46ab69a


STREAMS-482: streams-persist-riak resolves #352

Squashed commit of the following:

commit add5bfad2db50b5fd6054421348daf4e1ac120ad
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Tue Jan 24 15:41:49 2017 -0600

    imports

commit 73b4a17eab6cecd96ec9bd63e80cac7b39058c28
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Tue Jan 24 15:40:13 2017 -0600

    headers and indentation

commit 59f20bfd9f2bb88c51e86c02adec669f61fcddbb
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Tue Jan 24 15:35:25 2017 -0600

    license headers

commit 6c87bef3beb17b276d4b8455d7c4010338edd7be
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Tue Jan 24 10:50:10 2017 -0600

    basic implementation of streams-persist-riak


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/cc46ab69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/cc46ab69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/cc46ab69

Branch: refs/heads/master
Commit: cc46ab69a0bd3c3e333d4cc82de554ba5880f766
Parents: 1f5e22b
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Fri Jan 27 09:48:04 2017 -0600
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Fri Jan 27 09:48:04 2017 -0600

----------------------------------------------------------------------
 streams-contrib/pom.xml                         |   1 +
 streams-contrib/streams-persist-riak/pom.xml    | 239 +++++++++++++++++++
 .../streams/riak/binary/RiakBinaryClient.java   |  97 ++++++++
 .../riak/binary/RiakBinaryPersistReader.java    | 141 +++++++++++
 .../riak/binary/RiakBinaryPersistWriter.java    | 159 ++++++++++++
 .../streams/riak/http/RiakHttpClient.java       | 105 ++++++++
 .../riak/http/RiakHttpPersistReader.java        | 199 +++++++++++++++
 .../riak/http/RiakHttpPersistWriter.java        | 166 +++++++++++++
 .../src/main/jsonschema/RiakConfiguration.json  |  40 ++++
 .../streams/riak/test/RiakBinaryPersistIT.java  | 120 ++++++++++
 .../streams/riak/test/RiakHttpPersistIT.java    | 115 +++++++++
 .../src/test/resources/RiakBinaryPersistIT.conf |  21 ++
 .../src/test/resources/RiakHttpPersistIT.conf   |  21 ++
 13 files changed, 1424 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index aed60c9..1d9eee5 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -47,6 +47,7 @@
         <module>streams-persist-kafka</module>
         <module>streams-persist-mongo</module>
         <module>streams-persist-neo4j</module>
+        <module>streams-persist-riak</module>
         <module>streams-amazon-aws</module>
         <module>streams-processor-jackson</module>
         <module>streams-processor-json</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/pom.xml b/streams-contrib/streams-persist-riak/pom.xml
new file mode 100644
index 0000000..8bc6344
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/pom.xml
@@ -0,0 +1,239 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streams-contrib</artifactId>
+        <groupId>org.apache.streams</groupId>
+        <version>0.5-incubating-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streams-persist-riak</artifactId>
+    <name>${project.artifactId}</name>
+
+    <description>Riak Module</description>
+
+    <properties>
+        <httpcomponents.core.version>4.3.5</httpcomponents.core.version>
+        <httpcomponents.client.version>4.3.5</httpcomponents.client.version>
+        <riak.version>2.0.6</riak.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${httpcomponents.client.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-schema-activitystreams</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-testing</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>com.basho.riak</groupId>
+            <artifactId>riak-client</artifactId>
+            <version>${riak.version}</version>
+            <optional>true</optional>
+        </dependency>
+    </dependencies>
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.streams.plugins</groupId>
+                <artifactId>streams-plugin-pojo</artifactId>
+                <version>${project.version}</version>
+                <configuration>
+                    <sourcePaths>
+                        <sourcePath>${project.basedir}/src/main/jsonschema</sourcePath>
+                    </sourcePaths>
+                    <targetDirectory>${project.basedir}/target/generated-sources/pojo</targetDirectory>
+                    <targetPackage>org.apache.streams.riak.pojo</targetPackage>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate-sources</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.apache.streams</groupId>
+                        <artifactId>streams-http</artifactId>
+                        <version>${project.version}</version>
+                    </dependency>
+                </dependencies>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>dockerITs</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+                <property>
+                    <name>skipITs</name>
+                    <value>false</value>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>io.fabric8</groupId>
+                        <artifactId>docker-maven-plugin</artifactId>
+                        <configuration combine.self="override">
+                            <watchInterval>500</watchInterval>
+                            <logDate>default</logDate>
+                            <verbose>true</verbose>
+                            <autoPull>on</autoPull>
+                            <images>
+                                <image>
+                                    <name>lapax/riak</name>
+                                    <alias>riak</alias>
+                                    <run>
+                                        <env>
+                                            <NEO4J_AUTH>none</NEO4J_AUTH>
+                                        </env>
+                                        <namingStrategy>none</namingStrategy>
+                                        <ports>
+                                            <port>${riak.http.host}:${riak.http.port}:8098</port>
+                                            <port>${riak.tcp.host}:${riak.tcp.port}:8087</port>
+                                        </ports>
+                                        <portPropertyFile>riak.properties</portPropertyFile>
+                                        <wait>
+                                            <log>riak startup</log>
+                                            <http>
+                                                <url>http://${riak.http.host}:${riak.http.port}</url>
+                                                <method>GET</method>
+                                                <status>200</status>
+                                            </http>
+                                            <time>20000</time>
+                                            <kill>1000</kill>
+                                            <shutdown>500</shutdown>
+                                        </wait>
+                                        <log>
+                                            <enabled>true</enabled>
+                                            <date>default</date>
+                                            <color>cyan</color>
+                                        </log>
+                                    </run>
+                                    <watch>
+                                        <mode>none</mode>
+                                    </watch>
+                                </image>
+
+                            </images>
+                        </configuration>
+
+                    </plugin>
+
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-dependency-plugin</artifactId>
+                        <configuration>
+                            <includes>**/*.json</includes>
+                            <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+                            <includeGroupIds>org.apache.streams</includeGroupIds>
+                            <includeTypes>test-jar</includeTypes>
+                        </configuration>
+                        <executions>
+                            <execution>
+                                <id>test-resource-dependencies</id>
+                                <phase>process-test-resources</phase>
+                                <goals>
+                                    <goal>unpack-dependencies</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+
+                </plugins>
+            </build>
+
+        </profile>
+    </profiles>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryClient.java b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryClient.java
new file mode 100644
index 0000000..214eed9
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryClient.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.riak.binary;
+
+import org.apache.streams.riak.pojo.RiakConfiguration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * RiakBinaryClient maintains shared connections to riak via binary protocol.
+ */
+public class RiakBinaryClient {
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(RiakBinaryClient.class);
+
+  private com.basho.riak.client.api.RiakClient client;
+
+  public RiakConfiguration config;
+
+  private RiakBinaryClient(RiakConfiguration config) {
+    this.config = config;
+    try {
+      this.start();
+    } catch (Exception e) {
+      e.printStackTrace();
+      this.client = null;
+    }
+  }
+
+  private static Map<RiakConfiguration, RiakBinaryClient> INSTANCE_MAP = new ConcurrentHashMap<>();
+
+  public static RiakBinaryClient getInstance(RiakConfiguration riakConfiguration) {
+    if ( INSTANCE_MAP != null
+        && INSTANCE_MAP.size() > 0
+        && INSTANCE_MAP.containsKey(riakConfiguration)) {
+      return INSTANCE_MAP.get(riakConfiguration);
+    } else {
+      RiakBinaryClient instance = new RiakBinaryClient(riakConfiguration);
+      if( instance != null && instance.client != null ) {
+        INSTANCE_MAP.put(riakConfiguration, instance);
+        return instance;
+      } else {
+        return null;
+      }
+    }
+  }
+
+  public void start() throws Exception {
+
+    Objects.nonNull(config);
+
+    LOGGER.info("RiakHttpClient.start {}", config);
+
+    this.client = com.basho.riak.client.api.RiakClient.newClient(config.getPort().intValue(), config.getHosts());
+
+    Objects.nonNull(client);
+
+    Objects.nonNull(client.getRiakCluster());
+
+    assert( client.getRiakCluster().getNodes().size() > 0 );
+  }
+
+  public void stop() throws Exception {
+    this.client = null;
+  }
+
+  public RiakConfiguration config() {
+    return config;
+  }
+
+  public com.basho.riak.client.api.RiakClient client() {
+    return client;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryPersistReader.java b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryPersistReader.java
new file mode 100644
index 0000000..3846d53
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryPersistReader.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.riak.binary;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.riak.pojo.RiakConfiguration;
+
+import com.basho.riak.client.api.commands.kv.FetchValue;
+import com.basho.riak.client.api.commands.kv.ListKeys;
+import com.basho.riak.client.api.commands.kv.MultiFetch;
+import com.basho.riak.client.core.RiakFuture;
+import com.basho.riak.client.core.query.Location;
+import com.basho.riak.client.core.query.Namespace;
+import com.google.common.collect.Queues;
+import org.apache.commons.lang.NotImplementedException;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * RiakBinaryPersistReader reads documents from riak via binary protocol.
+ */
+public class RiakBinaryPersistReader implements StreamsPersistReader {
+
+  private RiakConfiguration configuration;
+  public RiakBinaryClient client;
+
+  public RiakBinaryPersistReader(RiakConfiguration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public String getId() {
+    return "RiakBinaryPersistReader";
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    client = RiakBinaryClient.getInstance(this.configuration);
+  }
+
+  @Override
+  public void cleanUp() {
+    client = null;
+  }
+
+  @Override
+  public synchronized StreamsResultSet readAll() {
+
+    Queue<StreamsDatum> readAllQueue = constructQueue();
+
+    Namespace ns = new Namespace(configuration.getDefaultBucketType(), configuration.getDefaultBucket());
+
+    ListKeys lk = new ListKeys.Builder(ns).build();
+
+    ListKeys.Response listKeysResponse = null;
+    try {
+      listKeysResponse = client.client().execute(lk);
+    } catch (Exception e) {
+      e.printStackTrace();
+      return null;
+    }
+
+    MultiFetch multiFetch = new MultiFetch.Builder().addLocations(listKeysResponse).build();
+    MultiFetch.Response multiFetchResponse = null;
+    try {
+      multiFetchResponse = client.client().execute(multiFetch);
+    } catch (ExecutionException e) {
+      e.printStackTrace();
+      return null;
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      return null;
+    }
+
+    for (RiakFuture<FetchValue.Response, Location> f : multiFetchResponse) {
+      try {
+        FetchValue.Response response = f.get();
+        readAllQueue.add(new StreamsDatum(response.getValue(String.class), f.getQueryInfo().getKeyAsString()));
+      }
+      catch (ExecutionException e) {
+        e.printStackTrace();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    return new StreamsResultSet(readAllQueue);
+  }
+
+  @Override
+  public void startStream() {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public StreamsResultSet readCurrent() {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public boolean isRunning() {
+    return Objects.nonNull(client);
+  }
+
+  private Queue<StreamsDatum> constructQueue() {
+    return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryPersistWriter.java b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryPersistWriter.java
new file mode 100644
index 0000000..eb7b0bf
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryPersistWriter.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.riak.binary;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.riak.pojo.RiakConfiguration;
+
+import com.basho.riak.client.api.commands.kv.StoreValue;
+import com.basho.riak.client.core.query.Location;
+import com.basho.riak.client.core.query.Namespace;
+import com.basho.riak.client.core.query.RiakObject;
+import com.basho.riak.client.core.util.BinaryValue;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * RiakBinaryPersistWriter writes documents to riak via binary protocol.
+ */
+public class RiakBinaryPersistWriter implements StreamsPersistWriter {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RiakBinaryPersistWriter.class);
+
+  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+  private RiakConfiguration configuration;
+  private RiakBinaryClient client;
+
+  public RiakBinaryPersistWriter(RiakConfiguration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public String getId() {
+    return "RiakBinaryPersistWriter";
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    client = RiakBinaryClient.getInstance(this.configuration);
+  }
+
+  @Override
+  public void cleanUp() {
+    client = null;
+  }
+
+  @Override
+  public void write(StreamsDatum entry) {
+
+    Objects.nonNull(client);
+
+    String id = null;
+    String document;
+    String bucket;
+    String bucketType;
+    String contentType;
+    String charset;
+    if( StringUtils.isNotBlank(entry.getId())) {
+      id = entry.getId();
+    }
+    if( entry.getDocument() instanceof String) {
+      document = (String)entry.getDocument();
+    } else {
+      try {
+        document = MAPPER.writeValueAsString(entry.getDocument());
+      } catch( Exception e ) {
+        LOGGER.warn("Exception", e);
+        return;
+      }
+    }
+    if( entry.getMetadata() != null
+        && entry.getMetadata().containsKey("bucket")
+        && entry.getMetadata().get("bucket") instanceof String
+        && StringUtils.isNotBlank((String)entry.getMetadata().get("bucket") )) {
+      bucket = (String)entry.getMetadata().get("bucket");
+    } else {
+      bucket = configuration.getDefaultBucket();
+    }
+    if( entry.getMetadata() != null
+        && entry.getMetadata().containsKey("bucketType")
+        && entry.getMetadata().get("bucketType") instanceof String
+        && StringUtils.isNotBlank((String)entry.getMetadata().get("bucketType") )) {
+      bucketType = (String)entry.getMetadata().get("bucketType");
+    } else {
+      bucketType = configuration.getDefaultBucketType();
+    }
+    if( entry.getMetadata() != null
+        && entry.getMetadata().containsKey("charset")
+        && entry.getMetadata().get("charset") instanceof String
+        && StringUtils.isNotBlank((String)entry.getMetadata().get("charset") )) {
+      charset = (String)entry.getMetadata().get("charset");
+    } else {
+      charset = configuration.getDefaultCharset();
+    }
+    if( entry.getMetadata() != null
+        && entry.getMetadata().containsKey("contentType")
+        && entry.getMetadata().get("contentType") instanceof String
+        && StringUtils.isNotBlank((String)entry.getMetadata().get("contentType") )) {
+      contentType = (String)entry.getMetadata().get("contentType");
+    } else {
+      contentType = configuration.getDefaultContentType();
+    }
+
+    try {
+
+      RiakObject riakObject = new RiakObject();
+      riakObject.setContentType(contentType);
+      riakObject.setCharset(charset);
+      riakObject.setValue(BinaryValue.create(document));
+
+      Namespace ns = new Namespace(bucketType, bucket);
+      StoreValue.Builder storeValueBuilder = new StoreValue.Builder(riakObject);
+
+      if( id != null && StringUtils.isNotBlank(id)) {
+        Location location = new Location(ns, id);
+        storeValueBuilder = storeValueBuilder.withLocation(location);
+      } else {
+        storeValueBuilder = storeValueBuilder.withNamespace(ns);
+      }
+
+      StoreValue store = storeValueBuilder.build();
+
+      StoreValue.Response storeResponse = client.client().execute(store);
+
+      LOGGER.debug("storeResponse", storeResponse);
+
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (ExecutionException e) {
+      e.printStackTrace();
+    }
+
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpClient.java b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpClient.java
new file mode 100644
index 0000000..7155b52
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpClient.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.riak.http;
+
+import org.apache.streams.riak.pojo.RiakConfiguration;
+
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * RiakHttpClient maintains shared connections to riak via http.
+ */
+public class RiakHttpClient {
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(RiakHttpClient.class);
+
+  public RiakConfiguration config;
+
+  protected CloseableHttpClient client;
+  protected URI baseURI;
+
+  private RiakHttpClient(RiakConfiguration config) {
+    this.config = config;
+    try {
+      this.start();
+    } catch (Exception e) {
+      e.printStackTrace();
+      this.client = null;
+    }
+  }
+
+  private static Map<RiakConfiguration, RiakHttpClient> INSTANCE_MAP = new ConcurrentHashMap<>();
+
+  public static RiakHttpClient getInstance(RiakConfiguration riakConfiguration) {
+    if ( INSTANCE_MAP != null
+        && INSTANCE_MAP.size() > 0
+        && INSTANCE_MAP.containsKey(riakConfiguration)) {
+      return INSTANCE_MAP.get(riakConfiguration);
+    } else {
+      RiakHttpClient instance = new RiakHttpClient(riakConfiguration);
+      if( instance != null && instance.client != null ) {
+        INSTANCE_MAP.put(riakConfiguration, instance);
+        return instance;
+      } else {
+        return null;
+      }
+    }
+  }
+
+  public void start() throws Exception {
+    Objects.nonNull(config);
+    assert(config.getScheme().startsWith("http"));
+    URIBuilder uriBuilder = new URIBuilder();
+    uriBuilder.setScheme(config.getScheme());
+    uriBuilder.setHost(config.getHosts().get(0));
+    uriBuilder.setPort(config.getPort().intValue());
+    baseURI = uriBuilder.build();
+    client = HttpClients.createDefault();
+  }
+
+  public void stop() {
+    try {
+      client.close();
+    } catch( Exception e) {
+      LOGGER.error( "Exception", e );
+    } finally {
+      client = null;
+    }
+  }
+
+  public RiakConfiguration config() {
+    return config;
+  }
+
+  public HttpClient client() {
+    return client;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpPersistReader.java b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpPersistReader.java
new file mode 100644
index 0000000..353bbbf
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpPersistReader.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.riak.http;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.riak.pojo.RiakConfiguration;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.collect.Queues;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.util.EntityUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * RiakHttpPersistReader reads documents from riak via http.
+ */
+public class RiakHttpPersistReader implements StreamsPersistReader {
+
+  private RiakConfiguration configuration;
+  private RiakHttpClient client;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RiakHttpPersistReader.class);
+
+  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+  public RiakHttpPersistReader(RiakConfiguration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public String getId() {
+    return "RiakHttpPersistReader";
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    client = RiakHttpClient.getInstance(this.configuration);
+  }
+
+  @Override
+  public void cleanUp() {
+    client = null;
+  }
+
+  @Override
+  public StreamsResultSet readAll() {
+
+    Queue<StreamsDatum> readAllQueue = constructQueue();
+
+    URIBuilder lk = null;
+
+    try {
+
+      lk = new URIBuilder(client.baseURI.toString());
+      lk.setPath(client.baseURI.getPath().concat("/buckets/"+configuration.getDefaultBucket()+"/keys"));
+      lk.setParameter("keys", "true");
+
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+    }
+
+    HttpResponse lkResponse = null;
+    try {
+      HttpGet lkGet = new HttpGet(lk.build());
+      lkResponse = client.client().execute(lkGet);
+    } catch (IOException e) {
+      LOGGER.warn("IOException", e);
+      return null;
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+      return null;
+    }
+
+    String lkEntityString = null;
+    try {
+      lkEntityString = EntityUtils.toString(lkResponse.getEntity());
+    } catch (IOException e) {
+      LOGGER.warn("IOException", e);
+      return null;
+    }
+
+    JsonNode lkEntityNode = null;
+    try {
+      lkEntityNode = MAPPER.readValue(lkEntityString, JsonNode.class);
+    } catch (IOException e) {
+      LOGGER.warn("IOException", e);
+      return null;
+    }
+
+    ArrayNode keysArray = null;
+    keysArray = (ArrayNode) lkEntityNode.get("keys");
+    Iterator<JsonNode> keysIterator = keysArray.iterator();
+
+    while( keysIterator.hasNext()) {
+      JsonNode keyNode = keysIterator.next();
+      String key = keyNode.asText();
+
+      URIBuilder gk = null;
+
+      try {
+
+        gk = new URIBuilder(client.baseURI.toString());
+        gk.setPath(client.baseURI.getPath().concat("/buckets/"+configuration.getDefaultBucket()+"/keys/"+key));
+
+      } catch (URISyntaxException e) {
+        LOGGER.warn("URISyntaxException", e);
+        continue;
+      }
+
+      HttpResponse gkResponse = null;
+      try {
+        HttpGet gkGet = new HttpGet(gk.build());
+        gkResponse = client.client().execute(gkGet);
+      } catch (IOException e) {
+        LOGGER.warn("IOException", e);
+        continue;
+      } catch (URISyntaxException e) {
+        LOGGER.warn("URISyntaxException", e);
+        continue;
+      }
+
+      String gkEntityString = null;
+      try {
+        gkEntityString = EntityUtils.toString(gkResponse.getEntity());
+      } catch (IOException e) {
+        LOGGER.warn("IOException", e);
+        continue;
+      }
+
+      readAllQueue.add(new StreamsDatum(gkEntityString, key));
+    }
+
+    return new StreamsResultSet(readAllQueue);
+  }
+
+  @Override
+  public void startStream() {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public StreamsResultSet readCurrent() {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public boolean isRunning() {
+    return false;
+  }
+
+  private Queue<StreamsDatum> constructQueue() {
+    return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpPersistWriter.java b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpPersistWriter.java
new file mode 100644
index 0000000..b51b053
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpPersistWriter.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.riak.http;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.riak.pojo.RiakConfiguration;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.StringEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+
+/**
+ * RiakHttpPersistWriter writes documents to riak via http.
+ */
+public class RiakHttpPersistWriter implements StreamsPersistWriter {
+
+  private RiakConfiguration configuration;
+  private RiakHttpClient client;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RiakHttpPersistWriter.class);
+
+  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+  public RiakHttpPersistWriter(RiakConfiguration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public String getId() {
+    return "RiakHttpPersistWriter";
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    client = RiakHttpClient.getInstance(this.configuration);
+  }
+
+  @Override
+  public void cleanUp() {
+    client = null;
+  }
+
+  @Override
+  public void write(StreamsDatum entry) {
+
+    Objects.nonNull(client);
+
+    String id = null;
+    String document;
+    String bucket;
+    String bucketType;
+    String contentType;
+    String charset;
+    if( StringUtils.isNotBlank(entry.getId())) {
+      id = entry.getId();
+    }
+    if( entry.getDocument() instanceof String) {
+      document = (String)entry.getDocument();
+    } else {
+      try {
+        document = MAPPER.writeValueAsString(entry.getDocument());
+      } catch( Exception e ) {
+        LOGGER.warn("Exception", e);
+        return;
+      }
+    }
+    if( entry.getMetadata() != null
+        && entry.getMetadata().containsKey("bucket")
+        && entry.getMetadata().get("bucket") instanceof String
+        && StringUtils.isNotBlank((String)entry.getMetadata().get("bucket") )) {
+      bucket = (String)entry.getMetadata().get("bucket");
+    } else {
+      bucket = configuration.getDefaultBucket();
+    }
+    if( entry.getMetadata() != null
+        && entry.getMetadata().containsKey("bucketType")
+        && entry.getMetadata().get("bucketType") instanceof String
+        && StringUtils.isNotBlank((String)entry.getMetadata().get("bucketType") )) {
+      bucketType = (String)entry.getMetadata().get("bucketType");
+    } else {
+      bucketType = configuration.getDefaultBucketType();
+    }
+    if( entry.getMetadata() != null
+        && entry.getMetadata().containsKey("charset")
+        && entry.getMetadata().get("charset") instanceof String
+        && StringUtils.isNotBlank((String)entry.getMetadata().get("charset") )) {
+      charset = (String)entry.getMetadata().get("charset");
+    } else {
+      charset = configuration.getDefaultCharset();
+    }
+    if( entry.getMetadata() != null
+        && entry.getMetadata().containsKey("contentType")
+        && entry.getMetadata().get("contentType") instanceof String
+        && StringUtils.isNotBlank((String)entry.getMetadata().get("contentType") )) {
+      contentType = (String)entry.getMetadata().get("contentType");
+    } else {
+      contentType = configuration.getDefaultContentType();
+    }
+
+    URIBuilder uriBuilder = new URIBuilder(client.baseURI);
+    if( bucket != null && StringUtils.isNotBlank(bucket)) {
+      uriBuilder.setPath("/riak/"+bucket);
+    }
+    if( id != null && StringUtils.isNotBlank(id)) {
+      uriBuilder.setPath("/riak/"+bucket+"/"+id);
+    }
+
+    URI uri;
+    try {
+      uri = uriBuilder.build();
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+      return;
+    }
+
+    HttpPost post = new HttpPost();
+    post.setHeader("Content-Type", contentType + "; charset=" + charset);
+    post.setURI(uri);
+    HttpEntity entity;
+    try {
+      entity = new StringEntity(document);
+      post.setEntity(entity);
+    } catch (UnsupportedEncodingException e) {
+      LOGGER.warn("UnsupportedEncodingException", e);
+      return;
+    }
+
+    try {
+      HttpResponse response = client.client().execute(post);
+    } catch (IOException e) {
+      LOGGER.warn("IOException", e);
+      return;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/main/jsonschema/RiakConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/main/jsonschema/RiakConfiguration.json b/streams-contrib/streams-persist-riak/src/main/jsonschema/RiakConfiguration.json
new file mode 100644
index 0000000..917347f
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/main/jsonschema/RiakConfiguration.json
@@ -0,0 +1,40 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "type": "object",
+  "javaType" : "org.apache.streams.riak.pojo.RiakConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "scheme": {
+      "type": "string"
+    },
+    "hosts": {
+      "type": "array",
+      "items": {
+        "type": "string"
+      }
+    },
+    "port": {
+      "type": "integer"
+    },
+    "defaultBucket": {
+      "type": "string",
+      "default": "streams"
+    },
+    "defaultBucketType": {
+      "type": "string",
+      "default": "default"
+    },
+    "defaultCharset": {
+      "type": "string",
+      "default": "UTF-8"
+    },
+    "defaultContentType": {
+      "type": "string",
+      "default": "application/json"
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/test/java/org/apache/streams/riak/test/RiakBinaryPersistIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/test/java/org/apache/streams/riak/test/RiakBinaryPersistIT.java b/streams-contrib/streams-persist-riak/src/test/java/org/apache/streams/riak/test/RiakBinaryPersistIT.java
new file mode 100644
index 0000000..523ec01
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/test/java/org/apache/streams/riak/test/RiakBinaryPersistIT.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.riak.test;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.riak.binary.RiakBinaryClient;
+import org.apache.streams.riak.binary.RiakBinaryPersistReader;
+import org.apache.streams.riak.binary.RiakBinaryPersistWriter;
+import org.apache.streams.riak.pojo.RiakConfiguration;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Integration test for RiakBinaryPersist.
+ */
+public class RiakBinaryPersistIT {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RiakBinaryPersistIT.class);
+
+  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+  private RiakBinaryClient testClient;
+
+  private RiakConfiguration testConfiguration;
+
+  @BeforeClass
+  public void prepareTest() throws IOException {
+
+    Config reference  = ConfigFactory.load();
+    File conf = new File("target/test-classes/RiakBinaryPersistIT.conf");
+    assertTrue(conf.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(RiakConfiguration.class).detectConfiguration(typesafe, "riak");
+    testClient = RiakBinaryClient.getInstance(testConfiguration);
+
+  }
+
+  @Test
+  public void testRiakBinaryPersist() throws Exception {
+
+    RiakBinaryPersistWriter testPersistWriter = new RiakBinaryPersistWriter(testConfiguration);
+    testPersistWriter.prepare(testConfiguration);
+
+    InputStream testActivityFolderStream = RiakBinaryPersistIT.class.getClassLoader()
+        .getResourceAsStream("activities");
+    List<String> files = IOUtils.readLines(testActivityFolderStream, StandardCharsets.UTF_8);
+
+    int count = 0;
+    for( String file : files) {
+      LOGGER.info("File: " + file );
+      InputStream testActivityFileStream = RiakBinaryPersistIT.class.getClassLoader()
+          .getResourceAsStream("activities/" + file);
+      Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+
+      StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+      testPersistWriter.write( datum );
+      LOGGER.info("Wrote: " + activity.getVerb() );
+      count++;
+    }
+
+    testPersistWriter.cleanUp();
+
+    LOGGER.info("Total Written: {}", count );
+    Assert.assertEquals(count, 89);
+
+    RiakBinaryPersistReader testPersistReader = new RiakBinaryPersistReader(testConfiguration);
+    testPersistReader.prepare(testConfiguration);
+
+    StreamsResultSet readerResultSet = testPersistReader.readAll();
+    LOGGER.info("Total Read: {}", readerResultSet.size() );
+    Assert.assertEquals(readerResultSet.size(), 89);
+
+  }
+
+  @AfterClass
+  public void cleanup() throws Exception {
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/test/java/org/apache/streams/riak/test/RiakHttpPersistIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/test/java/org/apache/streams/riak/test/RiakHttpPersistIT.java b/streams-contrib/streams-persist-riak/src/test/java/org/apache/streams/riak/test/RiakHttpPersistIT.java
new file mode 100644
index 0000000..2eeea30
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/test/java/org/apache/streams/riak/test/RiakHttpPersistIT.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.riak.test;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.riak.http.RiakHttpClient;
+import org.apache.streams.riak.http.RiakHttpPersistReader;
+import org.apache.streams.riak.http.RiakHttpPersistWriter;
+import org.apache.streams.riak.pojo.RiakConfiguration;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Integration test for RiakHttpPersist.
+ */
+public class RiakHttpPersistIT {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RiakHttpPersistIT.class);
+
+  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+  private RiakHttpClient testClient;
+
+  private RiakConfiguration testConfiguration;
+
+  @BeforeClass
+  public void prepareTest() throws IOException {
+
+    Config reference  = ConfigFactory.load();
+    File conf = new File("target/test-classes/RiakHttpPersistIT.conf");
+    assertTrue(conf.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(RiakConfiguration.class).detectConfiguration(typesafe, "riak");
+    testClient = RiakHttpClient.getInstance(testConfiguration);
+
+  }
+
+  @Test
+  public void testRiakHttpPersist() throws Exception {
+
+    RiakHttpPersistWriter testPersistWriter = new RiakHttpPersistWriter(testConfiguration);
+    testPersistWriter.prepare(testConfiguration);
+
+    InputStream testActivityFolderStream = RiakHttpPersistIT.class.getClassLoader()
+        .getResourceAsStream("activities");
+    List<String> files = IOUtils.readLines(testActivityFolderStream, StandardCharsets.UTF_8);
+
+    // write data
+
+    int count = 0;
+    for( String file : files) {
+      LOGGER.info("File: " + file );
+      InputStream testActivityFileStream = RiakHttpPersistIT.class.getClassLoader()
+          .getResourceAsStream("activities/" + file);
+      Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+      StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+      testPersistWriter.write( datum );
+      LOGGER.info("Wrote: " + activity.getVerb() );
+      count++;
+    }
+
+    testPersistWriter.cleanUp();
+
+    LOGGER.info("Total Written: {}", count );
+    Assert.assertEquals(count, 89);
+
+    RiakHttpPersistReader testPersistReader = new RiakHttpPersistReader(testConfiguration);
+    testPersistReader.prepare(testConfiguration);
+
+    StreamsResultSet readerResultSet = testPersistReader.readAll();
+    LOGGER.info("Total Read: {}", readerResultSet.size() );
+    Assert.assertEquals(readerResultSet.size(), 89);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/test/resources/RiakBinaryPersistIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/test/resources/RiakBinaryPersistIT.conf b/streams-contrib/streams-persist-riak/src/test/resources/RiakBinaryPersistIT.conf
new file mode 100644
index 0000000..2957ab6
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/test/resources/RiakBinaryPersistIT.conf
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+riak {
+  scheme = "tcp"
+  hosts += ${riak.tcp.host}
+  port = ${riak.tcp.port}
+  defaultBucket = "binary"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/test/resources/RiakHttpPersistIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/test/resources/RiakHttpPersistIT.conf b/streams-contrib/streams-persist-riak/src/test/resources/RiakHttpPersistIT.conf
new file mode 100644
index 0000000..83f097f
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/test/resources/RiakHttpPersistIT.conf
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+riak {
+  scheme = "http"
+  hosts += ${riak.http.host}
+  port = ${riak.http.port}
+  defaultBucket = "http"
+}
\ No newline at end of file