You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sj...@apache.org on 2020/07/09 12:50:43 UTC

[flink-training] branch master updated: [FLINK-18499] Update for Flink 1.11: don’t use deprecated flavors of keyBy

This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git


The following commit(s) were added to refs/heads/master by this push:
     new 298c734  [FLINK-18499] Update for Flink 1.11: don’t use deprecated flavors of keyBy
298c734 is described below

commit 298c734093740a00b8b141c4dfe163be58bcfe71
Author: David Anderson <da...@alpinegizmo.com>
AuthorDate: Wed Jul 8 17:25:44 2020 +0200

    [FLINK-18499] Update for Flink 1.11: don’t use deprecated flavors of keyBy
    
    This closes #12
---
 build.gradle                                                          | 4 ++--
 .../apache/flink/training/examples/ridecount/RideCountExample.java    | 3 +--
 hourly-tips/DISCUSSION.md                                             | 2 +-
 .../flink/training/solutions/hourlytips/HourlyTipsSolution.java       | 2 +-
 .../apache/flink/training/exercises/longrides/LongRidesExercise.java  | 2 +-
 .../apache/flink/training/solutions/longrides/LongRidesSolution.java  | 2 +-
 .../flink/training/exercises/ridesandfares/RidesAndFaresExercise.java | 4 ++--
 .../exercises/ridesandfares/scala/RidesAndFaresExercise.scala         | 4 ++--
 .../flink/training/solutions/ridesandfares/RidesAndFaresSolution.java | 4 ++--
 .../solutions/ridesandfares/scala/RidesAndFaresSolution.scala         | 4 ++--
 10 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/build.gradle b/build.gradle
index 5aae798..ced3f7f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -31,12 +31,12 @@ subprojects {
 
     // artifact properties
     group = 'org.apache.flink'
-    version = '1.10-SNAPSHOT'
+    version = '1.11-SNAPSHOT'
     description = """Flink Training Exercises"""
 
     ext {
         javaVersion = '1.8'
-        flinkVersion = '1.10.0'
+        flinkVersion = '1.11.0'
         scalaBinaryVersion = '2.12'
         slf4jVersion = '1.7.15'
         log4jVersion = '1.2.17'
diff --git a/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java b/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
index ac7424a..35ed7f4 100644
--- a/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
+++ b/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
@@ -19,7 +19,6 @@
 package org.apache.flink.training.examples.ridecount;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -73,7 +72,7 @@ public class RideCountExample {
 		});
 
 		// partition the stream by the driverId
-		KeyedStream<Tuple2<Long, Long>, Tuple> keyedByDriverId = tuples.keyBy(0);
+		KeyedStream<Tuple2<Long, Long>, Long> keyedByDriverId = tuples.keyBy(t -> t.f0);
 
 		// count the rides for each driver
 		DataStream<Tuple2<Long, Long>> rideCounts = keyedByDriverId.sum(1);
diff --git a/hourly-tips/DISCUSSION.md b/hourly-tips/DISCUSSION.md
index e498f81..c1571c5 100644
--- a/hourly-tips/DISCUSSION.md
+++ b/hourly-tips/DISCUSSION.md
@@ -118,7 +118,7 @@ But, what if we were to do this, instead?
 
 ```java
 DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
-	.keyBy(0)
+	.keyBy(t -> t.f0)
 	.maxBy(2);
 ```
 
diff --git a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
index 54671f9..ed01320 100644
--- a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
+++ b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
@@ -82,7 +82,7 @@ public class HourlyTipsSolution extends ExerciseBase {
 //		and different from, the solution above (using a windowAll)?
 
 // 		DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
-// 			.keyBy(0)
+// 			.keyBy(t -> t.f0)
 // 			.maxBy(2);
 
 		printOrTest(hourlyMax);
diff --git a/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java b/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
index 327ba38..ba48f50 100644
--- a/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
+++ b/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
@@ -67,7 +67,7 @@ public class LongRidesExercise extends ExerciseBase {
 		DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));
 
 		DataStream<TaxiRide> longRides = rides
-				.keyBy(ride -> ride.rideId)
+				.keyBy((TaxiRide ride) -> ride.rideId)
 				.process(new MatchFunction());
 
 		printOrTest(longRides);
diff --git a/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java b/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
index fb75ef2..37ef758 100644
--- a/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
+++ b/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
@@ -68,7 +68,7 @@ public class LongRidesSolution extends ExerciseBase {
 		DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));
 
 		DataStream<TaxiRide> longRides = rides
-				.keyBy(r -> r.rideId)
+				.keyBy((TaxiRide ride) -> ride.rideId)
 				.process(new MatchFunction());
 
 		printOrTest(longRides);
diff --git a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java
index 4bae96c..2817f19 100644
--- a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java
+++ b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java
@@ -68,11 +68,11 @@ public class RidesAndFaresExercise extends ExerciseBase {
 		DataStream<TaxiRide> rides = env
 				.addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
 				.filter((TaxiRide ride) -> ride.isStart)
-				.keyBy("rideId");
+				.keyBy((TaxiRide ride) -> ride.rideId);
 
 		DataStream<TaxiFare> fares = env
 				.addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
-				.keyBy("rideId");
+				.keyBy((TaxiFare fare) -> fare.rideId);
 
 		DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
 				.connect(fares)
diff --git a/rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala b/rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala
index 36391d8..cd5b028 100644
--- a/rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala
+++ b/rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala
@@ -55,11 +55,11 @@ object RidesAndFaresExercise {
     val rides = env
       .addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
       .filter { ride => ride.isStart }
-      .keyBy("rideId")
+      .keyBy { ride => ride.rideId }
 
     val fares = env
       .addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
-      .keyBy("rideId")
+      .keyBy { fare => fare.rideId }
 
     val processed = rides
       .connect(fares)
diff --git a/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java b/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
index 0254540..20fb255 100644
--- a/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
+++ b/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
@@ -81,11 +81,11 @@ public class RidesAndFaresSolution extends ExerciseBase {
 		DataStream<TaxiRide> rides = env
 				.addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
 				.filter((TaxiRide ride) -> ride.isStart)
-				.keyBy(ride -> ride.rideId);
+				.keyBy((TaxiRide ride) -> ride.rideId);
 
 		DataStream<TaxiFare> fares = env
 				.addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
-				.keyBy(fare -> fare.rideId);
+				.keyBy((TaxiFare fare) -> fare.rideId);
 
 		// Set a UID on the stateful flatmap operator so we can read its state using the State Processor API.
 		DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
diff --git a/rides-and-fares/src/solution/scala/org/apache/flink/training/solutions/ridesandfares/scala/RidesAndFaresSolution.scala b/rides-and-fares/src/solution/scala/org/apache/flink/training/solutions/ridesandfares/scala/RidesAndFaresSolution.scala
index f6d9334..b7add66 100644
--- a/rides-and-fares/src/solution/scala/org/apache/flink/training/solutions/ridesandfares/scala/RidesAndFaresSolution.scala
+++ b/rides-and-fares/src/solution/scala/org/apache/flink/training/solutions/ridesandfares/scala/RidesAndFaresSolution.scala
@@ -56,11 +56,11 @@ object RidesAndFaresSolution {
     val rides = env
       .addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
       .filter { ride => ride.isStart }
-      .keyBy("rideId")
+      .keyBy { ride => ride.rideId }
 
     val fares = env
       .addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
-      .keyBy("rideId")
+      .keyBy { fare => fare.rideId }
 
     val processed = rides
       .connect(fares)