You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2019/09/23 10:16:10 UTC

[flink-playgrounds] branch master updated (5d636ae -> 00db5d0)

This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git.


    from 5d636ae  [hotfix] Update URL in ops playground README.md to Flink master docs.
     new 1c7c254  [FLINK-14160] Add --backpressure option to the ClickEventCount job in the operations playground
     new 00db5d0  [hotfix] Improve .gitignore

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .gitignore                                         |  6 +--
 .../java/flink-playground-clickcountjob/pom.xml    |  2 +-
 .../ops/clickcount/ClickEventCount.java            | 25 ++++++++++--
 .../ops/clickcount/functions/BackpressureMap.java  | 46 ++++++++++++++++++++++
 operations-playground/docker-compose.yaml          |  4 +-
 5 files changed, 74 insertions(+), 9 deletions(-)
 create mode 100644 docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java


[flink-playgrounds] 02/02: [hotfix] Improve .gitignore

Posted by fh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git

commit 00db5d0904ca1a023eb9612b12eccd25961f31a9
Author: Fabian Hueske <fh...@apache.org>
AuthorDate: Mon Sep 23 12:03:20 2019 +0200

    [hotfix] Improve .gitignore
---
 .gitignore | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/.gitignore b/.gitignore
index d4e4d76..d04cff5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,3 @@
-*/.idea
-*/target
-*/dependency-reduced-pom.xml
+**/.idea
+**/target
+**/dependency-reduced-pom.xml


[flink-playgrounds] 01/02: [FLINK-14160] Add --backpressure option to the ClickEventCount job in the operations playground

Posted by fh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git

commit 1c7c254fc7827e74db7c3c387348e7ca2219788a
Author: David Anderson <da...@alpinegizmo.com>
AuthorDate: Thu Sep 19 20:08:58 2019 +0200

    [FLINK-14160] Add --backpressure option to the ClickEventCount job in the operations playground
    
    This closes #4.
---
 .../java/flink-playground-clickcountjob/pom.xml    |  2 +-
 .../ops/clickcount/ClickEventCount.java            | 25 ++++++++++--
 .../ops/clickcount/functions/BackpressureMap.java  | 46 ++++++++++++++++++++++
 operations-playground/docker-compose.yaml          |  4 +-
 4 files changed, 71 insertions(+), 6 deletions(-)

diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
index 3d17fcd..893c11e 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
@@ -22,7 +22,7 @@ under the License.
 
 	<groupId>org.apache.flink</groupId>
 	<artifactId>flink-playground-clickcountjob</artifactId>
-	<version>1-FLINK-1.9_2.11</version>
+	<version>2-FLINK-1.9_2.11</version>
 
 	<name>flink-playground-clickcountjob</name>
 	<packaging>jar</packaging>
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
index 0316bc6..f3d628c 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
@@ -18,6 +18,7 @@
 package org.apache.flink.playgrounds.ops.clickcount;
 
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.playgrounds.ops.clickcount.functions.BackpressureMap;
 import org.apache.flink.playgrounds.ops.clickcount.functions.ClickEventStatisticsCollector;
 import org.apache.flink.playgrounds.ops.clickcount.functions.CountingAggregator;
 import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
@@ -25,6 +26,7 @@ import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventDeserializa
 import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatistics;
 import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatisticsSerializationSchema;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -47,6 +49,7 @@ import java.util.concurrent.TimeUnit;
  * <p>The Job can be configured via the command line:</p>
  * * "--checkpointing": enables checkpointing
  * * "--event-time": set the StreamTimeCharacteristic to EventTime
+ * * "--backpressure": insert an operator that causes periodic backpressure
  * * "--input-topic": the name of the Kafka Topic to consume {@link ClickEvent}s from
  * * "--output-topic": the name of the Kafka Topic to produce {@link ClickEventStatistics} to
  * * "--bootstrap.servers": comma-separated list of Kafka brokers
@@ -56,6 +59,7 @@ public class ClickEventCount {
 
 	public static final String CHECKPOINTING_OPTION = "checkpointing";
 	public static final String EVENT_TIME_OPTION = "event-time";
+	public static final String BACKPRESSURE_OPTION = "backpressure";
 
 	public static final Time WINDOW_SIZE = Time.of(15, TimeUnit.SECONDS);
 
@@ -66,6 +70,8 @@ public class ClickEventCount {
 
 		configureEnvironment(params, env);
 
+		boolean inflictBackpressure = params.has(BACKPRESSURE_OPTION);
+
 		String inputTopic = params.get("input-topic", "input");
 		String outputTopic = params.get("output-topic", "output");
 		String brokers = params.get("bootstrap.servers", "localhost:9092");
@@ -73,19 +79,32 @@ public class ClickEventCount {
 		kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
 		kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-event-count");
 
-		env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps))
+		DataStream<ClickEvent> clicks =
+				env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps))
 			.name("ClickEvent Source")
 			.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ClickEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
 				@Override
 				public long extractTimestamp(final ClickEvent element) {
 					return element.getTimestamp().getTime();
 				}
-			})
+			});
+
+		if (inflictBackpressure) {
+			// Force a network shuffle so that the backpressure will affect the buffer pools
+			clicks = clicks
+				.keyBy(ClickEvent::getPage)
+				.map(new BackpressureMap())
+				.name("Backpressure");
+		}
+
+		DataStream<ClickEventStatistics> statistics = clicks
 			.keyBy(ClickEvent::getPage)
 			.timeWindow(WINDOW_SIZE)
 			.aggregate(new CountingAggregator(),
 				new ClickEventStatisticsCollector())
-			.name("ClickEvent Counter")
+			.name("ClickEvent Counter");
+
+		statistics
 			.addSink(new FlinkKafkaProducer<>(
 				outputTopic,
 				new ClickEventStatisticsSerializationSchema(outputTopic),
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java
new file mode 100644
index 0000000..ee68573
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount.functions;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
+
+import java.time.LocalTime;
+
+/**
+ * Identity MapFunction that causes severe backpressure during even-numbered minutes of the
+ * local wall clock. E.g., from 10:12:00 to 10:12:59 each instance sleeps 100 ms per event
+ * (at most ~10 events/sec), but from 10:13:00 to 10:13:59 events pass through unimpeded.
+ */
+public class BackpressureMap implements MapFunction<ClickEvent, ClickEvent> {
+
+	private boolean causeBackpressure() {
+		return ((LocalTime.now().getMinute() % 2) == 0); // true while the current local minute is even
+	}
+
+	@Override
+	public ClickEvent map(ClickEvent event) throws Exception {
+		if (causeBackpressure()) {
+			Thread.sleep(100); // deliberate stall: limits this instance to ~10 events/sec
+		}
+
+		return event; // the event is returned as received
+	}
+
+}
diff --git a/operations-playground/docker-compose.yaml b/operations-playground/docker-compose.yaml
index 9ed71c5..7907092 100644
--- a/operations-playground/docker-compose.yaml
+++ b/operations-playground/docker-compose.yaml
@@ -20,7 +20,7 @@ version: "2.1"
 services:
   client:
     build: ../docker/ops-playground-image
-    image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11
+    image: apache/flink-ops-playground:2-FLINK-1.9-scala_2.11
     command: "flink run -d -p 2 /opt/ClickCountJob.jar --bootstrap.servers kafka:9092 --checkpointing --event-time"
     depends_on:
       - jobmanager
@@ -30,7 +30,7 @@ services:
     environment:
       - JOB_MANAGER_RPC_ADDRESS=jobmanager
   clickevent-generator:
-    image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11
+    image: apache/flink-ops-playground:2-FLINK-1.9-scala_2.11
     command: "java -classpath /opt/ClickCountJob.jar:/opt/flink/lib/* org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input"
     depends_on:
       - kafka