You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2015/09/11 02:15:12 UTC

[4/8] incubator-streams git commit: basic implementation of kinesis read/write

basic implementation of kinesis read/write


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/54b3f803
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/54b3f803
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/54b3f803

Branch: refs/heads/master
Commit: 54b3f8031a8b04b811d9ff213c7046b33d4c240c
Parents: 092021c
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Wed Sep 2 16:58:19 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Fri Sep 4 13:27:32 2015 -0500

----------------------------------------------------------------------
 streams-contrib/streams-amazon-aws/pom.xml      |   1 +
 .../streams-persist-kinesis/README.md           |  18 ++
 .../streams-persist-kinesis/pom.xml             | 117 +++++++++++++
 .../amazon/kinesis/KinesisPersistReader.java    | 171 +++++++++++++++++++
 .../kinesis/KinesisPersistReaderTask.java       | 108 ++++++++++++
 .../amazon/kinesis/KinesisPersistWriter.java    | 111 ++++++++++++
 .../amazon/kinesis/KinesisConfiguration.json    |  33 ++++
 .../kinesis/KinesisReaderConfiguration.json     |  19 +++
 .../kinesis/KinesisWriterConfiguration.json     |  16 ++
 9 files changed, 594 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/54b3f803/streams-contrib/streams-amazon-aws/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/pom.xml b/streams-contrib/streams-amazon-aws/pom.xml
index 8e14dcb..59fcf47 100644
--- a/streams-contrib/streams-amazon-aws/pom.xml
+++ b/streams-contrib/streams-amazon-aws/pom.xml
@@ -37,6 +37,7 @@
     </properties>
 
     <modules>
+        <module>streams-persist-kinesis</module>
         <module>streams-persist-s3</module>
     </modules>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/54b3f803/streams-contrib/streams-amazon-aws/streams-persist-kinesis/README.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/README.md b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/README.md
new file mode 100644
index 0000000..338f68d
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/README.md
@@ -0,0 +1,18 @@
+streams-persist-kinesis
+==============
+
+Read/Write documents to/from Kinesis.
+
+Example reader configuration:
+
+    "kinesis": {
+        "key": "",
+        "secretKey": "",
+        "protocol": "HTTPS",
+        "region": "us-east-1",
+        "streams": [
+            "topic1",
+            "topic2"
+        ]
+    }
+

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/54b3f803/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml
new file mode 100644
index 0000000..e5c68f5
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streams-amazon-aws</artifactId>
+        <groupId>org.apache.streams</groupId>
+        <version>0.3.4-PP-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streams-persist-kinesis</artifactId>
+    <name>${project.artifactId}</name>
+
+    <description>Amazon Kinesis Module</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-converters</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-util</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.amazon.kinesis.pojo</targetPackage>
+                    <useLongIntegers>true</useLongIntegers>
+                    <useJodaDates>true</useJodaDates>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/54b3f803/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
new file mode 100644
index 0000000..b755579
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.amazon.kinesis;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.google.common.collect.Queues;
+import com.typesafe.config.Config;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.streams.amazon.kinesis.KinesisConfiguration;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class KinesisPersistReader implements StreamsPersistReader, Serializable {
+
+    public final static String STREAMS_ID = "KinesisPersistReader";
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisPersistReader.class);
+
+    protected volatile Queue<StreamsDatum> persistQueue;
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    private KinesisReaderConfiguration config;
+
+    protected Long pollInterval = StreamsConfigurator.detectConfiguration().getBatchFrequencyMs();
+
+    private List<String> streamNames;
+
+    private ExecutorService executor;
+
+    protected AmazonKinesisClient client;
+
+    public KinesisPersistReader() {
+        Config config = StreamsConfigurator.config.getConfig("kinesis");
+        this.config = new ComponentConfigurator<>(KinesisReaderConfiguration.class).detectConfiguration(config);
+        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+    }
+
+    public KinesisPersistReader(KinesisReaderConfiguration config) {
+        this.config = config;
+        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+    }
+
+    public void setConfig(KinesisReaderConfiguration config) {
+        this.config = config;
+    }
+
+    @Override
+    public void startStream() {
+
+        this.streamNames = this.config.getStreams();
+
+        for (final String stream : streamNames) {
+
+            DescribeStreamResult describeStreamResult = client.describeStream(stream);
+
+            if( "ACTIVE".equals(describeStreamResult.getStreamDescription().getStreamStatus())) {
+
+                List<Shard> shardList = describeStreamResult.getStreamDescription().getShards();
+
+                for( Shard shard : shardList ) {
+                    executor.submit(new KinesisPersistReaderTask(this, stream, shard.getShardId()));
+                }
+            }
+
+        }
+
+    }
+
+    @Override
+    public StreamsResultSet readAll() {
+        return readCurrent();
+    }
+
+    public StreamsResultSet readCurrent() {
+
+        StreamsResultSet current;
+        synchronized( KinesisPersistReader.class ) {
+            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+            persistQueue.clear();
+        }
+        return current;
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger bigInteger) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
+        return null;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return !executor.isShutdown() && !executor.isTerminated();
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        // Connect to Kinesis
+        synchronized (this) {
+            // Create the credentials Object
+            AWSCredentials credentials = new BasicAWSCredentials(config.getKey(), config.getSecretKey());
+
+            ClientConfiguration clientConfig = new ClientConfiguration();
+            clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString()));
+
+            this.client = new AmazonKinesisClient(credentials, clientConfig);
+            if (!Strings.isNullOrEmpty(config.getRegion()))
+                this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion())));
+        }
+        streamNames = this.config.getStreams();
+        executor = Executors.newFixedThreadPool(streamNames.size());
+    }
+
+    @Override
+    public void cleanUp() {
+
+        while( !executor.isTerminated()) {
+            try {
+                executor.awaitTermination(5, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {}
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/54b3f803/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
new file mode 100644
index 0000000..9b8d817
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.amazon.kinesis;
+
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.util.Base64;
+import com.google.common.collect.Maps;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Polls a single Kinesis shard and appends every record it reads to the owning
+ * {@link KinesisPersistReader}'s queue as a {@link StreamsDatum}.
+ */
+public class KinesisPersistReaderTask implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisPersistReaderTask.class);
+
+    private KinesisPersistReader reader;
+    private String streamName;
+    private String shardId;
+
+    private String shardIteratorId;
+
+    private Long pollInterval = StreamsConfigurator.detectConfiguration().getBatchFrequencyMs();
+
+    /**
+     * @param reader     owner supplying the Kinesis client, poll interval and output queue
+     * @param streamName stream to read
+     * @param shardId    shard of that stream to read
+     */
+    public KinesisPersistReaderTask(KinesisPersistReader reader, String streamName, String shardId) {
+        this.reader = reader;
+        this.streamName = streamName;
+        this.shardId = shardId;
+    }
+
+    /**
+     * Reads the shard from TRIM_HORIZON, sleeping for the reader's poll interval between
+     * requests, until the thread is interrupted or Kinesis returns no next iterator.
+     */
+    @Override
+    public void run() {
+
+        GetShardIteratorRequest shardIteratorRequest = new GetShardIteratorRequest()
+                .withStreamName(this.streamName)
+                .withShardId(shardId)
+                .withShardIteratorType("TRIM_HORIZON");
+
+        GetShardIteratorResult shardIteratorResult = reader.client.getShardIterator(shardIteratorRequest);
+
+        shardIteratorId = shardIteratorResult.getShardIterator();
+
+        Map<String,Object> metadata = Maps.newHashMap();
+        metadata.put("streamName", streamName);
+        metadata.put("shardId", shardId);
+
+        // Kinesis returns a null next iterator once a closed shard has been read to the end;
+        // calling getRecords with it would fail, so that ends the task as well.
+        while( shardIteratorId != null && !Thread.currentThread().isInterrupted() ) {
+
+            GetRecordsRequest recordsRequest = new GetRecordsRequest()
+                    .withShardIterator(shardIteratorId);
+
+            GetRecordsResult recordsResult = reader.client.getRecords(recordsRequest);
+
+            shardIteratorId = recordsResult.getNextShardIterator();
+
+            List<Record> recordList = recordsResult.getRecords();
+            for (Record record : recordList) {
+                try {
+                    // Copy only the readable bytes: array() exposes the whole backing array
+                    // and throws for buffers that are read-only or not array-backed.
+                    byte[] byteArray = new byte[record.getData().remaining()];
+                    record.getData().duplicate().get(byteArray);
+                    String message = new String(byteArray, Charset.forName("UTF-8"));
+                    reader.persistQueue.add(
+                            new StreamsDatum(
+                                    message,
+                                    record.getPartitionKey(),
+                                    new DateTime(),
+                                    new BigInteger(record.getSequenceNumber()),
+                                    new HashMap<String,Object>(metadata)));
+                } catch( Exception e ) {
+                    LOGGER.warn("Exception processing record {}", record, e);
+                }
+            }
+            try {
+                Thread.sleep(reader.pollInterval);
+            } catch (InterruptedException e) {
+                // Restore the flag so the loop condition sees the interrupt and exits.
+                Thread.currentThread().interrupt();
+            }
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/54b3f803/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
new file mode 100644
index 0000000..2caa028
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
@@ -0,0 +1,111 @@
+package org.apache.streams.amazon.kinesis;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.PutRecordRequest;
+import com.amazonaws.services.kinesis.model.PutRecordResult;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.typesafe.config.Config;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.converter.TypeConverterUtil;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by sblackmon on 9/2/15.
+ */
+public class KinesisPersistWriter implements StreamsPersistWriter {
+
+    public final static String STREAMS_ID = "KinesisPersistWriter";
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisPersistWriter.class);
+
+    protected volatile Queue<StreamsDatum> persistQueue;
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    private KinesisWriterConfiguration config;
+
+    private List<String> streamName;
+
+    private ExecutorService executor;
+
+    protected AmazonKinesisClient client;
+
+    public KinesisPersistWriter() {
+        Config config = StreamsConfigurator.config.getConfig("kinesis");
+        this.config = new ComponentConfigurator<>(KinesisWriterConfiguration.class).detectConfiguration(config);
+        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+    }
+
+    public KinesisPersistWriter(KinesisWriterConfiguration config) {
+        this.config = config;
+        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+    }
+
+    public void setConfig(KinesisWriterConfiguration config) {
+        this.config = config;
+    }
+
+    @Override
+    public void write(StreamsDatum entry) {
+
+        String document = (String) TypeConverterUtil.getInstance().convert(entry.getDocument(), String.class);
+
+        PutRecordRequest putRecordRequest = new PutRecordRequest()
+                .withStreamName(config.getStream())
+                .withPartitionKey(entry.getId())
+                .withData(ByteBuffer.wrap(document.getBytes()));
+
+        PutRecordResult putRecordResult = client.putRecord(putRecordRequest);
+
+        entry.setSequenceid(new BigInteger(putRecordResult.getSequenceNumber()));
+
+        LOGGER.debug("Wrote {}", entry);
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        // Connect to Kinesis
+        synchronized (this) {
+            // Create the credentials Object
+            AWSCredentials credentials = new BasicAWSCredentials(config.getKey(), config.getSecretKey());
+
+            ClientConfiguration clientConfig = new ClientConfiguration();
+            clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString()));
+
+            this.client = new AmazonKinesisClient(credentials, clientConfig);
+            if (!Strings.isNullOrEmpty(config.getRegion()))
+                this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion())));
+        }
+        executor = Executors.newSingleThreadExecutor();
+
+    }
+
+    @Override
+    public void cleanUp() {
+        try {
+            executor.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.debug("Interrupted! ", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/54b3f803/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisConfiguration.json b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisConfiguration.json
new file mode 100644
index 0000000..05c7606
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisConfiguration.json
@@ -0,0 +1,33 @@
+{
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "type": "object",
+    "javaType" : "org.apache.streams.amazon.kinesis.KinesisConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "key": {
+            "type": "string",
+            "description": "Your Amazon Key",
+            "required": true
+        },
+        "secretKey": {
+            "type": "string",
+            "description": "Your Amazon Secret Key",
+            "required": true
+        },
+        "protocol": {
+            "type": "string",
+            "description": "Whether you are using HTTP or HTTPS",
+            "enum": ["HTTP", "HTTPS"],
+            "default": "HTTPS"
+        },
+        "region": {
+            "type": "string",
+            "description": "The AWS region where your stream resides",
+            "required": false
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/54b3f803/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisReaderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisReaderConfiguration.json b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisReaderConfiguration.json
new file mode 100644
index 0000000..bf58892
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisReaderConfiguration.json
@@ -0,0 +1,19 @@
+{
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "type": "object",
+    "javaType" : "org.apache.streams.amazon.kinesis.KinesisReaderConfiguration",
+    "extends": {"$ref":"KinesisConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "streams": {
+            "type": "array",
+            "items": {
+                "type": "string"
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/54b3f803/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisWriterConfiguration.json b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisWriterConfiguration.json
new file mode 100644
index 0000000..2fcf059
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisWriterConfiguration.json
@@ -0,0 +1,16 @@
+{
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "type": "object",
+    "javaType" : "org.apache.streams.amazon.kinesis.KinesisWriterConfiguration",
+    "extends": {"$ref":"KinesisConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "stream": {
+            "type": "string"
+        }
+    }
+}
\ No newline at end of file