You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sj...@apache.org on 2020/07/09 12:50:43 UTC
[flink-training] branch master updated: [FLINK-18499] Update for Flink 1.11: don’t use deprecated flavors of keyBy
This is an automated email from the ASF dual-hosted git repository.
sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git
The following commit(s) were added to refs/heads/master by this push:
new 298c734 [FLINK-18499] Update for Flink 1.11: don’t use deprecated flavors of keyBy
298c734 is described below
commit 298c734093740a00b8b141c4dfe163be58bcfe71
Author: David Anderson <da...@alpinegizmo.com>
AuthorDate: Wed Jul 8 17:25:44 2020 +0200
[FLINK-18499] Update for Flink 1.11: don’t use deprecated flavors of keyBy
This closes #12
---
build.gradle | 4 ++--
.../apache/flink/training/examples/ridecount/RideCountExample.java | 3 +--
hourly-tips/DISCUSSION.md | 2 +-
.../flink/training/solutions/hourlytips/HourlyTipsSolution.java | 2 +-
.../apache/flink/training/exercises/longrides/LongRidesExercise.java | 2 +-
.../apache/flink/training/solutions/longrides/LongRidesSolution.java | 2 +-
.../flink/training/exercises/ridesandfares/RidesAndFaresExercise.java | 4 ++--
.../exercises/ridesandfares/scala/RidesAndFaresExercise.scala | 4 ++--
.../flink/training/solutions/ridesandfares/RidesAndFaresSolution.java | 4 ++--
.../solutions/ridesandfares/scala/RidesAndFaresSolution.scala | 4 ++--
10 files changed, 15 insertions(+), 16 deletions(-)
diff --git a/build.gradle b/build.gradle
index 5aae798..ced3f7f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -31,12 +31,12 @@ subprojects {
// artifact properties
group = 'org.apache.flink'
- version = '1.10-SNAPSHOT'
+ version = '1.11-SNAPSHOT'
description = """Flink Training Exercises"""
ext {
javaVersion = '1.8'
- flinkVersion = '1.10.0'
+ flinkVersion = '1.11.0'
scalaBinaryVersion = '2.12'
slf4jVersion = '1.7.15'
log4jVersion = '1.2.17'
diff --git a/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java b/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
index ac7424a..35ed7f4 100644
--- a/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
+++ b/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
@@ -19,7 +19,6 @@
package org.apache.flink.training.examples.ridecount;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -73,7 +72,7 @@ public class RideCountExample {
});
// partition the stream by the driverId
- KeyedStream<Tuple2<Long, Long>, Tuple> keyedByDriverId = tuples.keyBy(0);
+ KeyedStream<Tuple2<Long, Long>, Long> keyedByDriverId = tuples.keyBy(t -> t.f0);
// count the rides for each driver
DataStream<Tuple2<Long, Long>> rideCounts = keyedByDriverId.sum(1);
diff --git a/hourly-tips/DISCUSSION.md b/hourly-tips/DISCUSSION.md
index e498f81..c1571c5 100644
--- a/hourly-tips/DISCUSSION.md
+++ b/hourly-tips/DISCUSSION.md
@@ -118,7 +118,7 @@ But, what if we were to do this, instead?
```java
DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
- .keyBy(0)
+ .keyBy(t -> t.f0)
.maxBy(2);
```
diff --git a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
index 54671f9..ed01320 100644
--- a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
+++ b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
@@ -82,7 +82,7 @@ public class HourlyTipsSolution extends ExerciseBase {
// and different from, the solution above (using a windowAll)?
// DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
-// .keyBy(0)
+// .keyBy(t -> t.f0)
// .maxBy(2);
printOrTest(hourlyMax);
diff --git a/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java b/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
index 327ba38..ba48f50 100644
--- a/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
+++ b/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
@@ -67,7 +67,7 @@ public class LongRidesExercise extends ExerciseBase {
DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));
DataStream<TaxiRide> longRides = rides
- .keyBy(ride -> ride.rideId)
+ .keyBy((TaxiRide ride) -> ride.rideId)
.process(new MatchFunction());
printOrTest(longRides);
diff --git a/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java b/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
index fb75ef2..37ef758 100644
--- a/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
+++ b/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
@@ -68,7 +68,7 @@ public class LongRidesSolution extends ExerciseBase {
DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));
DataStream<TaxiRide> longRides = rides
- .keyBy(r -> r.rideId)
+ .keyBy((TaxiRide ride) -> ride.rideId)
.process(new MatchFunction());
printOrTest(longRides);
diff --git a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java
index 4bae96c..2817f19 100644
--- a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java
+++ b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java
@@ -68,11 +68,11 @@ public class RidesAndFaresExercise extends ExerciseBase {
DataStream<TaxiRide> rides = env
.addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
.filter((TaxiRide ride) -> ride.isStart)
- .keyBy("rideId");
+ .keyBy((TaxiRide ride) -> ride.rideId);
DataStream<TaxiFare> fares = env
.addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
- .keyBy("rideId");
+ .keyBy((TaxiFare fare) -> fare.rideId);
DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
.connect(fares)
diff --git a/rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala b/rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala
index 36391d8..cd5b028 100644
--- a/rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala
+++ b/rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala
@@ -55,11 +55,11 @@ object RidesAndFaresExercise {
val rides = env
.addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
.filter { ride => ride.isStart }
- .keyBy("rideId")
+ .keyBy { ride => ride.rideId }
val fares = env
.addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
- .keyBy("rideId")
+ .keyBy { fare => fare.rideId }
val processed = rides
.connect(fares)
diff --git a/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java b/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
index 0254540..20fb255 100644
--- a/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
+++ b/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
@@ -81,11 +81,11 @@ public class RidesAndFaresSolution extends ExerciseBase {
DataStream<TaxiRide> rides = env
.addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
.filter((TaxiRide ride) -> ride.isStart)
- .keyBy(ride -> ride.rideId);
+ .keyBy((TaxiRide ride) -> ride.rideId);
DataStream<TaxiFare> fares = env
.addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
- .keyBy(fare -> fare.rideId);
+ .keyBy((TaxiFare fare) -> fare.rideId);
// Set a UID on the stateful flatmap operator so we can read its state using the State Processor API.
DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
diff --git a/rides-and-fares/src/solution/scala/org/apache/flink/training/solutions/ridesandfares/scala/RidesAndFaresSolution.scala b/rides-and-fares/src/solution/scala/org/apache/flink/training/solutions/ridesandfares/scala/RidesAndFaresSolution.scala
index f6d9334..b7add66 100644
--- a/rides-and-fares/src/solution/scala/org/apache/flink/training/solutions/ridesandfares/scala/RidesAndFaresSolution.scala
+++ b/rides-and-fares/src/solution/scala/org/apache/flink/training/solutions/ridesandfares/scala/RidesAndFaresSolution.scala
@@ -56,11 +56,11 @@ object RidesAndFaresSolution {
val rides = env
.addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
.filter { ride => ride.isStart }
- .keyBy("rideId")
+ .keyBy { ride => ride.rideId }
val fares = env
.addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
- .keyBy("rideId")
+ .keyBy { fare => fare.rideId }
val processed = rides
.connect(fares)