You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:34:53 UTC
[03/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml
deleted file mode 100644
index 45b3b92..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
- </encoder>
- </appender>
-
- <root level="WARN">
- <appender-ref ref="STDOUT"/>
- </root>
- <logger name="org.apache.flink.streaming" level="WARN"/>
-</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/pom.xml b/flink-streaming-connectors/flink-connector-kinesis/pom.xml
deleted file mode 100644
index 29170ad..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/pom.xml
+++ /dev/null
@@ -1,164 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-connectors</artifactId>
- <version>1.2-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-connector-kinesis_2.10</artifactId>
- <name>flink-connector-kinesis</name>
- <properties>
- <aws.sdk.version>1.10.71</aws.sdk.version>
- <aws.kinesis-kcl.version>1.6.2</aws.kinesis-kcl.version>
- <aws.kinesis-kpl.version>0.10.2</aws.kinesis-kpl.version>
- </properties>
-
- <packaging>jar</packaging>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.10</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-tests_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- Note:
- The dependencies below are licensed under the Amazon Software License.
- Flink includes the "flink-connector-kinesis" only as an optional dependency for that reason.
- -->
- <dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk-kinesis</artifactId>
- <version>${aws.sdk.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>amazon-kinesis-producer</artifactId>
- <version>${aws.kinesis-kpl.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>amazon-kinesis-client</artifactId>
- <version>${aws.kinesis-kcl.version}</version>
- <!--
- We're excluding the below from the KCL since we'll only be using the
- com.amazonaws.services.kinesis.clientlibrary.types.UserRecord class, which will not need these dependencies.
- -->
- <exclusions>
- <exclusion>
- <groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk-dynamodb</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk-cloudwatch</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <id>shade-flink</id>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
- <artifactSet combine.children="append">
- <includes>
- <include>com.amazonaws:*</include>
- <include>com.google.protobuf:*</include>
- </includes>
- </artifactSet>
- <relocations combine.children="override">
- <!-- DO NOT RELOCATE GUAVA IN THIS PACKAGE -->
- <relocation>
- <pattern>org.objectweb.asm</pattern>
- <shadedPattern>org.apache.flink.shaded.org.objectweb.asm</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.google.protobuf</pattern>
- <shadedPattern>org.apache.flink.kinesis.shaded.com.google.protobuf</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.amazonaws</pattern>
- <shadedPattern>org.apache.flink.kinesis.shaded.com.amazonaws</shadedPattern>
- </relocation>
- </relocations>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
deleted file mode 100644
index a62dc10..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
-import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
-import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
-import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
-import java.util.Properties;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The Flink Kinesis Consumer is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis
- * streams within the same AWS service region, and can handle resharding of streams. Each subtask of the consumer is
- * responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will
- * change as shards are closed and created by Kinesis.
- *
- * <p>To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis
- * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for
- * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial
- * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST.</p>
- *
- * @param <T> the type of data emitted
- */
-public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
- implements CheckpointedAsynchronously<HashMap<KinesisStreamShard, SequenceNumber>>, ResultTypeQueryable<T> {
-
- private static final long serialVersionUID = 4724006128720664870L;
-
- private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class);
-
- // ------------------------------------------------------------------------
- // Consumer properties
- // ------------------------------------------------------------------------
-
- /** The names of the Kinesis streams that we will be consuming from */
- private final List<String> streams;
-
- /** Properties to parametrize settings such as AWS service region, initial position in stream,
- * shard list retrieval behaviours, etc */
- private final Properties configProps;
-
- /** User-supplied deserialization schema to convert Kinesis byte messages to Flink objects */
- private final KinesisDeserializationSchema<T> deserializer;
-
- // ------------------------------------------------------------------------
- // Runtime state
- // ------------------------------------------------------------------------
-
- /** Per-task fetcher for Kinesis data records, where each fetcher pulls data from one or more Kinesis shards */
- private transient KinesisDataFetcher<T> fetcher;
-
- /** The sequence numbers in the last state snapshot of this subtask */
- private transient HashMap<KinesisStreamShard, SequenceNumber> lastStateSnapshot;
-
- /** The sequence numbers to restore to upon restore from failure */
- private transient HashMap<KinesisStreamShard, SequenceNumber> sequenceNumsToRestore;
-
- private volatile boolean running = true;
-
-
- // ------------------------------------------------------------------------
- // Constructors
- // ------------------------------------------------------------------------
-
- /**
- * Creates a new Flink Kinesis Consumer.
- *
- * <p>The AWS credentials to be used, AWS region of the Kinesis streams, initial position to start streaming
- * from are configured with a {@link Properties} instance.</p>
- *
- * @param stream
- * The single AWS Kinesis stream to read from.
- * @param deserializer
- * The deserializer used to convert raw bytes of Kinesis records to Java objects (without key).
- * @param configProps
- * The properties used to configure AWS credentials, AWS region, and initial starting position.
- */
- public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer, Properties configProps) {
- this(stream, new KinesisDeserializationSchemaWrapper<>(deserializer), configProps);
- }
-
- /**
- * Creates a new Flink Kinesis Consumer.
- *
- * <p>The AWS credentials to be used, AWS region of the Kinesis streams, initial position to start streaming
- * from are configured with a {@link Properties} instance.</p>
- *
- * @param stream
- * The single AWS Kinesis stream to read from.
- * @param deserializer
- * The keyed deserializer used to convert raw bytes of Kinesis records to Java objects.
- * @param configProps
- * The properties used to configure AWS credentials, AWS region, and initial starting position.
- */
- public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, Properties configProps) {
- this(Collections.singletonList(stream), deserializer, configProps);
- }
-
- /**
- * Creates a new Flink Kinesis Consumer.
- *
- * <p>The AWS credentials to be used, AWS region of the Kinesis streams, initial position to start streaming
- * from are configured with a {@link Properties} instance.</p>
- *
- * @param streams
- * The AWS Kinesis streams to read from.
- * @param deserializer
- * The keyed deserializer used to convert raw bytes of Kinesis records to Java objects.
- * @param configProps
- * The properties used to configure AWS credentials, AWS region, and initial starting position.
- */
- public FlinkKinesisConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties configProps) {
- checkNotNull(streams, "streams can not be null");
- checkArgument(streams.size() != 0, "must be consuming at least 1 stream");
- checkArgument(!streams.contains(""), "stream names cannot be empty Strings");
- this.streams = streams;
-
- this.configProps = checkNotNull(configProps, "configProps can not be null");
-
- // check the configuration properties for any conflicting settings
- KinesisConfigUtil.validateConsumerConfiguration(this.configProps);
-
- this.deserializer = checkNotNull(deserializer, "deserializer can not be null");
-
- if (LOG.isInfoEnabled()) {
- StringBuilder sb = new StringBuilder();
- for (String stream : streams) {
- sb.append(stream).append(", ");
- }
- LOG.info("Flink Kinesis Consumer is going to read the following streams: {}", sb.toString());
- }
- }
-
- // ------------------------------------------------------------------------
- // Source life cycle
- // ------------------------------------------------------------------------
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
-
- // restore to the last known sequence numbers from the latest complete snapshot
- if (sequenceNumsToRestore != null) {
- if (LOG.isInfoEnabled()) {
- LOG.info("Subtask {} is restoring sequence numbers {} from previous checkpointed state",
- getRuntimeContext().getIndexOfThisSubtask(), sequenceNumsToRestore.toString());
- }
-
- // initialize sequence numbers with restored state
- lastStateSnapshot = sequenceNumsToRestore;
- } else {
- // start fresh with empty sequence numbers if there are no snapshots to restore from.
- lastStateSnapshot = new HashMap<>();
- }
- }
-
- @Override
- public void run(SourceContext<T> sourceContext) throws Exception {
-
- // all subtasks will run a fetcher, regardless of whether or not the subtask will initially have
- // shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks
- // can potentially have new shards to subscribe to later on
- fetcher = new KinesisDataFetcher<>(
- streams, sourceContext, getRuntimeContext(), configProps, deserializer);
-
- boolean isRestoringFromFailure = (sequenceNumsToRestore != null);
- fetcher.setIsRestoringFromFailure(isRestoringFromFailure);
-
- // if we are restoring from a checkpoint, we iterate over the restored
- // state and accordingly seed the fetcher with subscribed shards states
- if (isRestoringFromFailure) {
- for (Map.Entry<KinesisStreamShard, SequenceNumber> restored : lastStateSnapshot.entrySet()) {
- fetcher.advanceLastDiscoveredShardOfStream(
- restored.getKey().getStreamName(), restored.getKey().getShard().getShardId());
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Subtask {} is seeding the fetcher with restored shard {}," +
- " starting state set to the restored sequence number {}",
- getRuntimeContext().getIndexOfThisSubtask(), restored.getKey().toString(), restored.getValue());
- }
- fetcher.registerNewSubscribedShardState(
- new KinesisStreamShardState(restored.getKey(), restored.getValue()));
- }
- }
-
- // check that we are running before starting the fetcher
- if (!running) {
- return;
- }
-
- // start the fetcher loop. The fetcher will stop running only when cancel() or
- // close() is called, or an error is thrown by threads created by the fetcher
- fetcher.runFetcher();
-
- // check that the fetcher has terminated before fully closing
- fetcher.awaitTermination();
- sourceContext.close();
- }
-
- @Override
- public void cancel() {
- running = false;
-
- KinesisDataFetcher fetcher = this.fetcher;
- this.fetcher = null;
-
- // this method might be called before the subtask actually starts running,
- // so we must check if the fetcher is actually created
- if (fetcher != null) {
- try {
- // interrupt any work the fetcher is currently doing
- fetcher.shutdownFetcher();
- fetcher.awaitTermination();
- } catch (Exception e) {
- LOG.warn("Error while closing Kinesis data fetcher", e);
- }
- }
- }
-
- @Override
- public void close() throws Exception {
- cancel();
- super.close();
- }
-
- @Override
- public TypeInformation<T> getProducedType() {
- return deserializer.getProducedType();
- }
-
- // ------------------------------------------------------------------------
- // State Snapshot & Restore
- // ------------------------------------------------------------------------
-
- @Override
- public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- if (lastStateSnapshot == null) {
- LOG.debug("snapshotState() requested on not yet opened source; returning null.");
- return null;
- }
-
- if (fetcher == null) {
- LOG.debug("snapshotState() requested on not yet running source; returning null.");
- return null;
- }
-
- if (!running) {
- LOG.debug("snapshotState() called on closed source; returning null.");
- return null;
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Snapshotting state ...");
- }
-
- lastStateSnapshot = fetcher.snapshotState();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
- lastStateSnapshot.toString(), checkpointId, checkpointTimestamp);
- }
-
- return lastStateSnapshot;
- }
-
- @Override
- public void restoreState(HashMap<KinesisStreamShard, SequenceNumber> restoredState) throws Exception {
- sequenceNumsToRestore = restoredState;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
deleted file mode 100644
index 579bd6b..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kinesis;
-
-import com.amazonaws.services.kinesis.producer.Attempt;
-import com.amazonaws.services.kinesis.producer.KinesisProducer;
-import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
-import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
-import com.amazonaws.services.kinesis.producer.UserRecordResult;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
-import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
-import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.util.PropertiesUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Objects;
-import java.util.Properties;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The FlinkKinesisProducer allows producing records from a Flink DataStream into Kinesis.
- *
- * @param <OUT> Data type to produce into Kinesis Streams
- */
-public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
-
- private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
-
- /** Properties to parametrize settings such as AWS service region, access key etc. */
- private final Properties configProps;
-
- /* Flag controlling the error behavior of the producer */
- private boolean failOnError = false;
-
- /* Name of the default stream to produce to. Can be overwritten by the serialization schema */
- private String defaultStream;
-
- /* Default partition id. Can be overwritten by the serialization schema */
- private String defaultPartition;
-
- /* Schema for turning the OUT type into a byte array. */
- private final KinesisSerializationSchema<OUT> schema;
-
- /* Optional custom partitioner */
- private KinesisPartitioner<OUT> customPartitioner = null;
-
-
- // --------------------------- Runtime fields ---------------------------
-
-
- /* Our Kinesis instance for each parallel Flink sink */
- private transient KinesisProducer producer;
-
- /* Callback handling failures */
- private transient FutureCallback<UserRecordResult> callback;
-
- /* Field for async exception */
- private transient volatile Throwable thrownException;
-
-
- // --------------------------- Initialization and configuration ---------------------------
-
-
- /**
- * Create a new FlinkKinesisProducer.
- * This is a constructor supporting Flink's {@link SerializationSchema}.
- *
- * @param schema Serialization schema for the data type
- * @param configProps The properties used to configure AWS credentials and AWS region
- */
- public FlinkKinesisProducer(final SerializationSchema<OUT> schema, Properties configProps) {
-
- // create a simple wrapper for the serialization schema
- this(new KinesisSerializationSchema<OUT>() {
- @Override
- public ByteBuffer serialize(OUT element) {
- // wrap into ByteBuffer
- return ByteBuffer.wrap(schema.serialize(element));
- }
- // use default stream and hash key
- @Override
- public String getTargetStream(OUT element) {
- return null;
- }
- }, configProps);
- }
-
- /**
- * Create a new FlinkKinesisProducer.
- * This is a constructor supporting {@link KinesisSerializationSchema}.
- *
- * @param schema Kinesis serialization schema for the data type
- * @param configProps The properties used to configure AWS credentials and AWS region
- */
- public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, Properties configProps) {
- this.configProps = checkNotNull(configProps, "configProps can not be null");
-
- // check the configuration properties for any conflicting settings
- KinesisConfigUtil.validateProducerConfiguration(this.configProps);
-
- ClosureCleaner.ensureSerializable(Objects.requireNonNull(schema));
- this.schema = schema;
- }
-
- /**
- * If set to true, the producer will immediately fail with an exception on any error.
- * Otherwise, the errors are logged and the producer goes on.
- *
- * @param failOnError Error behavior flag
- */
- public void setFailOnError(boolean failOnError) {
- this.failOnError = failOnError;
- }
-
- /**
- * Set a default stream name.
- * @param defaultStream Name of the default Kinesis stream
- */
- public void setDefaultStream(String defaultStream) {
- this.defaultStream = defaultStream;
- }
-
- /**
- * Set default partition id
- * @param defaultPartition Name of the default partition
- */
- public void setDefaultPartition(String defaultPartition) {
- this.defaultPartition = defaultPartition;
- }
-
- public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner) {
- Objects.requireNonNull(partitioner);
- ClosureCleaner.ensureSerializable(partitioner);
- this.customPartitioner = partitioner;
- }
-
-
- // --------------------------- Lifecycle methods ---------------------------
-
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
-
- KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
-
- producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
- producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
- if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
- producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
- ProducerConfigConstants.COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG));
- }
- if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
- producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
- ProducerConfigConstants.AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG));
- }
-
- producer = new KinesisProducer(producerConfig);
- callback = new FutureCallback<UserRecordResult>() {
- @Override
- public void onSuccess(UserRecordResult result) {
- if (!result.isSuccessful()) {
- if(failOnError) {
- thrownException = new RuntimeException("Record was not sent successful");
- } else {
- LOG.warn("Record was not sent successful");
- }
- }
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (failOnError) {
- thrownException = t;
- } else {
- LOG.warn("An exception occurred while processing a record", t);
- }
- }
- };
-
- if (this.customPartitioner != null) {
- this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
- }
-
- LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion());
- }
-
- @Override
- public void invoke(OUT value) throws Exception {
- if (this.producer == null) {
- throw new RuntimeException("Kinesis producer has been closed");
- }
- if (thrownException != null) {
- String errorMessages = "";
- if (thrownException instanceof UserRecordFailedException) {
- List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts();
- for (Attempt attempt: attempts) {
- if (attempt.getErrorMessage() != null) {
- errorMessages += attempt.getErrorMessage() +"\n";
- }
- }
- }
- if (failOnError) {
- throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException);
- } else {
- LOG.warn("An exception was thrown while processing a record: {}", thrownException, errorMessages);
- thrownException = null; // reset
- }
- }
-
- String stream = defaultStream;
- String partition = defaultPartition;
-
- ByteBuffer serialized = schema.serialize(value);
-
- // maybe set custom stream
- String customStream = schema.getTargetStream(value);
- if (customStream != null) {
- stream = customStream;
- }
-
- String explicitHashkey = null;
- // maybe set custom partition
- if (customPartitioner != null) {
- partition = customPartitioner.getPartitionId(value);
- explicitHashkey = customPartitioner.getExplicitHashKey(value);
- }
-
- if (stream == null) {
- if (failOnError) {
- throw new RuntimeException("No target stream set");
- } else {
- LOG.warn("No target stream set. Skipping record");
- return;
- }
- }
-
- ListenableFuture<UserRecordResult> cb = producer.addUserRecord(stream, partition, explicitHashkey, serialized);
- Futures.addCallback(cb, callback);
- }
-
- @Override
- public void close() throws Exception {
- LOG.info("Closing producer");
- super.close();
- KinesisProducer kp = this.producer;
- this.producer = null;
- if (kp != null) {
- LOG.info("Flushing outstanding {} records", kp.getOutstandingRecordsCount());
- // try to flush all outstanding records
- while (kp.getOutstandingRecordsCount() > 0) {
- kp.flush();
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- LOG.warn("Flushing was interrupted.");
- // stop the blocking flushing and destroy producer immediately
- break;
- }
- }
- LOG.info("Flushing done. Destroying producer instance.");
- kp.destroy();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
deleted file mode 100644
index bd23abe..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kinesis;
-
-
-import java.io.Serializable;
-
-public abstract class KinesisPartitioner<T> implements Serializable {
-
- /**
- * Return a partition id based on the input
- * @param element Element to partition
- * @return A string representing the partition id
- */
- public abstract String getPartitionId(T element);
-
- /**
- * Optional method for setting an explicit hash key
- * @param element Element to get the hash key for
- * @return the hash key for the element
- */
- public String getExplicitHashKey(T element) {
- return null;
- }
-
- /**
- * Optional initializer.
- *
- * @param indexOfThisSubtask Index of this partitioner instance
- * @param numberOfParallelSubtasks Total number of parallel instances
- */
- public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks) {
- //
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
deleted file mode 100644
index 01d4f00..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.config;
-
-import com.amazonaws.auth.AWSCredentialsProvider;
-
-/**
- * Configuration keys for AWS service usage
- */
-public class AWSConfigConstants {
-
- /**
- * Possible configuration values for the type of credential provider to use when accessing AWS Kinesis.
- * Internally, a corresponding implementation of {@link AWSCredentialsProvider} will be used.
- */
- public enum CredentialProvider {
-
- /** Look for the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to create AWS credentials */
- ENV_VAR,
-
- /** Look for Java system properties aws.accessKeyId and aws.secretKey to create AWS credentials */
- SYS_PROP,
-
- /** Use a AWS credentials profile file to create the AWS credentials */
- PROFILE,
-
- /** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties */
- BASIC,
-
- /** A credentials provider chain will be used that searches for credentials in this order: ENV_VARS, SYS_PROPS, PROFILE in the AWS instance metadata **/
- AUTO,
- }
-
- /** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set) */
- public static final String AWS_REGION = "aws.region";
-
- /** The AWS access key ID to use when setting credentials provider type to BASIC */
- public static final String AWS_ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid";
-
- /** The AWS secret key to use when setting credentials provider type to BASIC */
- public static final String AWS_SECRET_ACCESS_KEY = "aws.credentials.provider.basic.secretkey";
-
- /** The credential provider type to use when AWS credentials are required (BASIC is used if not set)*/
- public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
-
- /** Optional configuration for profile path if credential provider type is set to be PROFILE */
- public static final String AWS_PROFILE_PATH = "aws.credentials.provider.profile.path";
-
- /** Optional configuration for profile name if credential provider type is set to be PROFILE */
- public static final String AWS_PROFILE_NAME = "aws.credentials.provider.profile.name";
-
- /** The AWS endpoint for Kinesis (derived from the AWS region setting if not set) */
- public static final String AWS_ENDPOINT = "aws.endpoint";
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
deleted file mode 100644
index 76c20ed..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.config;
-
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
-import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
-import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
-
-/**
- * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer}
- */
-public class ConsumerConfigConstants extends AWSConfigConstants {
-
- /**
- * The initial position to start reading shards from. This will affect the {@link ShardIteratorType} used
- * when the consumer tasks retrieve the first shard iterator for each Kinesis shard.
- */
- public enum InitialPosition {
-
- /** Start reading from the earliest possible record in the stream (excluding expired data records) */
- TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM),
-
- /** Start reading from the latest incoming record */
- LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM);
-
- private SentinelSequenceNumber sentinelSequenceNumber;
-
- InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) {
- this.sentinelSequenceNumber = sentinelSequenceNumber;
- }
-
- public SentinelSequenceNumber toSentinelSequenceNumber() {
- return this.sentinelSequenceNumber;
- }
- }
-
- /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */
- public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
-
- /** The base backoff time between each describeStream attempt */
- public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
-
- /** The maximum backoff time between each describeStream attempt */
- public static final String STREAM_DESCRIBE_BACKOFF_MAX = "flink.stream.describe.backoff.max";
-
- /** The power constant for exponential backoff between each describeStream attempt */
- public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst";
-
- /** The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard */
- public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
-
- /** The maximum number of getRecords attempts if we get ProvisionedThroughputExceededException */
- public static final String SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries";
-
- /** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */
- public static final String SHARD_GETRECORDS_BACKOFF_BASE = "flink.shard.getrecords.backoff.base";
-
- /** The maximum backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */
- public static final String SHARD_GETRECORDS_BACKOFF_MAX = "flink.shard.getrecords.backoff.max";
-
- /** The power constant for exponential backoff between each getRecords attempt */
- public static final String SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getrecords.backoff.expconst";
-
- /** The interval between each getRecords request to a AWS Kinesis shard in milliseconds */
- public static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis";
-
- /** The maximum number of getShardIterator attempts if we get ProvisionedThroughputExceededException */
- public static final String SHARD_GETITERATOR_RETRIES = "flink.shard.getiterator.maxretries";
-
- /** The base backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */
- public static final String SHARD_GETITERATOR_BACKOFF_BASE = "flink.shard.getiterator.backoff.base";
-
- /** The maximum backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */
- public static final String SHARD_GETITERATOR_BACKOFF_MAX = "flink.shard.getiterator.backoff.max";
-
- /** The power constant for exponential backoff between each getShardIterator attempt */
- public static final String SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getiterator.backoff.expconst";
-
- /** The interval between each attempt to discover new shards */
- public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis";
-
- // ------------------------------------------------------------------------
- // Default values for consumer configuration
- // ------------------------------------------------------------------------
-
- public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString();
-
- public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L;
-
- public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L;
-
- public static final double DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
-
- public static final int DEFAULT_SHARD_GETRECORDS_MAX = 100;
-
- public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
-
- public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE = 300L;
-
- public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX = 1000L;
-
- public static final double DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
-
- public static final long DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS = 0L;
-
- public static final int DEFAULT_SHARD_GETITERATOR_RETRIES = 3;
-
- public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE = 300L;
-
- public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX = 1000L;
-
- public static final double DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
-
- public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 10000L;
-
- /**
- * To avoid shard iterator expires in {@link ShardConsumer}s, the value for the configured
- * getRecords interval can not exceed 5 minutes, which is the expire time for retrieved iterators.
- */
- public static final long MAX_SHARD_GETRECORDS_INTERVAL_MILLIS = 300000L;
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
deleted file mode 100644
index 1edddfc..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.config;
-
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
-
-/**
- * Optional producer specific configuration keys for {@link FlinkKinesisProducer}
- */
-public class ProducerConfigConstants extends AWSConfigConstants {
-
- /** Maximum number of items to pack into an PutRecords request. **/
- public static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount";
-
- /** Maximum number of items to pack into an aggregated record. **/
- public static final String AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount";
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
deleted file mode 100644
index 55668c6..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kinesis.examples;
-
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-import java.util.Properties;
-
-/**
- * This is an example on how to consume data from Kinesis
- */
-public class ConsumeFromKinesis {
-
- public static void main(String[] args) throws Exception {
- ParameterTool pt = ParameterTool.fromArgs(args);
-
- StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
- see.setParallelism(1);
-
- Properties kinesisConsumerConfig = new Properties();
- kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, pt.getRequired("region"));
- kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accesskey"));
- kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretkey"));
-
- DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>(
- "flink-test",
- new SimpleStringSchema(),
- kinesisConsumerConfig));
-
- kinesis.print();
-
- see.execute();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
deleted file mode 100644
index d178137..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kinesis.examples;
-
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
-import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-import java.util.Properties;
-
-/**
- * This is an example on how to produce data into Kinesis
- */
-public class ProduceIntoKinesis {
-
- public static void main(String[] args) throws Exception {
- ParameterTool pt = ParameterTool.fromArgs(args);
-
- StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
- see.setParallelism(1);
-
- DataStream<String> simpleStringStream = see.addSource(new EventsGenerator());
-
- Properties kinesisProducerConfig = new Properties();
- kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region"));
- kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
- kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
-
- FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
- new SimpleStringSchema(), kinesisProducerConfig);
-
- kinesis.setFailOnError(true);
- kinesis.setDefaultStream("flink-test");
- kinesis.setDefaultPartition("0");
-
- simpleStringStream.addSink(kinesis);
-
- see.execute();
- }
-
- public static class EventsGenerator implements SourceFunction<String> {
- private boolean running = true;
-
- @Override
- public void run(SourceContext<String> ctx) throws Exception {
- long seq = 0;
- while(running) {
- Thread.sleep(10);
- ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- }
-}