You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2019/09/23 10:17:23 UTC

[flink-playgrounds] branch release-1.9 updated (b575647 -> 5b93147)

This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git.


    from b575647  [hotfix] Update URL in ops playground README.md to Flink 1.9 docs.
     new 41acc3b  [FLINK-14160] Add --backpressure option to the ClickEventCount job in the operations playground
     new 5b93147  [hotfix] Improve .gitignore

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .gitignore                                         |  6 +--
 .../java/flink-playground-clickcountjob/pom.xml    |  2 +-
 .../ops/clickcount/ClickEventCount.java            | 25 ++++++++++--
 .../ops/clickcount/functions/BackpressureMap.java  | 46 ++++++++++++++++++++++
 operations-playground/docker-compose.yaml          |  4 +-
 5 files changed, 74 insertions(+), 9 deletions(-)
 create mode 100644 docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java


[flink-playgrounds] 01/02: [FLINK-14160] Add --backpressure option to the ClickEventCount job in the operations playground

Posted by fh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git

commit 41acc3b90bbf43e6879f2e3d9cdded0cac980524
Author: David Anderson <da...@alpinegizmo.com>
AuthorDate: Thu Sep 19 20:08:58 2019 +0200

    [FLINK-14160] Add --backpressure option to the ClickEventCount job in the operations playground
    
    This closes #4.
---
 .../java/flink-playground-clickcountjob/pom.xml    |  2 +-
 .../ops/clickcount/ClickEventCount.java            | 25 ++++++++++--
 .../ops/clickcount/functions/BackpressureMap.java  | 46 ++++++++++++++++++++++
 operations-playground/docker-compose.yaml          |  4 +-
 4 files changed, 71 insertions(+), 6 deletions(-)

diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
index 3d17fcd..893c11e 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
@@ -22,7 +22,7 @@ under the License.
 
 	<groupId>org.apache.flink</groupId>
 	<artifactId>flink-playground-clickcountjob</artifactId>
-	<version>1-FLINK-1.9_2.11</version>
+	<version>2-FLINK-1.9_2.11</version>
 
 	<name>flink-playground-clickcountjob</name>
 	<packaging>jar</packaging>
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
index 0316bc6..f3d628c 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
@@ -18,6 +18,7 @@
 package org.apache.flink.playgrounds.ops.clickcount;
 
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.playgrounds.ops.clickcount.functions.BackpressureMap;
 import org.apache.flink.playgrounds.ops.clickcount.functions.ClickEventStatisticsCollector;
 import org.apache.flink.playgrounds.ops.clickcount.functions.CountingAggregator;
 import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
@@ -25,6 +26,7 @@ import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventDeserializa
 import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatistics;
 import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatisticsSerializationSchema;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -47,6 +49,7 @@ import java.util.concurrent.TimeUnit;
  * <p>The Job can be configured via the command line:</p>
  * * "--checkpointing": enables checkpointing
  * * "--event-time": set the StreamTimeCharacteristic to EventTime
+ * * "--backpressure": insert an operator that causes periodic backpressure
  * * "--input-topic": the name of the Kafka Topic to consume {@link ClickEvent}s from
  * * "--output-topic": the name of the Kafka Topic to produce {@link ClickEventStatistics} to
  * * "--bootstrap.servers": comma-separated list of Kafka brokers
@@ -56,6 +59,7 @@ public class ClickEventCount {
 
 	public static final String CHECKPOINTING_OPTION = "checkpointing";
 	public static final String EVENT_TIME_OPTION = "event-time";
+	public static final String BACKPRESSURE_OPTION = "backpressure";
 
 	public static final Time WINDOW_SIZE = Time.of(15, TimeUnit.SECONDS);
 
@@ -66,6 +70,8 @@ public class ClickEventCount {
 
 		configureEnvironment(params, env);
 
+		boolean inflictBackpressure = params.has(BACKPRESSURE_OPTION);
+
 		String inputTopic = params.get("input-topic", "input");
 		String outputTopic = params.get("output-topic", "output");
 		String brokers = params.get("bootstrap.servers", "localhost:9092");
@@ -73,19 +79,32 @@ public class ClickEventCount {
 		kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
 		kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-event-count");
 
-		env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps))
+		DataStream<ClickEvent> clicks =
+				env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps))
 			.name("ClickEvent Source")
 			.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ClickEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
 				@Override
 				public long extractTimestamp(final ClickEvent element) {
 					return element.getTimestamp().getTime();
 				}
-			})
+			});
+
+		if (inflictBackpressure) {
+			// Force a network shuffle so that the backpressure will affect the buffer pools
+			clicks = clicks
+				.keyBy(ClickEvent::getPage)
+				.map(new BackpressureMap())
+				.name("Backpressure");
+		}
+
+		DataStream<ClickEventStatistics> statistics = clicks
 			.keyBy(ClickEvent::getPage)
 			.timeWindow(WINDOW_SIZE)
 			.aggregate(new CountingAggregator(),
 				new ClickEventStatisticsCollector())
-			.name("ClickEvent Counter")
+			.name("ClickEvent Counter");
+
+		statistics
 			.addSink(new FlinkKafkaProducer<>(
 				outputTopic,
 				new ClickEventStatisticsSerializationSchema(outputTopic),
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java
new file mode 100644
index 0000000..ee68573
--- /dev/null
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.playgrounds.ops.clickcount.functions;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
+
+import java.time.LocalTime;
+
+/**
+ * This MapFunction causes severe backpressure during even-numbered minutes.
+ * E.g., from 10:12:00 to 10:12:59 it will only process 10 events/sec,
+ * but from 10:13:00 to 10:13:59 events will pass through unimpeded.
+ */
+public class BackpressureMap implements MapFunction<ClickEvent, ClickEvent> {
+
+	private boolean causeBackpressure() {
+		return ((LocalTime.now().getMinute() % 2) == 0);
+	}
+
+	@Override
+	public ClickEvent map(ClickEvent event) throws Exception {
+		if (causeBackpressure()) {
+			Thread.sleep(100);
+		}
+
+		return event;
+	}
+
+}
diff --git a/operations-playground/docker-compose.yaml b/operations-playground/docker-compose.yaml
index 9ed71c5..7907092 100644
--- a/operations-playground/docker-compose.yaml
+++ b/operations-playground/docker-compose.yaml
@@ -20,7 +20,7 @@ version: "2.1"
 services:
   client:
     build: ../docker/ops-playground-image
-    image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11
+    image: apache/flink-ops-playground:2-FLINK-1.9-scala_2.11
     command: "flink run -d -p 2 /opt/ClickCountJob.jar --bootstrap.servers kafka:9092 --checkpointing --event-time"
     depends_on:
       - jobmanager
@@ -30,7 +30,7 @@ services:
     environment:
       - JOB_MANAGER_RPC_ADDRESS=jobmanager
   clickevent-generator:
-    image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11
+    image: apache/flink-ops-playground:2-FLINK-1.9-scala_2.11
     command: "java -classpath /opt/ClickCountJob.jar:/opt/flink/lib/* org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input"
     depends_on:
       - kafka


[flink-playgrounds] 02/02: [hotfix] Improve .gitignore

Posted by fh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git

commit 5b93147d2fc050a5ce9597dcc8c478c1b9ed08c4
Author: Fabian Hueske <fh...@apache.org>
AuthorDate: Mon Sep 23 12:03:20 2019 +0200

    [hotfix] Improve .gitignore
---
 .gitignore | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/.gitignore b/.gitignore
index d4e4d76..d04cff5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,3 @@
-*/.idea
-*/target
-*/dependency-reduced-pom.xml
+**/.idea
+**/target
+**/dependency-reduced-pom.xml