You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2017/01/27 15:48:26 UTC
incubator-streams git commit: STREAMS-482: streams-persist-riak
resolves #352
Repository: incubator-streams
Updated Branches:
refs/heads/master 1f5e22b11 -> cc46ab69a
STREAMS-482: streams-persist-riak resolves #352
Squashed commit of the following:
commit add5bfad2db50b5fd6054421348daf4e1ac120ad
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date: Tue Jan 24 15:41:49 2017 -0600
imports
commit 73b4a17eab6cecd96ec9bd63e80cac7b39058c28
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date: Tue Jan 24 15:40:13 2017 -0600
headers and indentation
commit 59f20bfd9f2bb88c51e86c02adec669f61fcddbb
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date: Tue Jan 24 15:35:25 2017 -0600
license headers
commit 6c87bef3beb17b276d4b8455d7c4010338edd7be
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date: Tue Jan 24 10:50:10 2017 -0600
basic implementation of streams-persist-riak
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/cc46ab69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/cc46ab69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/cc46ab69
Branch: refs/heads/master
Commit: cc46ab69a0bd3c3e333d4cc82de554ba5880f766
Parents: 1f5e22b
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Fri Jan 27 09:48:04 2017 -0600
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Fri Jan 27 09:48:04 2017 -0600
----------------------------------------------------------------------
streams-contrib/pom.xml | 1 +
streams-contrib/streams-persist-riak/pom.xml | 239 +++++++++++++++++++
.../streams/riak/binary/RiakBinaryClient.java | 97 ++++++++
.../riak/binary/RiakBinaryPersistReader.java | 141 +++++++++++
.../riak/binary/RiakBinaryPersistWriter.java | 159 ++++++++++++
.../streams/riak/http/RiakHttpClient.java | 105 ++++++++
.../riak/http/RiakHttpPersistReader.java | 199 +++++++++++++++
.../riak/http/RiakHttpPersistWriter.java | 166 +++++++++++++
.../src/main/jsonschema/RiakConfiguration.json | 40 ++++
.../streams/riak/test/RiakBinaryPersistIT.java | 120 ++++++++++
.../streams/riak/test/RiakHttpPersistIT.java | 115 +++++++++
.../src/test/resources/RiakBinaryPersistIT.conf | 21 ++
.../src/test/resources/RiakHttpPersistIT.conf | 21 ++
13 files changed, 1424 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index aed60c9..1d9eee5 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -47,6 +47,7 @@
<module>streams-persist-kafka</module>
<module>streams-persist-mongo</module>
<module>streams-persist-neo4j</module>
+ <module>streams-persist-riak</module>
<module>streams-amazon-aws</module>
<module>streams-processor-jackson</module>
<module>streams-processor-json</module>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/pom.xml b/streams-contrib/streams-persist-riak/pom.xml
new file mode 100644
index 0000000..8bc6344
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/pom.xml
@@ -0,0 +1,239 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>streams-contrib</artifactId>
+ <groupId>org.apache.streams</groupId>
+ <version>0.5-incubating-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>streams-persist-riak</artifactId>
+ <name>${project.artifactId}</name>
+
+ <description>Riak Module</description>
+
+ <properties>
+ <httpcomponents.core.version>4.3.5</httpcomponents.core.version>
+ <httpcomponents.client.version>4.3.5</httpcomponents.client.version>
+ <riak.version>2.0.6</riak.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>${httpcomponents.client.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-schema-activitystreams</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-testing</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>com.basho.riak</groupId>
+ <artifactId>riak-client</artifactId>
+ <version>${riak.version}</version>
+ <optional>true</optional>
+ </dependency>
+ </dependencies>
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.streams.plugins</groupId>
+ <artifactId>streams-plugin-pojo</artifactId>
+ <version>${project.version}</version>
+ <configuration>
+ <sourcePaths>
+ <sourcePath>${project.basedir}/src/main/jsonschema</sourcePath>
+ </sourcePaths>
+ <targetDirectory>${project.basedir}/target/generated-sources/pojo</targetDirectory>
+ <targetPackage>org.apache.streams.riak.pojo</targetPackage>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate-sources</goal>
+ </goals>
+ </execution>
+ </executions>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-http</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>dockerITs</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ <property>
+ <name>skipITs</name>
+ <value>false</value>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <configuration combine.self="override">
+ <watchInterval>500</watchInterval>
+ <logDate>default</logDate>
+ <verbose>true</verbose>
+ <autoPull>on</autoPull>
+ <images>
+ <image>
+ <name>lapax/riak</name>
+ <alias>riak</alias>
+ <run>
+ <env>
+ <NEO4J_AUTH>none</NEO4J_AUTH>
+ </env>
+ <namingStrategy>none</namingStrategy>
+ <ports>
+ <port>${riak.http.host}:${riak.http.port}:8098</port>
+ <port>${riak.tcp.host}:${riak.tcp.port}:8087</port>
+ </ports>
+ <portPropertyFile>riak.properties</portPropertyFile>
+ <wait>
+ <log>riak startup</log>
+ <http>
+ <url>http://${riak.http.host}:${riak.http.port}</url>
+ <method>GET</method>
+ <status>200</status>
+ </http>
+ <time>20000</time>
+ <kill>1000</kill>
+ <shutdown>500</shutdown>
+ </wait>
+ <log>
+ <enabled>true</enabled>
+ <date>default</date>
+ <color>cyan</color>
+ </log>
+ </run>
+ <watch>
+ <mode>none</mode>
+ </watch>
+ </image>
+
+ </images>
+ </configuration>
+
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <includes>**/*.json</includes>
+ <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+ <includeGroupIds>org.apache.streams</includeGroupIds>
+ <includeTypes>test-jar</includeTypes>
+ </configuration>
+ <executions>
+ <execution>
+ <id>test-resource-dependencies</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>unpack-dependencies</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+
+ </profile>
+ </profiles>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryClient.java b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryClient.java
new file mode 100644
index 0000000..214eed9
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryClient.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.riak.binary;
+
+import org.apache.streams.riak.pojo.RiakConfiguration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * RiakBinaryClient maintains shared connections to riak via binary protocol.
+ */
+public class RiakBinaryClient {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(RiakBinaryClient.class);
+
+ private com.basho.riak.client.api.RiakClient client;
+
+ public RiakConfiguration config;
+
+ private RiakBinaryClient(RiakConfiguration config) {
+ this.config = config;
+ try {
+ this.start();
+ } catch (Exception e) {
+ e.printStackTrace();
+ this.client = null;
+ }
+ }
+
+ private static Map<RiakConfiguration, RiakBinaryClient> INSTANCE_MAP = new ConcurrentHashMap<>();
+
+ public static RiakBinaryClient getInstance(RiakConfiguration riakConfiguration) {
+ if ( INSTANCE_MAP != null
+ && INSTANCE_MAP.size() > 0
+ && INSTANCE_MAP.containsKey(riakConfiguration)) {
+ return INSTANCE_MAP.get(riakConfiguration);
+ } else {
+ RiakBinaryClient instance = new RiakBinaryClient(riakConfiguration);
+ if( instance != null && instance.client != null ) {
+ INSTANCE_MAP.put(riakConfiguration, instance);
+ return instance;
+ } else {
+ return null;
+ }
+ }
+ }
+
+ public void start() throws Exception {
+
+ Objects.nonNull(config);
+
+ LOGGER.info("RiakHttpClient.start {}", config);
+
+ this.client = com.basho.riak.client.api.RiakClient.newClient(config.getPort().intValue(), config.getHosts());
+
+ Objects.nonNull(client);
+
+ Objects.nonNull(client.getRiakCluster());
+
+ assert( client.getRiakCluster().getNodes().size() > 0 );
+ }
+
+ public void stop() throws Exception {
+ this.client = null;
+ }
+
+ public RiakConfiguration config() {
+ return config;
+ }
+
+ public com.basho.riak.client.api.RiakClient client() {
+ return client;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryPersistReader.java b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryPersistReader.java
new file mode 100644
index 0000000..3846d53
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryPersistReader.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.riak.binary;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.riak.pojo.RiakConfiguration;
+
+import com.basho.riak.client.api.commands.kv.FetchValue;
+import com.basho.riak.client.api.commands.kv.ListKeys;
+import com.basho.riak.client.api.commands.kv.MultiFetch;
+import com.basho.riak.client.core.RiakFuture;
+import com.basho.riak.client.core.query.Location;
+import com.basho.riak.client.core.query.Namespace;
+import com.google.common.collect.Queues;
+import org.apache.commons.lang.NotImplementedException;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * RiakBinaryPersistReader reads documents from riak via binary protocol.
+ */
+public class RiakBinaryPersistReader implements StreamsPersistReader {
+
+ private RiakConfiguration configuration;
+ public RiakBinaryClient client;
+
+ public RiakBinaryPersistReader(RiakConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public String getId() {
+ return "RiakBinaryPersistReader";
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ client = RiakBinaryClient.getInstance(this.configuration);
+ }
+
+ @Override
+ public void cleanUp() {
+ client = null;
+ }
+
+ @Override
+ public synchronized StreamsResultSet readAll() {
+
+ Queue<StreamsDatum> readAllQueue = constructQueue();
+
+ Namespace ns = new Namespace(configuration.getDefaultBucketType(), configuration.getDefaultBucket());
+
+ ListKeys lk = new ListKeys.Builder(ns).build();
+
+ ListKeys.Response listKeysResponse = null;
+ try {
+ listKeysResponse = client.client().execute(lk);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return null;
+ }
+
+ MultiFetch multiFetch = new MultiFetch.Builder().addLocations(listKeysResponse).build();
+ MultiFetch.Response multiFetchResponse = null;
+ try {
+ multiFetchResponse = client.client().execute(multiFetch);
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ return null;
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ return null;
+ }
+
+ for (RiakFuture<FetchValue.Response, Location> f : multiFetchResponse) {
+ try {
+ FetchValue.Response response = f.get();
+ readAllQueue.add(new StreamsDatum(response.getValue(String.class), f.getQueryInfo().getKeyAsString()));
+ }
+ catch (ExecutionException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ return new StreamsResultSet(readAllQueue);
+ }
+
+ @Override
+ public void startStream() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean isRunning() {
+ return Objects.nonNull(client);
+ }
+
+ private Queue<StreamsDatum> constructQueue() {
+ return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryPersistWriter.java b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryPersistWriter.java
new file mode 100644
index 0000000..eb7b0bf
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryPersistWriter.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.riak.binary;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.riak.pojo.RiakConfiguration;
+
+import com.basho.riak.client.api.commands.kv.StoreValue;
+import com.basho.riak.client.core.query.Location;
+import com.basho.riak.client.core.query.Namespace;
+import com.basho.riak.client.core.query.RiakObject;
+import com.basho.riak.client.core.util.BinaryValue;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * RiakBinaryPersistWriter writes documents to riak via binary protocol.
+ */
+public class RiakBinaryPersistWriter implements StreamsPersistWriter {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RiakBinaryPersistWriter.class);
+
+ private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+ private RiakConfiguration configuration;
+ private RiakBinaryClient client;
+
+ public RiakBinaryPersistWriter(RiakConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public String getId() {
+ return "RiakBinaryPersistWriter";
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ client = RiakBinaryClient.getInstance(this.configuration);
+ }
+
+ @Override
+ public void cleanUp() {
+ client = null;
+ }
+
+ @Override
+ public void write(StreamsDatum entry) {
+
+ Objects.nonNull(client);
+
+ String id = null;
+ String document;
+ String bucket;
+ String bucketType;
+ String contentType;
+ String charset;
+ if( StringUtils.isNotBlank(entry.getId())) {
+ id = entry.getId();
+ }
+ if( entry.getDocument() instanceof String) {
+ document = (String)entry.getDocument();
+ } else {
+ try {
+ document = MAPPER.writeValueAsString(entry.getDocument());
+ } catch( Exception e ) {
+ LOGGER.warn("Exception", e);
+ return;
+ }
+ }
+ if( entry.getMetadata() != null
+ && entry.getMetadata().containsKey("bucket")
+ && entry.getMetadata().get("bucket") instanceof String
+ && StringUtils.isNotBlank((String)entry.getMetadata().get("bucket") )) {
+ bucket = (String)entry.getMetadata().get("bucket");
+ } else {
+ bucket = configuration.getDefaultBucket();
+ }
+ if( entry.getMetadata() != null
+ && entry.getMetadata().containsKey("bucketType")
+ && entry.getMetadata().get("bucketType") instanceof String
+ && StringUtils.isNotBlank((String)entry.getMetadata().get("bucketType") )) {
+ bucketType = (String)entry.getMetadata().get("bucketType");
+ } else {
+ bucketType = configuration.getDefaultBucketType();
+ }
+ if( entry.getMetadata() != null
+ && entry.getMetadata().containsKey("charset")
+ && entry.getMetadata().get("charset") instanceof String
+ && StringUtils.isNotBlank((String)entry.getMetadata().get("charset") )) {
+ charset = (String)entry.getMetadata().get("charset");
+ } else {
+ charset = configuration.getDefaultCharset();
+ }
+ if( entry.getMetadata() != null
+ && entry.getMetadata().containsKey("contentType")
+ && entry.getMetadata().get("contentType") instanceof String
+ && StringUtils.isNotBlank((String)entry.getMetadata().get("contentType") )) {
+ contentType = (String)entry.getMetadata().get("contentType");
+ } else {
+ contentType = configuration.getDefaultContentType();
+ }
+
+ try {
+
+ RiakObject riakObject = new RiakObject();
+ riakObject.setContentType(contentType);
+ riakObject.setCharset(charset);
+ riakObject.setValue(BinaryValue.create(document));
+
+ Namespace ns = new Namespace(bucketType, bucket);
+ StoreValue.Builder storeValueBuilder = new StoreValue.Builder(riakObject);
+
+ if( id != null && StringUtils.isNotBlank(id)) {
+ Location location = new Location(ns, id);
+ storeValueBuilder = storeValueBuilder.withLocation(location);
+ } else {
+ storeValueBuilder = storeValueBuilder.withNamespace(ns);
+ }
+
+ StoreValue store = storeValueBuilder.build();
+
+ StoreValue.Response storeResponse = client.client().execute(store);
+
+ LOGGER.debug("storeResponse", storeResponse);
+
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpClient.java b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpClient.java
new file mode 100644
index 0000000..7155b52
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpClient.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.riak.http;
+
+import org.apache.streams.riak.pojo.RiakConfiguration;
+
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * RiakHttpClient maintains shared connections to riak via http.
+ */
+public class RiakHttpClient {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(RiakHttpClient.class);
+
+ public RiakConfiguration config;
+
+ protected CloseableHttpClient client;
+ protected URI baseURI;
+
+ private RiakHttpClient(RiakConfiguration config) {
+ this.config = config;
+ try {
+ this.start();
+ } catch (Exception e) {
+ e.printStackTrace();
+ this.client = null;
+ }
+ }
+
+ private static Map<RiakConfiguration, RiakHttpClient> INSTANCE_MAP = new ConcurrentHashMap<>();
+
+ public static RiakHttpClient getInstance(RiakConfiguration riakConfiguration) {
+ if ( INSTANCE_MAP != null
+ && INSTANCE_MAP.size() > 0
+ && INSTANCE_MAP.containsKey(riakConfiguration)) {
+ return INSTANCE_MAP.get(riakConfiguration);
+ } else {
+ RiakHttpClient instance = new RiakHttpClient(riakConfiguration);
+ if( instance != null && instance.client != null ) {
+ INSTANCE_MAP.put(riakConfiguration, instance);
+ return instance;
+ } else {
+ return null;
+ }
+ }
+ }
+
+ public void start() throws Exception {
+ Objects.nonNull(config);
+ assert(config.getScheme().startsWith("http"));
+ URIBuilder uriBuilder = new URIBuilder();
+ uriBuilder.setScheme(config.getScheme());
+ uriBuilder.setHost(config.getHosts().get(0));
+ uriBuilder.setPort(config.getPort().intValue());
+ baseURI = uriBuilder.build();
+ client = HttpClients.createDefault();
+ }
+
+ public void stop() {
+ try {
+ client.close();
+ } catch( Exception e) {
+ LOGGER.error( "Exception", e );
+ } finally {
+ client = null;
+ }
+ }
+
+ public RiakConfiguration config() {
+ return config;
+ }
+
+ public HttpClient client() {
+ return client;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpPersistReader.java b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpPersistReader.java
new file mode 100644
index 0000000..353bbbf
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpPersistReader.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.riak.http;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.riak.pojo.RiakConfiguration;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.collect.Queues;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.util.EntityUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * RiakHttpPersistReader reads documents from riak via http.
+ */
+public class RiakHttpPersistReader implements StreamsPersistReader {
+
+ private RiakConfiguration configuration;
+ private RiakHttpClient client;
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RiakHttpPersistReader.class);
+
+ private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+ public RiakHttpPersistReader(RiakConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public String getId() {
+ return "RiakHttpPersistReader";
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ client = RiakHttpClient.getInstance(this.configuration);
+ }
+
+ @Override
+ public void cleanUp() {
+ client = null;
+ }
+
+ @Override
+ public StreamsResultSet readAll() {
+
+ Queue<StreamsDatum> readAllQueue = constructQueue();
+
+ URIBuilder lk = null;
+
+ try {
+
+ lk = new URIBuilder(client.baseURI.toString());
+ lk.setPath(client.baseURI.getPath().concat("/buckets/"+configuration.getDefaultBucket()+"/keys"));
+ lk.setParameter("keys", "true");
+
+ } catch (URISyntaxException e) {
+ LOGGER.warn("URISyntaxException", e);
+ }
+
+ HttpResponse lkResponse = null;
+ try {
+ HttpGet lkGet = new HttpGet(lk.build());
+ lkResponse = client.client().execute(lkGet);
+ } catch (IOException e) {
+ LOGGER.warn("IOException", e);
+ return null;
+ } catch (URISyntaxException e) {
+ LOGGER.warn("URISyntaxException", e);
+ return null;
+ }
+
+ String lkEntityString = null;
+ try {
+ lkEntityString = EntityUtils.toString(lkResponse.getEntity());
+ } catch (IOException e) {
+ LOGGER.warn("IOException", e);
+ return null;
+ }
+
+ JsonNode lkEntityNode = null;
+ try {
+ lkEntityNode = MAPPER.readValue(lkEntityString, JsonNode.class);
+ } catch (IOException e) {
+ LOGGER.warn("IOException", e);
+ return null;
+ }
+
+ ArrayNode keysArray = null;
+ keysArray = (ArrayNode) lkEntityNode.get("keys");
+ Iterator<JsonNode> keysIterator = keysArray.iterator();
+
+ while( keysIterator.hasNext()) {
+ JsonNode keyNode = keysIterator.next();
+ String key = keyNode.asText();
+
+ URIBuilder gk = null;
+
+ try {
+
+ gk = new URIBuilder(client.baseURI.toString());
+ gk.setPath(client.baseURI.getPath().concat("/buckets/"+configuration.getDefaultBucket()+"/keys/"+key));
+
+ } catch (URISyntaxException e) {
+ LOGGER.warn("URISyntaxException", e);
+ continue;
+ }
+
+ HttpResponse gkResponse = null;
+ try {
+ HttpGet gkGet = new HttpGet(gk.build());
+ gkResponse = client.client().execute(gkGet);
+ } catch (IOException e) {
+ LOGGER.warn("IOException", e);
+ continue;
+ } catch (URISyntaxException e) {
+ LOGGER.warn("URISyntaxException", e);
+ continue;
+ }
+
+ String gkEntityString = null;
+ try {
+ gkEntityString = EntityUtils.toString(gkResponse.getEntity());
+ } catch (IOException e) {
+ LOGGER.warn("IOException", e);
+ continue;
+ }
+
+ readAllQueue.add(new StreamsDatum(gkEntityString, key));
+ }
+
+ return new StreamsResultSet(readAllQueue);
+ }
+
+ @Override
+ public void startStream() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean isRunning() {
+ return false;
+ }
+
+ private Queue<StreamsDatum> constructQueue() {
+ return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpPersistWriter.java b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpPersistWriter.java
new file mode 100644
index 0000000..b51b053
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpPersistWriter.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.riak.http;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.riak.pojo.RiakConfiguration;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.StringEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+
+/**
+ * RiakHttpPersistWriter writes documents to riak via http.
+ */
+public class RiakHttpPersistWriter implements StreamsPersistWriter {
+
+ private RiakConfiguration configuration;
+ private RiakHttpClient client;
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RiakHttpPersistWriter.class);
+
+ private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+ public RiakHttpPersistWriter(RiakConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public String getId() {
+ return "RiakHttpPersistWriter";
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ client = RiakHttpClient.getInstance(this.configuration);
+ }
+
+ @Override
+ public void cleanUp() {
+ client = null;
+ }
+
+ @Override
+ public void write(StreamsDatum entry) {
+
+ Objects.nonNull(client);
+
+ String id = null;
+ String document;
+ String bucket;
+ String bucketType;
+ String contentType;
+ String charset;
+ if( StringUtils.isNotBlank(entry.getId())) {
+ id = entry.getId();
+ }
+ if( entry.getDocument() instanceof String) {
+ document = (String)entry.getDocument();
+ } else {
+ try {
+ document = MAPPER.writeValueAsString(entry.getDocument());
+ } catch( Exception e ) {
+ LOGGER.warn("Exception", e);
+ return;
+ }
+ }
+ if( entry.getMetadata() != null
+ && entry.getMetadata().containsKey("bucket")
+ && entry.getMetadata().get("bucket") instanceof String
+ && StringUtils.isNotBlank((String)entry.getMetadata().get("bucket") )) {
+ bucket = (String)entry.getMetadata().get("bucket");
+ } else {
+ bucket = configuration.getDefaultBucket();
+ }
+ if( entry.getMetadata() != null
+ && entry.getMetadata().containsKey("bucketType")
+ && entry.getMetadata().get("bucketType") instanceof String
+ && StringUtils.isNotBlank((String)entry.getMetadata().get("bucketType") )) {
+ bucketType = (String)entry.getMetadata().get("bucketType");
+ } else {
+ bucketType = configuration.getDefaultBucketType();
+ }
+ if( entry.getMetadata() != null
+ && entry.getMetadata().containsKey("charset")
+ && entry.getMetadata().get("charset") instanceof String
+ && StringUtils.isNotBlank((String)entry.getMetadata().get("charset") )) {
+ charset = (String)entry.getMetadata().get("charset");
+ } else {
+ charset = configuration.getDefaultCharset();
+ }
+ if( entry.getMetadata() != null
+ && entry.getMetadata().containsKey("contentType")
+ && entry.getMetadata().get("contentType") instanceof String
+ && StringUtils.isNotBlank((String)entry.getMetadata().get("contentType") )) {
+ contentType = (String)entry.getMetadata().get("contentType");
+ } else {
+ contentType = configuration.getDefaultContentType();
+ }
+
+ URIBuilder uriBuilder = new URIBuilder(client.baseURI);
+ if( bucket != null && StringUtils.isNotBlank(bucket)) {
+ uriBuilder.setPath("/riak/"+bucket);
+ }
+ if( id != null && StringUtils.isNotBlank(id)) {
+ uriBuilder.setPath("/riak/"+bucket+"/"+id);
+ }
+
+ URI uri;
+ try {
+ uri = uriBuilder.build();
+ } catch (URISyntaxException e) {
+ LOGGER.warn("URISyntaxException", e);
+ return;
+ }
+
+ HttpPost post = new HttpPost();
+ post.setHeader("Content-Type", contentType + "; charset=" + charset);
+ post.setURI(uri);
+ HttpEntity entity;
+ try {
+ entity = new StringEntity(document);
+ post.setEntity(entity);
+ } catch (UnsupportedEncodingException e) {
+ LOGGER.warn("UnsupportedEncodingException", e);
+ return;
+ }
+
+ try {
+ HttpResponse response = client.client().execute(post);
+ } catch (IOException e) {
+ LOGGER.warn("IOException", e);
+ return;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/main/jsonschema/RiakConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/main/jsonschema/RiakConfiguration.json b/streams-contrib/streams-persist-riak/src/main/jsonschema/RiakConfiguration.json
new file mode 100644
index 0000000..917347f
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/main/jsonschema/RiakConfiguration.json
@@ -0,0 +1,40 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "type": "object",
+ "javaType" : "org.apache.streams.riak.pojo.RiakConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "scheme": {
+ "type": "string"
+ },
+ "hosts": {
+ "type": "array",
+ "items": {
+ "type": "string"
+ }
+ },
+ "port": {
+ "type": "integer"
+ },
+ "defaultBucket": {
+ "type": "string",
+ "default": "streams"
+ },
+ "defaultBucketType": {
+ "type": "string",
+ "default": "default"
+ },
+ "defaultCharset": {
+ "type": "string",
+ "default": "UTF-8"
+ },
+ "defaultContentType": {
+ "type": "string",
+ "default": "application/json"
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/test/java/org/apache/streams/riak/test/RiakBinaryPersistIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/test/java/org/apache/streams/riak/test/RiakBinaryPersistIT.java b/streams-contrib/streams-persist-riak/src/test/java/org/apache/streams/riak/test/RiakBinaryPersistIT.java
new file mode 100644
index 0000000..523ec01
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/test/java/org/apache/streams/riak/test/RiakBinaryPersistIT.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.riak.test;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.riak.binary.RiakBinaryClient;
+import org.apache.streams.riak.binary.RiakBinaryPersistReader;
+import org.apache.streams.riak.binary.RiakBinaryPersistWriter;
+import org.apache.streams.riak.pojo.RiakConfiguration;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Integration test for RiakBinaryPersist.
+ */
+public class RiakBinaryPersistIT {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RiakBinaryPersistIT.class);
+
+ private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+ private RiakBinaryClient testClient;
+
+ private RiakConfiguration testConfiguration;
+
+ @BeforeClass
+ public void prepareTest() throws IOException {
+
+ Config reference = ConfigFactory.load();
+ File conf = new File("target/test-classes/RiakBinaryPersistIT.conf");
+ assertTrue(conf.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf, ConfigParseOptions.defaults().setAllowMissing(false));
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ testConfiguration = new ComponentConfigurator<>(RiakConfiguration.class).detectConfiguration(typesafe, "riak");
+ testClient = RiakBinaryClient.getInstance(testConfiguration);
+
+ }
+
+ @Test
+ public void testRiakBinaryPersist() throws Exception {
+
+ RiakBinaryPersistWriter testPersistWriter = new RiakBinaryPersistWriter(testConfiguration);
+ testPersistWriter.prepare(testConfiguration);
+
+ InputStream testActivityFolderStream = RiakBinaryPersistIT.class.getClassLoader()
+ .getResourceAsStream("activities");
+ List<String> files = IOUtils.readLines(testActivityFolderStream, StandardCharsets.UTF_8);
+
+ int count = 0;
+ for( String file : files) {
+ LOGGER.info("File: " + file );
+ InputStream testActivityFileStream = RiakBinaryPersistIT.class.getClassLoader()
+ .getResourceAsStream("activities/" + file);
+ Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+
+ StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+ testPersistWriter.write( datum );
+ LOGGER.info("Wrote: " + activity.getVerb() );
+ count++;
+ }
+
+ testPersistWriter.cleanUp();
+
+ LOGGER.info("Total Written: {}", count );
+ Assert.assertEquals(count, 89);
+
+ RiakBinaryPersistReader testPersistReader = new RiakBinaryPersistReader(testConfiguration);
+ testPersistReader.prepare(testConfiguration);
+
+ StreamsResultSet readerResultSet = testPersistReader.readAll();
+ LOGGER.info("Total Read: {}", readerResultSet.size() );
+ Assert.assertEquals(readerResultSet.size(), 89);
+
+ }
+
+ @AfterClass
+ public void cleanup() throws Exception {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/test/java/org/apache/streams/riak/test/RiakHttpPersistIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/test/java/org/apache/streams/riak/test/RiakHttpPersistIT.java b/streams-contrib/streams-persist-riak/src/test/java/org/apache/streams/riak/test/RiakHttpPersistIT.java
new file mode 100644
index 0000000..2eeea30
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/test/java/org/apache/streams/riak/test/RiakHttpPersistIT.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.riak.test;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.riak.http.RiakHttpClient;
+import org.apache.streams.riak.http.RiakHttpPersistReader;
+import org.apache.streams.riak.http.RiakHttpPersistWriter;
+import org.apache.streams.riak.pojo.RiakConfiguration;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Integration test for RiakHttpPersist.
+ */
+public class RiakHttpPersistIT {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RiakHttpPersistIT.class);
+
+ private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+ private RiakHttpClient testClient;
+
+ private RiakConfiguration testConfiguration;
+
+ @BeforeClass
+ public void prepareTest() throws IOException {
+
+ Config reference = ConfigFactory.load();
+ File conf = new File("target/test-classes/RiakHttpPersistIT.conf");
+ assertTrue(conf.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf, ConfigParseOptions.defaults().setAllowMissing(false));
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ testConfiguration = new ComponentConfigurator<>(RiakConfiguration.class).detectConfiguration(typesafe, "riak");
+ testClient = RiakHttpClient.getInstance(testConfiguration);
+
+ }
+
+ @Test
+ public void testRiakHttpPersist() throws Exception {
+
+ RiakHttpPersistWriter testPersistWriter = new RiakHttpPersistWriter(testConfiguration);
+ testPersistWriter.prepare(testConfiguration);
+
+ InputStream testActivityFolderStream = RiakHttpPersistIT.class.getClassLoader()
+ .getResourceAsStream("activities");
+ List<String> files = IOUtils.readLines(testActivityFolderStream, StandardCharsets.UTF_8);
+
+ // write data
+
+ int count = 0;
+ for( String file : files) {
+ LOGGER.info("File: " + file );
+ InputStream testActivityFileStream = RiakHttpPersistIT.class.getClassLoader()
+ .getResourceAsStream("activities/" + file);
+ Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+ StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+ testPersistWriter.write( datum );
+ LOGGER.info("Wrote: " + activity.getVerb() );
+ count++;
+ }
+
+ testPersistWriter.cleanUp();
+
+ LOGGER.info("Total Written: {}", count );
+ Assert.assertEquals(count, 89);
+
+ RiakHttpPersistReader testPersistReader = new RiakHttpPersistReader(testConfiguration);
+ testPersistReader.prepare(testConfiguration);
+
+ StreamsResultSet readerResultSet = testPersistReader.readAll();
+ LOGGER.info("Total Read: {}", readerResultSet.size() );
+ Assert.assertEquals(readerResultSet.size(), 89);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/test/resources/RiakBinaryPersistIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/test/resources/RiakBinaryPersistIT.conf b/streams-contrib/streams-persist-riak/src/test/resources/RiakBinaryPersistIT.conf
new file mode 100644
index 0000000..2957ab6
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/test/resources/RiakBinaryPersistIT.conf
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+riak {
+ scheme = "tcp"
+ hosts += ${riak.tcp.host}
+ port = ${riak.tcp.port}
+ defaultBucket = "binary"
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cc46ab69/streams-contrib/streams-persist-riak/src/test/resources/RiakHttpPersistIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/src/test/resources/RiakHttpPersistIT.conf b/streams-contrib/streams-persist-riak/src/test/resources/RiakHttpPersistIT.conf
new file mode 100644
index 0000000..83f097f
--- /dev/null
+++ b/streams-contrib/streams-persist-riak/src/test/resources/RiakHttpPersistIT.conf
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+riak {
+ scheme = "http"
+ hosts += ${riak.http.host}
+ port = ${riak.http.port}
+ defaultBucket = "http"
+}
\ No newline at end of file