Posted to by on 2016/08/25 18:48:21 UTC

[18/89] [abbrv] [partial] flink git commit: [FLINK-4317, FLIP-3] [docs] Restructure docs
diff --git a/docs/dev/connectors/ b/docs/dev/connectors/
new file mode 100644
index 0000000..ce011b3
--- /dev/null
+++ b/docs/dev/connectors/
@@ -0,0 +1,319 @@
+title: "Amazon AWS Kinesis Streams Connector"
+nav-title: Kinesis
+nav-parent_id: connectors
+nav-pos: 3
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+The Kinesis connector provides access to [Amazon AWS Kinesis Streams](
+To use the connector, add the following Maven dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kinesis{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+**The `flink-connector-kinesis{{ site.scala_version_suffix }}` has a dependency on code licensed under the [Amazon Software License]( (ASL).
+Linking to the flink-connector-kinesis will include ASL-licensed code in your application.**
+The `flink-connector-kinesis{{ site.scala_version_suffix }}` artifact is not deployed to Maven central as part of
+Flink releases because of the licensing issue. Therefore, you need to build the connector yourself from the source.
+Download the Flink source or check it out from the git repository. Then, use the following Maven command to build the module:
+{% highlight bash %}
+mvn clean install -Pinclude-kinesis -DskipTests
+# In Maven 3.3 the shading of flink-dist doesn't work properly in one run, so we need to run mvn for flink-dist again.
+cd flink-dist
+mvn clean install -Pinclude-kinesis -DskipTests
+{% endhighlight %}
+The streaming connectors are not part of the binary distribution. See how to link with them for cluster
+execution [here]({{site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+### Using the Amazon Kinesis Streams Service
+Follow the instructions from the [Amazon Kinesis Streams Developer Guide](
+to set up Kinesis streams. Make sure to create the appropriate IAM policy and user to read from and write to the Kinesis streams.
+### Kinesis Consumer
+The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis
+streams within the same AWS service region, and can handle resharding of streams. Each subtask of the consumer is
+responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will
+change as shards are closed and created by Kinesis.
+Before consuming data from Kinesis streams, make sure that all streams are created with the status "ACTIVE" in the AWS dashboard.
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Properties consumerConfig = new Properties();
+consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
+{% endhighlight %}
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val consumerConfig = new Properties();
+consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
+{% endhighlight %}
+The above is a simple example of using the consumer. Configuration for the consumer is supplied with a `java.util.Properties`
+instance, the configuration keys for which can be found in `ConsumerConfigConstants`. The example
+demonstrates consuming a single Kinesis stream in the AWS region "us-east-1". The AWS credentials are supplied using the basic method in which
+the AWS access key ID and secret access key are directly supplied in the configuration (the alternatives are to set
+`ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER` to `ENV_VAR`, `SYS_PROP`, `PROFILE`, or `AUTO`). Also, data is consumed
+from the newest position in the Kinesis stream (the other option is to set `ConsumerConfigConstants.STREAM_INITIAL_POSITION`
+to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible).
+Other optional configuration keys for the consumer can be found in `ConsumerConfigConstants`.
+#### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
+With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and
+periodically checkpoint each shard's progress. In case of a job failure, Flink will restore the streaming program to the
+state of the latest complete checkpoint and re-consume the records from Kinesis shards, starting from the progress that
+was stored in the checkpoint.
+The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure.
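The recovery behaviour described above can be illustrated with a small Flink-free simulation. This is only a sketch under simplifying assumptions: the class `ShardCheckpointSketch` and its methods are hypothetical, and sequence numbers are modelled as consecutive `long` values, whereas real Kinesis sequence numbers are opaque strings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Flink-free simulation of per-shard progress checkpointing.
class ShardCheckpointSketch {
    // Live progress: the last sequence number read from each shard.
    private final Map<String, Long> progress = new HashMap<>();
    // Snapshot of the progress taken at the latest completed checkpoint.
    private Map<String, Long> lastCheckpoint = new HashMap<>();

    void recordRead(String shardId, long sequenceNumber) {
        progress.put(shardId, sequenceNumber);
    }

    // Copies the current progress, as a periodic checkpoint would.
    void checkpoint() {
        lastCheckpoint = new HashMap<>(progress);
    }

    // On failure, progress is rolled back to the latest snapshot.
    void restore() {
        progress.clear();
        progress.putAll(lastCheckpoint);
    }

    // Next sequence number to read; 0 if the shard was never read.
    long nextToRead(String shardId) {
        Long last = progress.get(shardId);
        return last == null ? 0L : last + 1;
    }

    public static void main(String[] args) {
        ShardCheckpointSketch consumer = new ShardCheckpointSketch();
        consumer.recordRead("shard-0", 0);
        consumer.recordRead("shard-0", 1);
        consumer.checkpoint();             // progress up to record 1 is saved
        consumer.recordRead("shard-0", 2); // read after the checkpoint
        consumer.recordRead("shard-0", 3);
        consumer.restore();                // simulated job failure
        System.out.println(consumer.nextToRead("shard-0")); // prints 2
    }
}
```

Records 2 and 3 are read again after the restore, which is why a shorter checkpoint interval reduces the amount of re-consumed data.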
+To use fault tolerant Kinesis Consumers, checkpointing of the topology needs to be enabled at the execution environment:
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+{% endhighlight %}
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+Also note that Flink can only restart the topology if enough processing slots are available.
+Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
+Flink on YARN supports automatic restart of lost YARN containers.
+#### Event Time for Consumed Records
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they
+have been successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be
+ascending).
+Users can choose to override this default with a custom timestamp, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html),
+or use one from the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so,
+it can be passed to the consumer in the following way:
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
+kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
+{% endhighlight %}
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema, kinesisConsumerConfig))
+val kinesisWithTimestamps = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner)
+{% endhighlight %}
+#### Threading Model
+The Flink Kinesis Consumer uses multiple threads for shard discovery and data consumption.
+For shard discovery, each parallel consumer subtask will have a single thread that constantly queries Kinesis for shard
+information even if the subtask initially did not have shards to read from when the consumer was started. In other words, if
+the consumer is run with a parallelism of 10, there will be a total of 10 threads constantly querying Kinesis regardless
+of the total number of shards in the subscribed streams.
+For data consumption, a single thread will be created to consume each discovered shard. Each thread terminates when the
+shard it is responsible for consuming is closed as a result of stream resharding. In other words, there will always be
+one thread per open shard.
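As a rough illustration of the threading model above, the following sketch adds up the two kinds of threads. The class and method names are hypothetical, and the count deliberately ignores any other threads that the runtime or the AWS SDK may create.

```java
// Hypothetical helper that adds up the consumer threads described above.
class ConsumerThreadCountSketch {
    // One shard-discovery thread per parallel subtask,
    // plus one consuming thread per currently open shard.
    static int totalThreads(int parallelism, int openShards) {
        if (parallelism < 1 || openShards < 0) {
            throw new IllegalArgumentException("parallelism must be >= 1 and openShards >= 0");
        }
        return parallelism + openShards;
    }

    public static void main(String[] args) {
        // Parallelism of 10 with 4 open shards across the subscribed streams.
        System.out.println(totalThreads(10, 4)); // prints 14
    }
}
```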
+#### Internally Used Kinesis APIs
+The Flink Kinesis Consumer uses the [AWS Java SDK]( internally to call Kinesis APIs
+for shard discovery and data consumption. Due to Amazon's [service limits for Kinesis Streams](
+on the APIs, the consumer will be competing with other non-Flink consuming applications that the user may be running.
+Below is a list of APIs called by the consumer, with a description of how the consumer uses each API, as well as information
+on how to deal with any errors or warnings that the Flink Kinesis Consumer may have due to these service limits.
+- *[DescribeStream](*: this is constantly called
+by a single thread in each parallel consumer subtask to discover any new shards as a result of stream resharding. By default,
+the consumer performs the shard discovery at an interval of 10 seconds, and will retry indefinitely until it gets a result
+from Kinesis. If this interferes with other non-Flink consuming applications, users can reduce how often the consumer
+calls this API by setting a value for `ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS` in the supplied
+configuration properties. This sets the discovery interval to a different value. Note that this setting directly impacts
+the maximum delay of discovering a new shard and starting to consume it, as shards will not be discovered during the interval.
+- *[GetShardIterator](*: this is called
+only once when each per-shard consuming thread is started, and will be retried if Kinesis reports that the transaction limit for the
+API has been exceeded, up to a default of 3 attempts. Note that since the rate limit for this API is per shard (not per stream),
+the consumer itself should not exceed the limit. If the limit is exceeded nonetheless, users can either reduce how often other
+non-Flink consuming applications call this API, or modify the retry behaviour of this API call in the consumer by
+setting keys prefixed by `ConsumerConfigConstants.SHARD_GETITERATOR_*` in the supplied configuration properties.
+- *[GetRecords](*: this is constantly called
+by per-shard consuming threads to fetch records from Kinesis. When a shard has multiple concurrent consumers (when there
+are any other non-Flink consuming applications running), the per-shard rate limit may be exceeded. By default, on each call
+of this API, the consumer will retry if Kinesis reports that the data size / transaction limit for the API has been exceeded,
+up to a default of 3 attempts. Users can either try to slow down other non-Flink consuming applications, or adjust the throughput
+of the consumer by setting the `ConsumerConfigConstants.SHARD_GETRECORDS_MAX` and
+`ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS` keys in the supplied configuration properties. Setting the former
+adjusts the maximum number of records each consuming thread tries to fetch from shards on each call (default is 100), while
+the latter modifies the sleep interval between each fetch (there will be no sleep by default). The retry behaviour of the
+consumer when calling this API can also be modified by using the other keys prefixed by `ConsumerConfigConstants.SHARD_GETRECORDS_*`.
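The bounded retry behaviour described for `GetShardIterator` and `GetRecords` can be sketched generically as follows. This is not the connector's implementation: `BoundedRetrySketch`, `LimitExceededException`, and the fixed pause between attempts are assumptions made for illustration only.

```java
import java.util.function.Supplier;

// Hypothetical sketch of "retry up to N attempts when a service limit is exceeded".
class BoundedRetrySketch {
    // Stand-in for the error a service returns when a rate limit is hit.
    static class LimitExceededException extends RuntimeException {
        LimitExceededException(String message) {
            super(message);
        }
    }

    static <T> T callWithRetries(Supplier<T> call, int maxAttempts, long pauseMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        LimitExceededException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (LimitExceededException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(pauseMillis); // assumed fixed pause between attempts
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(ie);
                    }
                }
            }
        }
        throw last; // all attempts were rejected
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated API call: rejected twice, then succeeds on the third attempt.
        String result = callWithRetries(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new LimitExceededException("rate limit exceeded");
            }
            return "records";
        }, 3, 10L);
        System.out.println(result + " after " + calls[0] + " calls"); // prints "records after 3 calls"
    }
}
```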
+### Kinesis Producer
+The `FlinkKinesisProducer` is used for putting data from a Flink stream into a Kinesis stream. Note that the producer does not participate in
+Flink's checkpointing and doesn't provide exactly-once processing guarantees.
+Also, the Kinesis producer does not guarantee that records are written in order to the shards (See [here]( and [here]( for more details).
+In case of a failure or a resharding, data will be written again to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics.
+To put data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard.
+For the monitoring to work, the user accessing the stream needs access to the CloudWatch service.
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Properties producerConfig = new Properties();
+producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
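+FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
+kinesis.setFailOnError(true);
+kinesis.setDefaultStream("kinesis_stream_name");
+kinesis.setDefaultPartition("0");
+DataStream<String> simpleStringStream = ...;
+simpleStringStream.addSink(kinesis);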
+{% endhighlight %}
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val producerConfig = new Properties();
+producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
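+val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig)
+kinesis.setFailOnError(true)
+kinesis.setDefaultStream("kinesis_stream_name")
+kinesis.setDefaultPartition("0")
+val simpleStringStream = ...
+simpleStringStream.addSink(kinesis)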
+{% endhighlight %}
+The above is a simple example of using the producer. The mandatory configuration values for the producer are supplied with a `java.util.Properties`
+instance, as described above for the consumer. The example demonstrates producing to a single Kinesis stream in the AWS region "us-east-1".
+Instead of a `SerializationSchema`, the producer also supports a `KinesisSerializationSchema`, which allows sending data to multiple streams. This is
+done using the `KinesisSerializationSchema.getTargetStream(T element)` method. Returning `null` there will instruct the producer to write the element to the default stream.
+Otherwise, the returned stream name is used.
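The stream selection rule above (a returned name wins, `null` falls back to the default stream) can be sketched without any Flink classes. The `TargetStreamSelector` interface below is a hypothetical stand-in for the `getTargetStream` part of `KinesisSerializationSchema`, and the stream names are made up for the example.

```java
// Hypothetical sketch of the "null means default stream" rule.
class TargetStreamRoutingSketch {
    // Stand-in for the getTargetStream(T element) method of the schema.
    interface TargetStreamSelector<T> {
        String getTargetStream(T element);
    }

    // Returns the stream the element would be written to.
    static <T> String resolveStream(TargetStreamSelector<T> selector, T element, String defaultStream) {
        String target = selector.getTargetStream(element);
        return target != null ? target : defaultStream;
    }

    public static void main(String[] args) {
        // Route error lines to a dedicated stream, everything else to the default.
        TargetStreamSelector<String> selector =
            element -> element.startsWith("ERROR") ? "error_stream" : null;

        System.out.println(resolveStream(selector, "ERROR disk full", "default_stream")); // prints error_stream
        System.out.println(resolveStream(selector, "INFO started", "default_stream"));    // prints default_stream
    }
}
```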
+Other optional configuration keys for the producer can be found in `ProducerConfigConstants`.
+### Using Non-AWS Kinesis Endpoints for Testing
+It is sometimes desirable to have Flink operate as a consumer or producer against a non-AWS Kinesis endpoint such as
+[Kinesalite](; this is especially useful when performing functional testing of a Flink
+application. The AWS endpoint that would normally be inferred by the AWS region set in the Flink configuration must be overridden via a configuration property.
+To override the AWS endpoint, taking the producer for example, set the `ProducerConfigConstants.AWS_ENDPOINT` property in the
+Flink configuration, in addition to the `ProducerConfigConstants.AWS_REGION` required by Flink. Although the region is
+required, it will not be used to determine the AWS endpoint URL.
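The precedence between the two properties can be sketched as below. The class `EndpointResolutionSketch` is hypothetical, and the region-derived URL pattern is an assumption used only to make the fallback visible; the connector itself delegates endpoint resolution to the AWS SDK.

```java
// Hypothetical sketch: an explicit endpoint overrides the region-derived one.
class EndpointResolutionSketch {
    static String resolveEndpoint(String region, String endpointOverride) {
        if (region == null || region.isEmpty()) {
            // The region stays mandatory even when an endpoint is given.
            throw new IllegalArgumentException("AWS region must be set");
        }
        if (endpointOverride != null) {
            return endpointOverride; // region is not used to build the URL
        }
        // Assumed pattern for the public Kinesis endpoint of a region.
        return "https://kinesis." + region + ".amazonaws.com";
    }

    public static void main(String[] args) {
        System.out.println(resolveEndpoint("us-east-1", null));
        // prints https://kinesis.us-east-1.amazonaws.com
        System.out.println(resolveEndpoint("us-east-1", "http://localhost:4567"));
        // prints http://localhost:4567
    }
}
```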
+The following example shows how one might supply the `ProducerConfigConstants.AWS_ENDPOINT` configuration property:
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Properties producerConfig = new Properties();
+producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+producerConfig.put(ProducerConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
+{% endhighlight %}
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val producerConfig = new Properties();
+producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+producerConfig.put(ProducerConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
+{% endhighlight %}
diff --git a/docs/dev/connectors/ b/docs/dev/connectors/
new file mode 100644
index 0000000..924a80b
--- /dev/null
+++ b/docs/dev/connectors/
@@ -0,0 +1,138 @@
+title: "Apache NiFi Connector"
+nav-title: NiFi
+nav-parent_id: connectors
+nav-pos: 8
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+This connector provides a Source and Sink that can read from and write to
+[Apache NiFi]( To use this connector, add the
+following dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-nifi{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
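+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
+for information about how to package the program with the libraries for
+cluster execution.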
+#### Installing Apache NiFi
+Instructions for setting up an Apache NiFi cluster can be found
+#### Apache NiFi Source
+The connector provides a Source for reading data from Apache NiFi to Apache Flink.
+The class `NiFiSource(…)` provides 2 constructors for reading data from NiFi.
+- `NiFiSource(SiteToSiteClientConfig config)` - Constructs a `NiFiSource(…)` given the client's `SiteToSiteClientConfig` and a
+     default wait time of 1000 ms.
+- `NiFiSource(SiteToSiteClientConfig config, long waitTimeMs)` - Constructs a `NiFiSource(…)` given the client's
+     `SiteToSiteClientConfig` and the specified wait time (in milliseconds).
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+        .url("http://localhost:8080/nifi")
+        .portName("Data for Flink")
+        .requestBatchCount(5)
+        .buildConfig();
+SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
+{% endhighlight %}
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
+val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
+       .url("http://localhost:8080/nifi")
+       .portName("Data for Flink")
+       .requestBatchCount(5)
+       .buildConfig()
+val nifiSource = new NiFiSource(clientConfig)       
+{% endhighlight %}       
+Here data is read from the Apache NiFi Output Port called "Data for Flink" which is part of Apache NiFi
+Site-to-site protocol configuration.
+#### Apache NiFi Sink
+The connector provides a Sink for writing data from Apache Flink to Apache NiFi.
+The class `NiFiSink(…)` provides a constructor for instantiating a `NiFiSink`.
+- `NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder<T>)` constructs a `NiFiSink(…)` given the client's `SiteToSiteClientConfig` and a `NiFiDataPacketBuilder` that converts data from Flink to `NiFiDataPacket` to be ingested by NiFi.
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+        .url("http://localhost:8080/nifi")
+        .portName("Data from Flink")
+        .requestBatchCount(5)
+        .buildConfig();
+SinkFunction<NiFiDataPacket> nifiSink = new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<T>() {...});
+{% endhighlight %}
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
+val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
+       .url("http://localhost:8080/nifi")
+       .portName("Data from Flink")
+       .requestBatchCount(5)
+       .buildConfig()
+val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig, new NiFiDataPacketBuilder[NiFiDataPacket]() {...})
+{% endhighlight %}
+More information about [Apache NiFi]( Site-to-Site Protocol can be found [here](
diff --git a/docs/dev/connectors/ b/docs/dev/connectors/
new file mode 100644
index 0000000..02def40
--- /dev/null
+++ b/docs/dev/connectors/
@@ -0,0 +1,129 @@
+title: "RabbitMQ Connector"
+nav-title: RabbitMQ
+nav-parent_id: connectors
+nav-pos: 7
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+This connector provides access to data streams from [RabbitMQ]( To use this connector, add the following dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-rabbitmq{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+#### Installing RabbitMQ
+Follow the instructions from the [RabbitMQ download page]( After the installation the server automatically starts, and the application connecting to RabbitMQ can be launched.
+#### RabbitMQ Source
+The `RMQSource` class provides an interface for receiving data from RabbitMQ.
+The following have to be provided to the `RMQSource(…)` constructor, in order:
+- RMQConnectionConfig.
+- queueName: The RabbitMQ queue name.
+- usesCorrelationId: `true` when correlation ids should be used, `false` otherwise (default is `false`).
+- deserializationSchema: Deserialization schema to turn messages into Java objects.
+This source can be operated in three different modes:
+1. Exactly-once (when checkpointed) with RabbitMQ transactions and messages with
+    unique correlation IDs.
+2. At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism
+    (correlation id is not set).
+3. No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
+Correlation ids are a RabbitMQ application feature. You have to set them in the message properties
+when injecting messages into RabbitMQ. If you set `usesCorrelationId` to true and do not supply
+unique correlation ids, the source will throw an exception (if the correlation id is null) or ignore
+messages with non-unique correlation ids. If you set `usesCorrelationId` to false, then you don't
+have to supply correlation ids.
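The handling of correlation ids described above can be sketched as a simple filter. This is a simplified, hypothetical model (`CorrelationIdDedupSketch` is not a connector class): it keeps every id it has seen in memory, whereas the real source ties this bookkeeping to checkpoints.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of deduplication by correlation id.
class CorrelationIdDedupSketch {
    private final Set<String> seenIds = new HashSet<>();

    // Returns true if the message should be emitted, false if it is a duplicate.
    boolean accept(String correlationId) {
        if (correlationId == null) {
            // Mirrors the documented behaviour: a missing id is an error.
            throw new IllegalStateException("correlation id must be set when usesCorrelationId is true");
        }
        return seenIds.add(correlationId); // add() is false for an id seen before
    }

    public static void main(String[] args) {
        CorrelationIdDedupSketch source = new CorrelationIdDedupSketch();
        System.out.println(source.accept("id-1")); // prints true
        System.out.println(source.accept("id-2")); // prints true
        System.out.println(source.accept("id-1")); // prints false (duplicate is ignored)
    }
}
```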
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+	.setHost(...).setPort(...).setUserName(...)
+	.setPassword(...).setVirtualHost(...).build();
+DataStream<String> streamWithoutCorrelationIds = env
+	.addSource(new RMQSource<String>(connectionConfig, "hello", new SimpleStringSchema()));
+streamWithoutCorrelationIds.print();
+DataStream<String> streamWithCorrelationIds = env
+	.addSource(new RMQSource<String>(connectionConfig, "hello", true, new SimpleStringSchema()));
+streamWithCorrelationIds.print();
+{% endhighlight %}
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val connectionConfig = new RMQConnectionConfig.Builder()
+    .setHost(...).setPort(...).setUserName(...)
+    .setPassword(...).setVirtualHost(...).build()
+val streamWithoutCorrelationIds = env
+    .addSource(new RMQSource[String](connectionConfig, "hello", new SimpleStringSchema))
+streamWithoutCorrelationIds.print()
+val streamWithCorrelationIds = env
+    .addSource(new RMQSource[String](connectionConfig, "hello", true, new SimpleStringSchema))
+streamWithCorrelationIds.print()
+{% endhighlight %}
+#### RabbitMQ Sink
+The `RMQSink` class provides an interface for sending data to RabbitMQ.
+The following have to be provided to the `RMQSink(…)` constructor, in order:
+1. RMQConnectionConfig
+2. The queue name
+3. Serialization schema
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+	.setHost(...).setPort(...).setUserName(...)
+	.setPassword(...).setVirtualHost(...).build();
+stream.addSink(new RMQSink<String>(connectionConfig, "hello", new SimpleStringSchema()));
+{% endhighlight %}
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val connectionConfig = new RMQConnectionConfig.Builder()
+    .setHost(...).setPort(...).setUserName(...)
+    .setPassword(...).setVirtualHost(...).build()
+stream.addSink(new RMQSink[String](connectionConfig, "hello", new SimpleStringSchema))
+{% endhighlight %}
+More about RabbitMQ can be found [here](
diff --git a/docs/dev/connectors/ b/docs/dev/connectors/
new file mode 100644
index 0000000..a987b90
--- /dev/null
+++ b/docs/dev/connectors/
@@ -0,0 +1,174 @@
+title: "Redis Connector"
+nav-title: Redis
+nav-parent_id: connectors
+nav-pos: 8
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+This connector provides a Sink that can write to
+[Redis]( and also can publish data to [Redis PubSub]( To use this connector, add the
+following dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-redis{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+Version Compatibility: This module is compatible with Redis 2.8.5.
+Note that the streaming connectors are currently not part of the binary distribution. You need to link them for cluster execution [explicitly]({{site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+#### Installing Redis
+Follow the instructions from the [Redis download page](
+#### Redis Sink
+The `RedisSink` class provides an interface for sending data to Redis.
+The sink can use three different methods for communicating with different types of Redis environments:
+1. Single Redis Server
+2. Redis Cluster
+3. Redis Sentinel
+This code shows how to create a sink that communicates with a single Redis server:
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>>{
+    @Override
+    public RedisCommandDescription getCommandDescription() {
+        return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
+    }
+    @Override
+    public String getKeyFromData(Tuple2<String, String> data) {
+        return data.f0;
+    }
+    @Override
+    public String getValueFromData(Tuple2<String, String> data) {
+        return data.f1;
+    }
+}
+FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("").build();
+DataStream<Tuple2<String, String>> stream = ...;
+stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
+{% endhighlight %}
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class RedisExampleMapper extends RedisMapper[(String, String)]{
+  override def getCommandDescription: RedisCommandDescription = {
+    new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")
+  }
+  override def getKeyFromData(data: (String, String)): String = data._1
+  override def getValueFromData(data: (String, String)): String = data._2
+}
+val conf = new FlinkJedisPoolConfig.Builder().setHost("").build()
+stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
+{% endhighlight %}
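To make the role of the three mapper methods concrete, the sketch below simulates what the `HSET` mapping in the example amounts to: the additional key (`"HASH_NAME"`) names the hash, the key from the data becomes the field, and the value from the data becomes the field's value. `HsetMappingSketch` is a hypothetical in-memory stand-in, not a Redis client.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for Redis hashes, used to illustrate HSET.
class HsetMappingSketch {
    // hash name -> (field -> value)
    private final Map<String, Map<String, String>> hashes = new HashMap<>();

    // Equivalent of: HSET hashName field value
    void hset(String hashName, String field, String value) {
        hashes.computeIfAbsent(hashName, name -> new HashMap<>()).put(field, value);
    }

    // Equivalent of: HGET hashName field (null if absent)
    String hget(String hashName, String field) {
        Map<String, String> hash = hashes.get(hashName);
        return hash == null ? null : hash.get(field);
    }

    public static void main(String[] args) {
        HsetMappingSketch redis = new HsetMappingSketch();
        // Each pair plays the role of a Tuple2<String, String>: f0 is the field, f1 the value.
        String[][] stream = {{"user1", "alice"}, {"user2", "bob"}, {"user1", "carol"}};
        for (String[] record : stream) {
            redis.hset("HASH_NAME", record[0], record[1]);
        }
        // A later write to the same field overwrites the earlier one.
        System.out.println(redis.hget("HASH_NAME", "user1")); // prints carol
        System.out.println(redis.hget("HASH_NAME", "user2")); // prints bob
    }
}
```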
+This example code does the same, but for Redis Cluster:
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+FlinkJedisClusterConfig conf = new FlinkJedisClusterConfig.Builder()
+    .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();
+DataStream<Tuple2<String, String>> stream = ...;
+stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
+{% endhighlight %}
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val conf = new FlinkJedisClusterConfig.Builder().setNodes(...).build()
+stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
+{% endhighlight %}
+This example shows how to configure the sink for a Redis environment with Sentinels:
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder()
+    .setMasterName("master").setSentinels(...).build();
+DataStream<Tuple2<String, String>> stream = ...;
+stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
+{% endhighlight %}
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val conf = new FlinkJedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build()
+stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
+{% endhighlight %}
+This section describes the available data types and the Redis command used for each.
+<table class="table table-bordered" style="width: 75%">
+    <thead>
+        <tr>
+          <th class="text-center" style="width: 20%">Data Type</th>
+          <th class="text-center" style="width: 25%">Redis Command [Sink]</th>
+          <th class="text-center" style="width: 25%">Redis Command [Source]</th>
+        </tr>
+      </thead>
+      <tbody>
+        <tr>
+            <td>HASH</td><td><a href="">HSET</a></td><td>--NA--</td>
+        </tr>
+        <tr>
+            <td>LIST</td><td>
+                <a href="">RPUSH</a>,
+                <a href="">LPUSH</a>
+            </td><td>--NA--</td>
+        </tr>
+        <tr>
+            <td>SET</td><td><a href="">SADD</a></td><td>--NA--</td>
+        </tr>
+        <tr>
+            <td>PUBSUB</td><td><a href="">PUBLISH</a></td><td>--NA--</td>
+        </tr>
+        <tr>
+            <td>STRING</td><td><a href="">SET</a></td><td>--NA--</td>
+        </tr>
+        <tr>
+            <td>HYPER_LOG_LOG</td><td><a href="">PFADD</a></td><td>--NA--</td>
+        </tr>
+        <tr>
+            <td>SORTED_SET</td><td><a href="">ZADD</a></td><td>--NA--</td>
+        </tr>
+      </tbody>
+</table>
+More about Redis can be found [here](
diff --git a/docs/dev/connectors/ b/docs/dev/connectors/
new file mode 100644
index 0000000..e92e51d
--- /dev/null
+++ b/docs/dev/connectors/
@@ -0,0 +1,85 @@
+title: "Twitter Connector"
+nav-title: Twitter
+nav-parent_id: connectors
+nav-pos: 9
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+The Twitter Streaming API provides access to the stream of tweets made available by Twitter.
+Flink Streaming comes with a built-in `TwitterSource` class for establishing a connection to this stream.
+To use this connector, add the following dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-twitter{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+Note that the streaming connectors are currently not part of the binary distribution.
+See [here]({{site.baseurl}}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution) for information on linking with them for cluster execution.
+#### Authentication
+To connect to the Twitter stream, you have to register your program and acquire the information needed for authentication. The process is described below.
+#### Acquiring the authentication information
+First of all, a Twitter account is needed. Sign up for free at [](
+or sign in at Twitter's [Application Management]( and register the application by
+clicking on the "Create New App" button. Fill out a form about your program and accept the Terms and Conditions.
+After selecting the application, the API key and API secret (called `twitter-source.consumerKey` and `twitter-source.consumerSecret` in `TwitterSource` respectively) are located on the "API Keys" tab.
+The necessary OAuth Access Token data (`twitter-source.token` and `twitter-source.tokenSecret` in `TwitterSource`) can be generated and acquired on the "Keys and Access Tokens" tab.
+Remember to keep these pieces of information secret and do not push them to public repositories.
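+One way to keep the credentials out of the source tree is to read them from environment variables when the job is built. The following is a minimal sketch under stated assumptions, not part of Flink: the `TwitterCredentials` class, its `fromEnvironment` method, and the `TWITTER_*` variable names are hypothetical. Only the four property keys are taken from the description above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper (not part of Flink): builds the Properties object for
// TwitterSource from environment variables instead of hard-coded literals.
class TwitterCredentials {

    // Assumed environment variable names mapped to the property keys named above.
    // In a Flink program the TwitterSource constants could be used for the keys.
    private static final Map<String, String> ENV_TO_KEY = new LinkedHashMap<>();
    static {
        ENV_TO_KEY.put("TWITTER_CONSUMER_KEY", "twitter-source.consumerKey");
        ENV_TO_KEY.put("TWITTER_CONSUMER_SECRET", "twitter-source.consumerSecret");
        ENV_TO_KEY.put("TWITTER_TOKEN", "twitter-source.token");
        ENV_TO_KEY.put("TWITTER_TOKEN_SECRET", "twitter-source.tokenSecret");
    }

    // Copies the four credentials from the given environment map into a
    // Properties object and fails fast if one of them is missing or empty.
    static Properties fromEnvironment(Map<String, String> env) {
        Properties props = new Properties();
        for (Map.Entry<String, String> entry : ENV_TO_KEY.entrySet()) {
            String value = env.get(entry.getKey());
            if (value == null || value.isEmpty()) {
                throw new IllegalStateException(
                    "Missing environment variable: " + entry.getKey());
            }
            props.setProperty(entry.getValue(), value);
        }
        return props;
    }
}
```

+With such a helper, `TwitterCredentials.fromEnvironment(System.getenv())` would return a `Properties` object that can be passed to the `TwitterSource` constructor, so no secret has to appear in the code that is committed.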
+#### Usage
+In contrast to other connectors, the `TwitterSource` depends on no additional services. For example, the following code should run gracefully:
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// Fill in the credentials acquired above; they are left empty here on purpose.
+Properties props = new Properties();
+props.setProperty(TwitterSource.CONSUMER_KEY, "");
+props.setProperty(TwitterSource.CONSUMER_SECRET, "");
+props.setProperty(TwitterSource.TOKEN, "");
+props.setProperty(TwitterSource.TOKEN_SECRET, "");
+DataStream<String> streamSource = env.addSource(new TwitterSource(props));
+{% endhighlight %}
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// Fill in the credentials acquired above; they are left empty here on purpose.
+val props = new Properties()
+props.setProperty(TwitterSource.CONSUMER_KEY, "")
+props.setProperty(TwitterSource.CONSUMER_SECRET, "")
+props.setProperty(TwitterSource.TOKEN, "")
+props.setProperty(TwitterSource.TOKEN_SECRET, "")
+val streamSource = env.addSource(new TwitterSource(props))
+{% endhighlight %}
+The `TwitterSource` emits strings containing a JSON object, representing a Tweet.
+The `TwitterExample` class in the `flink-examples-streaming` package shows a full example of how to use the `TwitterSource`.
+By default, the `TwitterSource` uses the `StatusesSampleEndpoint`. This endpoint returns a random sample of Tweets.
+There is a `TwitterSource.EndpointInitializer` interface allowing users to provide a custom endpoint.