You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2015/09/11 02:15:09 UTC
[1/8] incubator-streams git commit: basic implementation of kinesis
read/write
Repository: incubator-streams
Updated Branches:
refs/heads/master fbd07411e -> 498c6cc9f
basic implementation of kinesis read/write
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/20fbbdc2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/20fbbdc2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/20fbbdc2
Branch: refs/heads/master
Commit: 20fbbdc2b4b4ba6e17a702ccc7f7fe38d74f11f3
Parents: a8e1cc7
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Wed Sep 2 16:58:19 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Wed Sep 2 16:58:19 2015 -0500
----------------------------------------------------------------------
streams-contrib/streams-amazon-aws/pom.xml | 1 +
.../streams-persist-kinesis/README.md | 18 ++
.../streams-persist-kinesis/pom.xml | 117 +++++++++++++
.../amazon/kinesis/KinesisPersistReader.java | 171 +++++++++++++++++++
.../kinesis/KinesisPersistReaderTask.java | 108 ++++++++++++
.../amazon/kinesis/KinesisPersistWriter.java | 111 ++++++++++++
.../amazon/kinesis/KinesisConfiguration.json | 33 ++++
.../kinesis/KinesisReaderConfiguration.json | 19 +++
.../kinesis/KinesisWriterConfiguration.json | 16 ++
9 files changed, 594 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/20fbbdc2/streams-contrib/streams-amazon-aws/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/pom.xml b/streams-contrib/streams-amazon-aws/pom.xml
index 8e14dcb..59fcf47 100644
--- a/streams-contrib/streams-amazon-aws/pom.xml
+++ b/streams-contrib/streams-amazon-aws/pom.xml
@@ -37,6 +37,7 @@
</properties>
<modules>
+ <module>streams-persist-kinesis</module>
<module>streams-persist-s3</module>
</modules>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/20fbbdc2/streams-contrib/streams-amazon-aws/streams-persist-kinesis/README.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/README.md b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/README.md
new file mode 100644
index 0000000..338f68d
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/README.md
@@ -0,0 +1,18 @@
+streams-persist-kinesis
+==============
+
+Read/Write documents to/from Kinesis.
+
+Example reader configuration (a writer takes a single "stream" name instead of the "streams" list):
+
+ "kinesis": {
+ "key": "",
+ "secretKey": "",
+ "protocol": "HTTPS",
+ "region": "us-east-1",
+ "streams": [
+ "topic1",
+ "topic2"
+ ]
+ }
+
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/20fbbdc2/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml
new file mode 100644
index 0000000..e5c68f5
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- Module POM for streams-persist-kinesis; inherits from the streams-amazon-aws parent declared below. -->
+ <parent>
+ <artifactId>streams-amazon-aws</artifactId>
+ <groupId>org.apache.streams</groupId>
+ <version>0.3.4-PP-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>streams-persist-kinesis</artifactId>
+ <name>${project.artifactId}</name>
+
+ <description>Amazon Kinesis Module</description>
+
+ <dependencies>
+ <!-- Sibling Apache Streams modules, pinned to this module's own version. -->
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-converters</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- No version is declared for the AWS SDK or jcl-over-slf4j here; presumably both are managed by a parent POM (TODO confirm). -->
+ <!-- commons-logging is excluded from the AWS SDK and jcl-over-slf4j is declared right after it. -->
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <!-- Registers target/generated-sources/jsonschema2pojo as an additional source root in the generate-sources phase. -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jsonschema2pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- Runs the jsonschema2pojo "generate" goal over src/main/jsonschema, writing into the same directory registered above. -->
+ <!-- NOTE(review): addCompileSourceRoot is also true here, so the build-helper execution above may be redundant; confirm before removing either. -->
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <targetPackage>org.apache.streams.amazon.kinesis.pojo</targetPackage>
+ <useLongIntegers>true</useLongIntegers>
+ <useJodaDates>true</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/20fbbdc2/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
new file mode 100644
index 0000000..b755579
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.amazon.kinesis;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.google.common.collect.Queues;
+import com.typesafe.config.Config;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.streams.amazon.kinesis.KinesisConfiguration;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reads records from the configured Amazon Kinesis streams.
+ *
+ * <p>{@link #prepare(Object)} builds the Kinesis client and the worker pool,
+ * {@link #startStream()} submits one {@link KinesisPersistReaderTask} per shard of
+ * every ACTIVE stream, and the tasks push {@link StreamsDatum}s onto
+ * {@link #persistQueue}, which {@link #readCurrent()} drains.
+ */
+public class KinesisPersistReader implements StreamsPersistReader, Serializable {
+
+    public final static String STREAMS_ID = "KinesisPersistReader";
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisPersistReader.class);
+
+    /** Buffer written by the shard tasks and drained by {@link #readCurrent()}. */
+    protected volatile Queue<StreamsDatum> persistQueue;
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    private KinesisReaderConfiguration config;
+
+    /** Pause, in milliseconds, that each shard task takes between two getRecords calls. */
+    protected Long pollInterval = StreamsConfigurator.detectConfiguration().getBatchFrequencyMs();
+
+    private List<String> streamNames;
+
+    private ExecutorService executor;
+
+    /** Kinesis client shared with the shard tasks; created in {@link #prepare(Object)}. */
+    protected AmazonKinesisClient client;
+
+    /** Builds a reader from the "kinesis" section of the global Streams configuration. */
+    public KinesisPersistReader() {
+        Config config = StreamsConfigurator.config.getConfig("kinesis");
+        this.config = new ComponentConfigurator<>(KinesisReaderConfiguration.class).detectConfiguration(config);
+        this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+    }
+
+    /**
+     * Builds a reader from an explicit configuration.
+     *
+     * @param config reader configuration (credentials, protocol, region, stream names)
+     */
+    public KinesisPersistReader(KinesisReaderConfiguration config) {
+        this.config = config;
+        this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+    }
+
+    public void setConfig(KinesisReaderConfiguration config) {
+        this.config = config;
+    }
+
+    /**
+     * Submits one reader task per shard of every configured stream whose status is ACTIVE.
+     * Streams in any other state are skipped with a warning.
+     * Requires {@link #prepare(Object)} to have been called first.
+     */
+    @Override
+    public void startStream() {
+
+        this.streamNames = this.config.getStreams();
+        if (streamNames == null) {
+            LOGGER.warn("No Kinesis streams configured, nothing to read");
+            return;
+        }
+
+        for (final String stream : streamNames) {
+
+            DescribeStreamResult describeStreamResult = client.describeStream(stream);
+            String status = describeStreamResult.getStreamDescription().getStreamStatus();
+
+            if (!"ACTIVE".equals(status)) {
+                // Previously skipped silently, which made a missing stream hard to diagnose.
+                LOGGER.warn("Skipping stream {} because its status is {}", stream, status);
+                continue;
+            }
+
+            // NOTE(review): only the shards returned by this single describeStream call are read;
+            // streams with many shards may need paging - confirm.
+            List<Shard> shardList = describeStreamResult.getStreamDescription().getShards();
+            for (Shard shard : shardList) {
+                executor.submit(new KinesisPersistReaderTask(this, stream, shard.getShardId()));
+            }
+        }
+    }
+
+    @Override
+    public StreamsResultSet readAll() {
+        return readCurrent();
+    }
+
+    /**
+     * Hands over everything buffered so far.
+     *
+     * <p>Elements are moved one at a time with {@code poll()}, so a datum added by a shard
+     * task while this method runs is either returned now or left for the next call. The
+     * previous copy-then-clear approach could drop datums added between the two steps.
+     *
+     * @return result set holding the drained datums (possibly empty)
+     */
+    public StreamsResultSet readCurrent() {
+
+        ConcurrentLinkedQueue<StreamsDatum> drained = new ConcurrentLinkedQueue<StreamsDatum>();
+        StreamsDatum next;
+        while ((next = persistQueue.poll()) != null) {
+            drained.add(next);
+        }
+        return new StreamsResultSet(drained);
+    }
+
+    /** Not supported; always returns {@code null}. */
+    @Override
+    public StreamsResultSet readNew(BigInteger bigInteger) {
+        return null;
+    }
+
+    /** Not supported; always returns {@code null}. */
+    @Override
+    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
+        return null;
+    }
+
+    /**
+     * @return {@code true} once {@link #prepare(Object)} has created the worker pool and
+     *         until {@link #cleanUp()} shuts it down
+     */
+    @Override
+    public boolean isRunning() {
+        return executor != null && !executor.isShutdown() && !executor.isTerminated();
+    }
+
+    /**
+     * Creates the Kinesis client and the worker pool.
+     *
+     * @param configurationObject ignored; the configuration given at construction time is used
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+        // Connect to Kinesis
+        synchronized (this) {
+            // Create the credentials Object
+            AWSCredentials credentials = new BasicAWSCredentials(config.getKey(), config.getSecretKey());
+
+            ClientConfiguration clientConfig = new ClientConfiguration();
+            clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString()));
+
+            this.client = new AmazonKinesisClient(credentials, clientConfig);
+            if (!Strings.isNullOrEmpty(config.getRegion()))
+                this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion())));
+        }
+        streamNames = this.config.getStreams();
+        // One long-running task is submitted per shard, not per stream. A pool sized by the
+        // number of streams would leave extra shards queued forever and would throw for an
+        // empty stream list, so the pool grows on demand instead.
+        executor = Executors.newCachedThreadPool();
+    }
+
+    /**
+     * Stops the shard tasks and waits up to five seconds for them to finish.
+     *
+     * <p>The tasks never finish on their own, so the pool has to be shut down explicitly;
+     * waiting for termination without doing so would block forever.
+     */
+    @Override
+    public void cleanUp() {
+
+        if (executor == null) {
+            return;
+        }
+        executor.shutdownNow();
+        try {
+            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                LOGGER.warn("Kinesis reader tasks did not terminate within 5 seconds");
+            }
+        } catch (InterruptedException e) {
+            // Preserve the interrupt for whoever is shutting this reader down.
+            Thread.currentThread().interrupt();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/20fbbdc2/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
new file mode 100644
index 0000000..9b8d817
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.amazon.kinesis;
+
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.util.Base64;
+import com.google.common.collect.Maps;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Polls a single Kinesis shard and pushes every record onto the owning reader's queue.
+ *
+ * <p>The task starts at the oldest available record (TRIM_HORIZON) and keeps polling until
+ * the shard has no next iterator or the thread is interrupted.
+ */
+public class KinesisPersistReaderTask implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisPersistReaderTask.class);
+
+    private KinesisPersistReader reader;
+    private String streamName;
+    private String shardId;
+
+    private String shardIteratorId;
+
+    /**
+     * @param reader     owning reader; supplies the Kinesis client, the poll interval and the target queue
+     * @param streamName name of the stream to read
+     * @param shardId    id of the shard to read within that stream
+     */
+    public KinesisPersistReaderTask(KinesisPersistReader reader, String streamName, String shardId) {
+        this.reader = reader;
+        this.streamName = streamName;
+        this.shardId = shardId;
+    }
+
+    @Override
+    public void run() {
+        try {
+            poll();
+        } catch (RuntimeException e) {
+            // The reader never inspects the Future returned by submit(), so without this log
+            // a failing shard task would vanish without a trace.
+            LOGGER.error("Reader task for stream " + streamName + " shard " + shardId + " failed", e);
+            throw e;
+        }
+    }
+
+    /** Fetches records in a loop, sleeping for the reader's poll interval between requests. */
+    private void poll() {
+
+        GetShardIteratorRequest shardIteratorRequest = new GetShardIteratorRequest()
+                .withStreamName(this.streamName)
+                .withShardId(shardId)
+                .withShardIteratorType("TRIM_HORIZON");
+
+        GetShardIteratorResult shardIteratorResult = reader.client.getShardIterator(shardIteratorRequest);
+
+        shardIteratorId = shardIteratorResult.getShardIterator();
+
+        Map<String,Object> metadata = Maps.newHashMap();
+        metadata.put("streamName", streamName);
+        metadata.put("shardId", shardId);
+
+        // A null iterator means there is nothing further to request from this shard;
+        // an interrupt means the reader is shutting down.
+        while (shardIteratorId != null && !Thread.currentThread().isInterrupted()) {
+
+            GetRecordsRequest recordsRequest = new GetRecordsRequest()
+                    .withShardIterator(shardIteratorId);
+
+            GetRecordsResult recordsResult = reader.client.getRecords(recordsRequest);
+
+            shardIteratorId = recordsResult.getNextShardIterator();
+
+            List<Record> recordList = recordsResult.getRecords();
+            for (Record record : recordList) {
+                try {
+                    // Copy only the readable bytes. array() would expose the whole backing
+                    // array regardless of position/limit and fails on read-only buffers.
+                    byte[] byteArray = new byte[record.getData().remaining()];
+                    record.getData().duplicate().get(byteArray);
+                    String message = new String(byteArray, Charset.forName("UTF-8"));
+                    reader.persistQueue.add(
+                            new StreamsDatum(
+                                    message,
+                                    record.getPartitionKey(),
+                                    new DateTime(),
+                                    new BigInteger(record.getSequenceNumber()),
+                                    // Each datum gets its own copy so that a downstream change to
+                                    // one datum's metadata cannot leak into the others.
+                                    Maps.newHashMap(metadata)));
+                } catch( Exception e ) {
+                    // Passing the exception as the dedicated throwable argument keeps its stack trace.
+                    LOGGER.warn("Exception processing record " + record, e);
+                }
+            }
+            try {
+                Thread.sleep(reader.pollInterval);
+            } catch (InterruptedException e) {
+                // Restore the flag and stop: swallowing it made the task impossible to cancel.
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+
+        LOGGER.info("Stopped reading stream {} shard {}", streamName, shardId);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/20fbbdc2/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
new file mode 100644
index 0000000..2caa028
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
@@ -0,0 +1,111 @@
+package org.apache.streams.amazon.kinesis;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.PutRecordRequest;
+import com.amazonaws.services.kinesis.model.PutRecordResult;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.typesafe.config.Config;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.converter.TypeConverterUtil;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by sblackmon on 9/2/15.
+ */
+/**
+ * Writes each {@link StreamsDatum} to a single Amazon Kinesis stream as one record.
+ *
+ * <p>{@link #prepare(Object)} must be called before {@link #write(StreamsDatum)}; it creates
+ * the Kinesis client from the configured credentials, protocol and region.
+ */
+public class KinesisPersistWriter implements StreamsPersistWriter {
+
+    public final static String STREAMS_ID = "KinesisPersistWriter";
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisPersistWriter.class);
+
+    protected volatile Queue<StreamsDatum> persistQueue;
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    private KinesisWriterConfiguration config;
+
+    private List<String> streamName;
+
+    private ExecutorService executor;
+
+    /** Kinesis client; created in {@link #prepare(Object)}. */
+    protected AmazonKinesisClient client;
+
+    /** Builds a writer from the "kinesis" section of the global Streams configuration. */
+    public KinesisPersistWriter() {
+        Config config = StreamsConfigurator.config.getConfig("kinesis");
+        this.config = new ComponentConfigurator<>(KinesisWriterConfiguration.class).detectConfiguration(config);
+        this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+    }
+
+    /**
+     * Builds a writer from an explicit configuration.
+     *
+     * @param config writer configuration (credentials, protocol, region, target stream)
+     */
+    public KinesisPersistWriter(KinesisWriterConfiguration config) {
+        this.config = config;
+        this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+    }
+
+    public void setConfig(KinesisWriterConfiguration config) {
+        this.config = config;
+    }
+
+    /**
+     * Converts the datum's document to a String and puts it on the configured stream,
+     * using the datum id as partition key. On success the sequence number returned by
+     * Kinesis is stored on the datum.
+     *
+     * @param entry datum to write; skipped with a warning when its document cannot be
+     *              converted to a String
+     */
+    @Override
+    public void write(StreamsDatum entry) {
+
+        String document = (String) TypeConverterUtil.getInstance().convert(entry.getDocument(), String.class);
+
+        if (document == null) {
+            // Guard against a failed conversion instead of dying on getBytes() below.
+            LOGGER.warn("Unable to convert document to String, skipping {}", entry);
+            return;
+        }
+
+        // NOTE(review): Kinesis requires a non-empty partition key; confirm that callers
+        // always set an id on the datum.
+        PutRecordRequest putRecordRequest = new PutRecordRequest()
+                .withStreamName(config.getStream())
+                .withPartitionKey(entry.getId())
+                // Encode explicitly as UTF-8: the reader decodes records as UTF-8, while a bare
+                // getBytes() would use whatever the platform default charset happens to be.
+                .withData(ByteBuffer.wrap(document.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+
+        PutRecordResult putRecordResult = client.putRecord(putRecordRequest);
+
+        entry.setSequenceid(new BigInteger(putRecordResult.getSequenceNumber()));
+
+        LOGGER.debug("Wrote {}", entry);
+    }
+
+    /**
+     * Creates the Kinesis client and the executor.
+     *
+     * @param configurationObject ignored; the configuration given at construction time is used
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+        // Connect to Kinesis
+        synchronized (this) {
+            // Create the credentials Object
+            AWSCredentials credentials = new BasicAWSCredentials(config.getKey(), config.getSecretKey());
+
+            ClientConfiguration clientConfig = new ClientConfiguration();
+            clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString()));
+
+            this.client = new AmazonKinesisClient(credentials, clientConfig);
+            if (!Strings.isNullOrEmpty(config.getRegion()))
+                this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion())));
+        }
+        executor = Executors.newSingleThreadExecutor();
+
+    }
+
+    /**
+     * Shuts the executor down and waits up to five seconds for it to terminate.
+     *
+     * <p>Without the explicit shutdown the wait could never succeed and always blocked for
+     * the full timeout.
+     */
+    @Override
+    public void cleanUp() {
+        if (executor == null) {
+            return;
+        }
+        executor.shutdown();
+        try {
+            executor.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.debug("Interrupted! ", e);
+            // Preserve the interrupt for the caller.
+            Thread.currentThread().interrupt();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/20fbbdc2/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisConfiguration.json b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisConfiguration.json
new file mode 100644
index 0000000..05c7606
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisConfiguration.json
@@ -0,0 +1,33 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "type": "object",
+ "javaType" : "org.apache.streams.amazon.kinesis.KinesisConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "key": {
+ "type": "string",
+ "description": "Your Amazon Key",
+ "required": true
+ },
+ "secretKey": {
+ "type": "string",
+ "description": "Your Amazon Secret Key",
+ "required": true
+ },
+ "protocol": {
+ "type": "string",
+ "description": "Whether you are using HTTP or HTTPS",
+ "enum": ["HTTP", "HTTPS"],
+ "default": "HTTPS"
+ },
+ "region": {
+ "type": "string",
+ "description": "The AWS region where your Kinesis stream resides",
+ "required": false
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/20fbbdc2/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisReaderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisReaderConfiguration.json b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisReaderConfiguration.json
new file mode 100644
index 0000000..bf58892
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisReaderConfiguration.json
@@ -0,0 +1,19 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "type": "object",
+ "javaType" : "org.apache.streams.amazon.kinesis.KinesisReaderConfiguration",
+ "extends": {"$ref":"KinesisConfiguration.json"},
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "streams": {
+ "type": "array",
+ "items": {
+ "type": "string"
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/20fbbdc2/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisWriterConfiguration.json b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisWriterConfiguration.json
new file mode 100644
index 0000000..2fcf059
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisWriterConfiguration.json
@@ -0,0 +1,16 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "type": "object",
+ "javaType" : "org.apache.streams.amazon.kinesis.KinesisWriterConfiguration",
+ "extends": {"$ref":"KinesisConfiguration.json"},
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "stream": {
+ "type": "string"
+ }
+ }
+}
\ No newline at end of file
[6/8] incubator-streams git commit: correct pom version
Posted by sb...@apache.org.
correct pom version
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f2eb99a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f2eb99a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f2eb99a3
Branch: refs/heads/master
Commit: f2eb99a3da12f45f08bc0738908b2156ccf02566
Parents: e041d91
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Fri Sep 4 13:04:50 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Fri Sep 4 13:27:34 2015 -0500
----------------------------------------------------------------------
streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f2eb99a3/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml
index e5c68f5..0f54633 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml
@@ -20,7 +20,7 @@
<parent>
<artifactId>streams-amazon-aws</artifactId>
<groupId>org.apache.streams</groupId>
- <version>0.3.4-PP-SNAPSHOT</version>
+ <version>0.3-incubating-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
[4/8] incubator-streams git commit: basic implementation of kinesis
read/write
Posted by sb...@apache.org.
basic implementation of kinesis read/write
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/54b3f803
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/54b3f803
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/54b3f803
Branch: refs/heads/master
Commit: 54b3f8031a8b04b811d9ff213c7046b33d4c240c
Parents: 092021c
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Wed Sep 2 16:58:19 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Fri Sep 4 13:27:32 2015 -0500
----------------------------------------------------------------------
streams-contrib/streams-amazon-aws/pom.xml | 1 +
.../streams-persist-kinesis/README.md | 18 ++
.../streams-persist-kinesis/pom.xml | 117 +++++++++++++
.../amazon/kinesis/KinesisPersistReader.java | 171 +++++++++++++++++++
.../kinesis/KinesisPersistReaderTask.java | 108 ++++++++++++
.../amazon/kinesis/KinesisPersistWriter.java | 111 ++++++++++++
.../amazon/kinesis/KinesisConfiguration.json | 33 ++++
.../kinesis/KinesisReaderConfiguration.json | 19 +++
.../kinesis/KinesisWriterConfiguration.json | 16 ++
9 files changed, 594 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/54b3f803/streams-contrib/streams-amazon-aws/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/pom.xml b/streams-contrib/streams-amazon-aws/pom.xml
index 8e14dcb..59fcf47 100644
--- a/streams-contrib/streams-amazon-aws/pom.xml
+++ b/streams-contrib/streams-amazon-aws/pom.xml
@@ -37,6 +37,7 @@
</properties>
<modules>
+ <module>streams-persist-kinesis</module>
<module>streams-persist-s3</module>
</modules>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/54b3f803/streams-contrib/streams-amazon-aws/streams-persist-kinesis/README.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/README.md b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/README.md
new file mode 100644
index 0000000..338f68d
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/README.md
@@ -0,0 +1,18 @@
+streams-persist-kinesis
+==============
+
+Read/Write documents to/from Kinesis.
+
+Example reader configuration (a writer takes a single "stream" name instead of the "streams" list):
+
+ "kinesis": {
+ "key": "",
+ "secretKey": "",
+ "protocol": "HTTPS",
+ "region": "us-east-1",
+ "streams": [
+ "topic1",
+ "topic2"
+ ]
+ }
+
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/54b3f803/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml
new file mode 100644
index 0000000..e5c68f5
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<!-- Maven module that builds the Amazon Kinesis persist reader and writer. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- Inherits from the streams-amazon-aws aggregator, which lists this module. -->
+ <!-- NOTE(review): a follow-up commit in this thread ("correct pom version") changes the parent version to 0.3-incubating-SNAPSHOT. -->
+ <parent>
+ <artifactId>streams-amazon-aws</artifactId>
+ <groupId>org.apache.streams</groupId>
+ <version>0.3.4-PP-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>streams-persist-kinesis</artifactId>
+ <name>${project.artifactId}</name>
+
+ <description>Amazon Kinesis Module</description>
+
+ <!-- Sibling streams modules are pinned to this module's own version. -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-converters</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- No version is declared here; the parent streams-amazon-aws pom declares one for this artifact. -->
+ <!-- commons-logging is excluded so the jcl-over-slf4j bridge declared below is used in its place. -->
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- NOTE(review): no version given; presumably managed by an ancestor pom - confirm. -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <!-- Registers the jsonschema2pojo output directory as an additional source root. -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jsonschema2pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- Generates the Kinesis configuration classes from the JSON schemas under src/main/jsonschema. -->
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <!-- NOTE(review): the schemas added in this commit each set an explicit javaType under org.apache.streams.amazon.kinesis, so this package presumably only applies to schemas without one - confirm. -->
+ <targetPackage>org.apache.streams.amazon.kinesis.pojo</targetPackage>
+ <useLongIntegers>true</useLongIntegers>
+ <useJodaDates>true</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/54b3f803/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
new file mode 100644
index 0000000..b755579
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.amazon.kinesis;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.google.common.collect.Queues;
+import com.typesafe.config.Config;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.streams.amazon.kinesis.KinesisConfiguration;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * StreamsPersistReader that pulls records from the Amazon Kinesis streams
+ * named in a KinesisReaderConfiguration.
+ *
+ * prepare() builds the Kinesis client and the worker pool; startStream()
+ * submits one KinesisPersistReaderTask per shard of every ACTIVE stream.
+ * Those tasks append to persistQueue, which readCurrent() drains.
+ */
+public class KinesisPersistReader implements StreamsPersistReader, Serializable {
+
+    public final static String STREAMS_ID = "KinesisPersistReader";
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisPersistReader.class);
+
+    /** Buffer filled by the shard reader tasks and drained by readCurrent(). */
+    protected volatile Queue<StreamsDatum> persistQueue;
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    private KinesisReaderConfiguration config;
+
+    /** Delay between polls of a shard; read by KinesisPersistReaderTask. */
+    protected Long pollInterval = StreamsConfigurator.detectConfiguration().getBatchFrequencyMs();
+
+    private List<String> streamNames;
+
+    private ExecutorService executor;
+
+    /** Kinesis client created in prepare(); shared with the shard reader tasks. */
+    protected AmazonKinesisClient client;
+
+    /**
+     * Builds a reader configured from the "kinesis" section of the global
+     * streams configuration.
+     */
+    public KinesisPersistReader() {
+        Config config = StreamsConfigurator.config.getConfig("kinesis");
+        this.config = new ComponentConfigurator<>(KinesisReaderConfiguration.class).detectConfiguration(config);
+        this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+    }
+
+    /**
+     * Builds a reader from an explicit configuration.
+     *
+     * @param config credentials, protocol, region and stream names to use
+     */
+    public KinesisPersistReader(KinesisReaderConfiguration config) {
+        this.config = config;
+        this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+    }
+
+    /**
+     * Replaces the configuration; takes effect on the next prepare()/startStream().
+     *
+     * @param config the new reader configuration
+     */
+    public void setConfig(KinesisReaderConfiguration config) {
+        this.config = config;
+    }
+
+    /**
+     * Submits one reader task per shard of every configured stream that is
+     * currently ACTIVE. Streams in any other state are skipped with a warning.
+     * prepare() must have been called first.
+     */
+    @Override
+    public void startStream() {
+
+        this.streamNames = this.config.getStreams();
+
+        if (streamNames == null) {
+            LOGGER.warn("No Kinesis streams configured; nothing to read");
+            return;
+        }
+
+        for (final String stream : streamNames) {
+
+            DescribeStreamResult describeStreamResult = client.describeStream(stream);
+            String status = describeStreamResult.getStreamDescription().getStreamStatus();
+
+            if (!"ACTIVE".equals(status)) {
+                LOGGER.warn("Skipping Kinesis stream {} with status {}", stream, status);
+                continue;
+            }
+
+            List<Shard> shardList = describeStreamResult.getStreamDescription().getShards();
+
+            for (Shard shard : shardList) {
+                executor.submit(new KinesisPersistReaderTask(this, stream, shard.getShardId()));
+            }
+        }
+
+    }
+
+    /**
+     * @return everything buffered so far; identical to readCurrent()
+     */
+    @Override
+    public StreamsResultSet readAll() {
+        return readCurrent();
+    }
+
+    /**
+     * Hands over every datum buffered since the previous call.
+     *
+     * @return a result set owning the drained datums (possibly empty)
+     */
+    public StreamsResultSet readCurrent() {
+
+        // Move elements one at a time instead of copy-then-clear: the reader
+        // tasks append without holding any lock, so a datum added between a
+        // copy and a clear() would be silently dropped.
+        Queue<StreamsDatum> drained = Queues.newConcurrentLinkedQueue();
+        StreamsDatum datum;
+        while ((datum = persistQueue.poll()) != null) {
+            drained.add(datum);
+        }
+        return new StreamsResultSet(drained);
+    }
+
+    /** Not implemented for Kinesis; always returns null. */
+    @Override
+    public StreamsResultSet readNew(BigInteger bigInteger) {
+        return null;
+    }
+
+    /** Not implemented for Kinesis; always returns null. */
+    @Override
+    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
+        return null;
+    }
+
+    /**
+     * @return true once prepare() has created the worker pool and until
+     *         cleanUp() shuts it down
+     */
+    @Override
+    public boolean isRunning() {
+        // executor is null until prepare() runs; report "not running" rather than throw
+        return executor != null && !executor.isShutdown() && !executor.isTerminated();
+    }
+
+    /**
+     * Creates the Kinesis client from the configured credentials, protocol
+     * and (optional) region, and creates the worker pool.
+     *
+     * @param configurationObject unused; the configuration comes from the constructor or setConfig()
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+        // Connect to Kinesis
+        synchronized (this) {
+            // Create the credentials Object
+            AWSCredentials credentials = new BasicAWSCredentials(config.getKey(), config.getSecretKey());
+
+            ClientConfiguration clientConfig = new ClientConfiguration();
+            clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString()));
+
+            this.client = new AmazonKinesisClient(credentials, clientConfig);
+            if (!Strings.isNullOrEmpty(config.getRegion()))
+                this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion())));
+        }
+        streamNames = this.config.getStreams();
+        // One long-running task is submitted per shard, and the shard count is
+        // only known in startStream(). A pool sized by the number of streams
+        // would leave the extra shards queued forever (and rejects a size of 0),
+        // so let the pool grow to one thread per submitted task.
+        executor = Executors.newCachedThreadPool();
+    }
+
+    /**
+     * Stops the shard reader tasks and waits up to five seconds for them to
+     * finish. Tasks that ignore interruption may still be running on return.
+     */
+    @Override
+    public void cleanUp() {
+
+        if (executor == null) {
+            return;
+        }
+        // The tasks never finish on their own, so termination has to be
+        // requested; waiting without a shutdown request would block forever.
+        executor.shutdownNow();
+        try {
+            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                LOGGER.warn("Kinesis reader tasks did not terminate within 5 seconds");
+            }
+        } catch (InterruptedException e) {
+            // preserve the interrupt for the caller instead of swallowing it
+            Thread.currentThread().interrupt();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/54b3f803/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
new file mode 100644
index 0000000..9b8d817
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.amazon.kinesis;
+
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.util.Base64;
+import com.google.common.collect.Maps;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Runnable that polls a single Kinesis shard and appends every record it
+ * receives to the owning KinesisPersistReader's persistQueue as a
+ * StreamsDatum.
+ */
+public class KinesisPersistReaderTask implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisPersistReaderTask.class);
+
+    private KinesisPersistReader reader;
+    private String streamName;
+    private String shardId;
+
+    private String shardIteratorId;
+
+    // Not read by run(), which sleeps for reader.pollInterval instead.
+    private Long pollInterval = StreamsConfigurator.detectConfiguration().getBatchFrequencyMs();
+
+    /**
+     * @param reader     owner supplying the Kinesis client, the output queue and the poll interval
+     * @param streamName name of the Kinesis stream the shard belongs to
+     * @param shardId    id of the shard this task reads
+     */
+    public KinesisPersistReaderTask(KinesisPersistReader reader, String streamName, String shardId) {
+        this.reader = reader;
+        this.streamName = streamName;
+        this.shardId = shardId;
+    }
+
+    /**
+     * Reads the shard from its oldest available record onward, sleeping for
+     * reader.pollInterval between requests. Returns when the shard has no
+     * further iterator or when the thread is interrupted.
+     */
+    @Override
+    public void run() {
+
+        GetShardIteratorRequest shardIteratorRequest = new GetShardIteratorRequest()
+                .withStreamName(this.streamName)
+                .withShardId(shardId)
+                .withShardIteratorType("TRIM_HORIZON");
+
+        GetShardIteratorResult shardIteratorResult = reader.client.getShardIterator(shardIteratorRequest);
+
+        shardIteratorId = shardIteratorResult.getShardIterator();
+
+        // The same metadata map instance is attached to every datum from this shard.
+        Map<String,Object> metadata = Maps.newHashMap();
+        metadata.put("streamName", streamName);
+        metadata.put("shardId", shardId);
+
+        // A null next-iterator means the shard is closed and fully consumed;
+        // requesting records with it would only fail. Checking the interrupt
+        // flag lets an executor shutdown actually stop this task.
+        while (shardIteratorId != null && !Thread.currentThread().isInterrupted()) {
+
+            GetRecordsRequest recordsRequest = new GetRecordsRequest()
+                    .withShardIterator(shardIteratorId);
+
+            GetRecordsResult recordsResult = reader.client.getRecords(recordsRequest);
+
+            shardIteratorId = recordsResult.getNextShardIterator();
+
+            List<Record> recordList = recordsResult.getRecords();
+            for (Record record : recordList) {
+                try {
+                    // Copy only the readable window of the buffer: array()
+                    // returns the whole backing array regardless of
+                    // position/limit and throws for read-only buffers.
+                    java.nio.ByteBuffer data = record.getData().duplicate();
+                    byte[] byteArray = new byte[data.remaining()];
+                    data.get(byteArray);
+                    String message = new String(byteArray, Charset.forName("UTF-8"));
+                    reader.persistQueue.add(
+                            new StreamsDatum(
+                                    message,
+                                    record.getPartitionKey(),
+                                    new DateTime(),
+                                    new BigInteger(record.getSequenceNumber()),
+                                    metadata));
+                } catch( Exception e ) {
+                    // best effort: one bad record must not stop the shard
+                    LOGGER.warn("Exception processing record {}: {}", record, e);
+                }
+            }
+            try {
+                Thread.sleep(reader.pollInterval);
+            } catch (InterruptedException e) {
+                // restore the flag so the loop condition ends the task
+                Thread.currentThread().interrupt();
+            }
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/54b3f803/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
new file mode 100644
index 0000000..2caa028
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
@@ -0,0 +1,111 @@
+package org.apache.streams.amazon.kinesis;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.PutRecordRequest;
+import com.amazonaws.services.kinesis.model.PutRecordResult;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.typesafe.config.Config;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.converter.TypeConverterUtil;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by sblackmon on 9/2/15.
+ */
+public class KinesisPersistWriter implements StreamsPersistWriter {
+
+ public final static String STREAMS_ID = "KinesisPersistWriter";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KinesisPersistWriter.class);
+
+ protected volatile Queue<StreamsDatum> persistQueue;
+
+ private ObjectMapper mapper = new ObjectMapper();
+
+ private KinesisWriterConfiguration config;
+
+ private List<String> streamName;
+
+ private ExecutorService executor;
+
+ protected AmazonKinesisClient client;
+
+ public KinesisPersistWriter() {
+ Config config = StreamsConfigurator.config.getConfig("kinesis");
+ this.config = new ComponentConfigurator<>(KinesisWriterConfiguration.class).detectConfiguration(config);
+ this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ }
+
+ public KinesisPersistWriter(KinesisWriterConfiguration config) {
+ this.config = config;
+ this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ }
+
+ public void setConfig(KinesisWriterConfiguration config) {
+ this.config = config;
+ }
+
+ @Override
+ public void write(StreamsDatum entry) {
+
+ String document = (String) TypeConverterUtil.getInstance().convert(entry.getDocument(), String.class);
+
+ PutRecordRequest putRecordRequest = new PutRecordRequest()
+ .withStreamName(config.getStream())
+ .withPartitionKey(entry.getId())
+ .withData(ByteBuffer.wrap(document.getBytes()));
+
+ PutRecordResult putRecordResult = client.putRecord(putRecordRequest);
+
+ entry.setSequenceid(new BigInteger(putRecordResult.getSequenceNumber()));
+
+ LOGGER.debug("Wrote {}", entry);
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ // Connect to Kinesis
+ synchronized (this) {
+ // Create the credentials Object
+ AWSCredentials credentials = new BasicAWSCredentials(config.getKey(), config.getSecretKey());
+
+ ClientConfiguration clientConfig = new ClientConfiguration();
+ clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString()));
+
+ this.client = new AmazonKinesisClient(credentials, clientConfig);
+ if (!Strings.isNullOrEmpty(config.getRegion()))
+ this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion())));
+ }
+ executor = Executors.newSingleThreadExecutor();
+
+ }
+
+ @Override
+ public void cleanUp() {
+ try {
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.debug("Interrupted! ", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/54b3f803/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisConfiguration.json b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisConfiguration.json
new file mode 100644
index 0000000..05c7606
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisConfiguration.json
@@ -0,0 +1,33 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "type": "object",
+ "javaType" : "org.apache.streams.amazon.kinesis.KinesisConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "key": {
+ "type": "string",
+ "description": "Your Amazon Key",
+ "required": true
+ },
+ "secretKey": {
+ "type": "string",
+ "description": "Your Amazon Secret Key",
+ "required": true
+ },
+ "protocol": {
+ "type": "string",
+ "description": "Whether you are using HTTP or HTTPS",
+ "enum": ["HTTP", "HTTPS"],
+ "default": "HTTPS"
+ },
+ "region": {
+ "type": "string",
+ "description": "The AWS region where your Kinesis stream resides",
+ "required": false
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/54b3f803/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisReaderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisReaderConfiguration.json b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisReaderConfiguration.json
new file mode 100644
index 0000000..bf58892
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisReaderConfiguration.json
@@ -0,0 +1,19 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "type": "object",
+ "javaType" : "org.apache.streams.amazon.kinesis.KinesisReaderConfiguration",
+ "extends": {"$ref":"KinesisConfiguration.json"},
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "streams": {
+ "type": "array",
+ "items": {
+ "type": "string"
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/54b3f803/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisWriterConfiguration.json b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisWriterConfiguration.json
new file mode 100644
index 0000000..2fcf059
--- /dev/null
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/jsonschema/org/apache/streams/amazon/kinesis/KinesisWriterConfiguration.json
@@ -0,0 +1,16 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "type": "object",
+ "javaType" : "org.apache.streams.amazon.kinesis.KinesisWriterConfiguration",
+ "extends": {"$ref":"KinesisConfiguration.json"},
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "stream": {
+ "type": "string"
+ }
+ }
+}
\ No newline at end of file
[2/8] incubator-streams git commit: log counts within each batch and
millis behind
Posted by sb...@apache.org.
log counts within each batch and millis behind
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b6db3064
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b6db3064
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b6db3064
Branch: refs/heads/master
Commit: b6db3064a7d8f29239627b98bb93483aa268646e
Parents: 20fbbdc
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Thu Sep 3 17:26:25 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Sep 3 17:26:25 2015 -0500
----------------------------------------------------------------------
streams-contrib/streams-amazon-aws/pom.xml | 2 +-
.../apache/streams/amazon/kinesis/KinesisPersistReaderTask.java | 3 +++
2 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b6db3064/streams-contrib/streams-amazon-aws/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/pom.xml b/streams-contrib/streams-amazon-aws/pom.xml
index 59fcf47..fd8264d 100644
--- a/streams-contrib/streams-amazon-aws/pom.xml
+++ b/streams-contrib/streams-amazon-aws/pom.xml
@@ -46,7 +46,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
- <version>1.8.11</version>
+ <version>1.10.15</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b6db3064/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
index 9b8d817..7753031 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
@@ -79,9 +79,12 @@ public class KinesisPersistReaderTask implements Runnable {
GetRecordsResult recordsResult = reader.client.getRecords(recordsRequest);
+ LOGGER.info("{} records {} millis behind {}:{}:{} ", recordsResult.getRecords().size(), recordsResult.getMillisBehindLatest(), streamName, shardId, shardIteratorId);
+
shardIteratorId = recordsResult.getNextShardIterator();
List<Record> recordList = recordsResult.getRecords();
+
for (Record record : recordList) {
try {
byte[] byteArray = record.getData().array();
[5/8] incubator-streams git commit: log counts within each batch and
millis behind
Posted by sb...@apache.org.
log counts within each batch and millis behind
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e041d91b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e041d91b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e041d91b
Branch: refs/heads/master
Commit: e041d91b8fefb459ffb8eb961884bd884228cf20
Parents: 54b3f80
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Thu Sep 3 17:26:25 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Fri Sep 4 13:27:33 2015 -0500
----------------------------------------------------------------------
streams-contrib/streams-amazon-aws/pom.xml | 2 +-
.../apache/streams/amazon/kinesis/KinesisPersistReaderTask.java | 3 +++
2 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e041d91b/streams-contrib/streams-amazon-aws/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/pom.xml b/streams-contrib/streams-amazon-aws/pom.xml
index 59fcf47..fd8264d 100644
--- a/streams-contrib/streams-amazon-aws/pom.xml
+++ b/streams-contrib/streams-amazon-aws/pom.xml
@@ -46,7 +46,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
- <version>1.8.11</version>
+ <version>1.10.15</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e041d91b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
index 9b8d817..7753031 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
@@ -79,9 +79,12 @@ public class KinesisPersistReaderTask implements Runnable {
GetRecordsResult recordsResult = reader.client.getRecords(recordsRequest);
+ LOGGER.info("{} records {} millis behind {}:{}:{} ", recordsResult.getRecords().size(), recordsResult.getMillisBehindLatest(), streamName, shardId, shardIteratorId);
+
shardIteratorId = recordsResult.getNextShardIterator();
List<Record> recordList = recordsResult.getRecords();
+
for (Record record : recordList) {
try {
byte[] byteArray = record.getData().array();
[7/8] incubator-streams git commit: Merge branch 'STREAMS-361' of
https://github.com/steveblackmon/incubator-streams into STREAMS-361
Posted by sb...@apache.org.
Merge branch 'STREAMS-361' of https://github.com/steveblackmon/incubator-streams into STREAMS-361
* 'STREAMS-361' of https://github.com/steveblackmon/incubator-streams:
correct pom version
log counts within each batch and millis behind
basic implementation of kinesis read/write
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/767ca378
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/767ca378
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/767ca378
Branch: refs/heads/master
Commit: 767ca3780bea2259bb287e4edc5af34acde3c724
Parents: f2eb99a f1192e9
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Fri Sep 4 13:43:56 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Fri Sep 4 13:43:56 2015 -0500
----------------------------------------------------------------------
----------------------------------------------------------------------
[8/8] incubator-streams git commit: Merge branch 'STREAMS-361'
Posted by sb...@apache.org.
Merge branch 'STREAMS-361'
* STREAMS-361:
correct pom version
log counts within each batch and millis behind
basic implementation of kinesis read/write
correct pom version
log counts within each batch and millis behind
basic implementation of kinesis read/write
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/498c6cc9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/498c6cc9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/498c6cc9
Branch: refs/heads/master
Commit: 498c6cc9f1bbe1ffcc8a7f6d5da86d06e9dacea2
Parents: fbd0741 767ca37
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Thu Sep 10 18:54:58 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Sep 10 18:54:58 2015 -0500
----------------------------------------------------------------------
streams-contrib/streams-amazon-aws/pom.xml | 3 +-
.../streams-persist-kinesis/README.md | 18 ++
.../streams-persist-kinesis/pom.xml | 117 +++++++++++++
.../amazon/kinesis/KinesisPersistReader.java | 171 +++++++++++++++++++
.../kinesis/KinesisPersistReaderTask.java | 111 ++++++++++++
.../amazon/kinesis/KinesisPersistWriter.java | 111 ++++++++++++
.../amazon/kinesis/KinesisConfiguration.json | 33 ++++
.../kinesis/KinesisReaderConfiguration.json | 19 +++
.../kinesis/KinesisWriterConfiguration.json | 16 ++
9 files changed, 598 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
[3/8] incubator-streams git commit: correct pom version
Posted by sb...@apache.org.
correct pom version
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f1192e96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f1192e96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f1192e96
Branch: refs/heads/master
Commit: f1192e96978b53c7eb6f04d53ef376cdb56c3003
Parents: b6db306
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Fri Sep 4 13:04:50 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Fri Sep 4 13:04:50 2015 -0500
----------------------------------------------------------------------
streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1192e96/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml
index e5c68f5..0f54633 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/pom.xml
@@ -20,7 +20,7 @@
<parent>
<artifactId>streams-amazon-aws</artifactId>
<groupId>org.apache.streams</groupId>
- <version>0.3.4-PP-SNAPSHOT</version>
+ <version>0.3-incubating-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>