Posted to commits@flink.apache.org by dw...@apache.org on 2020/08/27 11:36:45 UTC

[flink] 03/03: [FLINK-18797][docs] Update deprecated forms of keyBy in docs

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3d0bf401b92adaff5c4b49b666c6482fed811b7a
Author: sanshi@wwdz.onaliyun.com <sa...@WWDZ1234>
AuthorDate: Thu Aug 27 16:28:28 2020 +0800

    [FLINK-18797][docs] Update deprecated forms of keyBy in docs
    
    This closes #13135
---
 docs/dev/connectors/cassandra.md                 |  6 +++---
 docs/dev/connectors/cassandra.zh.md              |  6 +++---
 docs/dev/datastream_api.md                       |  4 ++--
 docs/dev/datastream_api.zh.md                    |  4 ++--
 docs/dev/parallel.md                             |  6 +++---
 docs/dev/parallel.zh.md                          |  6 +++---
 docs/dev/stream/operators/index.md               | 12 ++++++------
 docs/dev/stream/operators/index.zh.md            | 12 ++++++------
 docs/dev/stream/operators/process_function.md    |  4 ++--
 docs/dev/stream/operators/process_function.zh.md |  4 ++--
 docs/dev/stream/state/queryable_state.md         |  2 +-
 docs/dev/stream/state/queryable_state.zh.md      |  2 +-
 docs/dev/stream/state/state.md                   |  2 +-
 docs/dev/stream/state/state.zh.md                |  2 +-
 docs/dev/types_serialization.md                  | 13 +++++--------
 docs/dev/types_serialization.zh.md               | 13 +++++--------
 docs/learn-flink/etl.md                          |  4 ++--
 docs/learn-flink/etl.zh.md                       |  4 ++--
 docs/ops/scala_shell.md                          |  2 +-
 docs/ops/scala_shell.zh.md                       |  2 +-
 20 files changed, 52 insertions(+), 58 deletions(-)
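For reference, every hunk below applies the same pattern: the deprecated position-based
keyBy(int...) and field-expression keyBy(String...) overloads are replaced with the
type-safe KeySelector overload (a lambda in Java, an anonymous function in Scala).
A minimal self-contained sketch of the pattern against the Flink 1.11-era DataStream
API; the class name and example elements are illustrative, not taken from this commit:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KeyByMigrationSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Tuple2<String, Integer>> words =
                    env.fromElements(Tuple2.of("hello", 1), Tuple2.of("world", 2));

            // Deprecated forms of the kind this commit removes from the docs:
            //   words.keyBy(0);      // key by tuple position
            //   words.keyBy("f0");   // key by field-expression string

            // Replacement: a KeySelector lambda, resolved at compile time.
            words.keyBy(value -> value.f0)
                 .sum(1)
                 .print();

            env.execute("keyBy migration sketch");
        }
    }

The KeySelector variant lets the compiler verify both the field access and the resulting
key type, whereas the position- and string-based forms are only checked when the job is
submitted.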

diff --git a/docs/dev/connectors/cassandra.md b/docs/dev/connectors/cassandra.md
index 6420799..436e044 100644
--- a/docs/dev/connectors/cassandra.md
+++ b/docs/dev/connectors/cassandra.md
@@ -160,7 +160,7 @@ DataStream<Tuple2<String, Long>> result = text
                 }
             }
         })
-        .keyBy(0)
+        .keyBy(value -> value.f0)
         .timeWindow(Time.seconds(5))
         .sum(1);
 
@@ -185,7 +185,7 @@ val result: DataStream[(String, Long)] = text
   .filter(_.nonEmpty)
   .map((_, 1L))
   // group by the tuple field "0" and sum up tuple field "1"
-  .keyBy(0)
+  .keyBy(_._1)
   .timeWindow(Time.seconds(5))
   .sum(1)
 
@@ -231,7 +231,7 @@ DataStream<WordCount> result = text
                 }
             }
         })
-        .keyBy("word")
+        .keyBy(WordCount::getWord)
         .timeWindow(Time.seconds(5))
 
         .reduce(new ReduceFunction<WordCount>() {
diff --git a/docs/dev/connectors/cassandra.zh.md b/docs/dev/connectors/cassandra.zh.md
index c10ea49..a7bd054 100644
--- a/docs/dev/connectors/cassandra.zh.md
+++ b/docs/dev/connectors/cassandra.zh.md
@@ -160,7 +160,7 @@ DataStream<Tuple2<String, Long>> result = text
                 }
             }
         })
-        .keyBy(0)
+        .keyBy(value -> value.f0)
         .timeWindow(Time.seconds(5))
         .sum(1);
 
@@ -185,7 +185,7 @@ val result: DataStream[(String, Long)] = text
   .filter(_.nonEmpty)
   .map((_, 1L))
   // group by the tuple field "0" and sum up tuple field "1"
-  .keyBy(0)
+  .keyBy(_._1)
   .timeWindow(Time.seconds(5))
   .sum(1)
 
@@ -231,7 +231,7 @@ DataStream<WordCount> result = text
                 }
             }
         })
-        .keyBy("word")
+        .keyBy(WordCount::getWord)
         .timeWindow(Time.seconds(5))
 
         .reduce(new ReduceFunction<WordCount>() {
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md
index 0d59d63..5aa36c7 100644
--- a/docs/dev/datastream_api.md
+++ b/docs/dev/datastream_api.md
@@ -274,7 +274,7 @@ public class WindowWordCount {
         DataStream<Tuple2<String, Integer>> dataStream = env
                 .socketTextStream("localhost", 9999)
                 .flatMap(new Splitter())
-                .keyBy(0)
+                .keyBy(value -> value.f0)
                 .timeWindow(Time.seconds(5))
                 .sum(1);
 
@@ -311,7 +311,7 @@ object WindowWordCount {
 
     val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
       .map { (_, 1) }
-      .keyBy(0)
+      .keyBy(_._1)
       .timeWindow(Time.seconds(5))
       .sum(1)
 
diff --git a/docs/dev/datastream_api.zh.md b/docs/dev/datastream_api.zh.md
index 8300827..6e24b3b 100644
--- a/docs/dev/datastream_api.zh.md
+++ b/docs/dev/datastream_api.zh.md
@@ -274,7 +274,7 @@ public class WindowWordCount {
         DataStream<Tuple2<String, Integer>> dataStream = env
                 .socketTextStream("localhost", 9999)
                 .flatMap(new Splitter())
-                .keyBy(0)
+                .keyBy(value -> value.f0)
                 .timeWindow(Time.seconds(5))
                 .sum(1);
 
@@ -311,7 +311,7 @@ object WindowWordCount {
 
     val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
       .map { (_, 1) }
-      .keyBy(0)
+      .keyBy(_._1)
       .timeWindow(Time.seconds(5))
       .sum(1)
 
diff --git a/docs/dev/parallel.md b/docs/dev/parallel.md
index 54fa5a1..3d66a59 100644
--- a/docs/dev/parallel.md
+++ b/docs/dev/parallel.md
@@ -54,7 +54,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn
 DataStream<String> text = [...]
 DataStream<Tuple2<String, Integer>> wordCounts = text
     .flatMap(new LineSplitter())
-    .keyBy(0)
+    .keyBy(value -> value.f0)
     .timeWindow(Time.seconds(5))
     .sum(1).setParallelism(5);
 
@@ -70,7 +70,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment
 val text = [...]
 val wordCounts = text
     .flatMap{ _.split(" ") map { (_, 1) } }
-    .keyBy(0)
+    .keyBy(_._1)
     .timeWindow(Time.seconds(5))
     .sum(1).setParallelism(5)
 wordCounts.print()
@@ -113,7 +113,7 @@ env.setParallelism(3)
 val text = [...]
 val wordCounts = text
     .flatMap{ _.split(" ") map { (_, 1) } }
-    .keyBy(0)
+    .keyBy(_._1)
     .timeWindow(Time.seconds(5))
     .sum(1)
 wordCounts.print()
diff --git a/docs/dev/parallel.zh.md b/docs/dev/parallel.zh.md
index 041136d..97a82ab 100644
--- a/docs/dev/parallel.zh.md
+++ b/docs/dev/parallel.zh.md
@@ -45,7 +45,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn
 DataStream<String> text = [...]
 DataStream<Tuple2<String, Integer>> wordCounts = text
     .flatMap(new LineSplitter())
-    .keyBy(0)
+    .keyBy(value -> value.f0)
     .timeWindow(Time.seconds(5))
     .sum(1).setParallelism(5);
 
@@ -61,7 +61,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment
 val text = [...]
 val wordCounts = text
     .flatMap{ _.split(" ") map { (_, 1) } }
-    .keyBy(0)
+    .keyBy(_._1)
     .timeWindow(Time.seconds(5))
     .sum(1).setParallelism(5)
 wordCounts.print()
@@ -98,7 +98,7 @@ env.setParallelism(3)
 val text = [...]
 val wordCounts = text
     .flatMap{ _.split(" ") map { (_, 1) } }
-    .keyBy(0)
+    .keyBy(_._1)
     .timeWindow(Time.seconds(5))
     .sum(1)
 wordCounts.print()
diff --git a/docs/dev/stream/operators/index.md b/docs/dev/stream/operators/index.md
index b8eb568..910ed2c 100644
--- a/docs/dev/stream/operators/index.md
+++ b/docs/dev/stream/operators/index.md
@@ -104,8 +104,8 @@ dataStream.filter(new FilterFunction<Integer>() {
             <p>
             This transformation returns a <em>KeyedStream</em>, which is, among other things, required to use <a href="{{ site.baseurl }}/dev/stream/state/state.html#keyed-state">keyed state</a>. </p>
 {% highlight java %}
-dataStream.keyBy("someKey") // Key by field "someKey"
-dataStream.keyBy(0) // Key by the first element of a Tuple
+dataStream.keyBy(value -> value.getSomeKey()) // Key by field "someKey"
+dataStream.keyBy(value -> value.f0) // Key by the first element of a Tuple
 {% endhighlight %}
             <p>
             <span class="label label-danger">Attention</span>
@@ -187,7 +187,7 @@ keyedStream.maxBy("key");
             key according to some characteristic (e.g., the data that arrived within the last 5 seconds).
             See <a href="windows.html">windows</a> for a complete description of windows.
 {% highlight java %}
-dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
+dataStream.keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
 {% endhighlight %}
         </p>
           </td>
@@ -503,8 +503,8 @@ dataStream.filter { _ != 0 }
             Internally, this is implemented with hash partitioning. See <a href="{{ site.baseurl }}/dev/stream/state/state.html#keyed-state">keys</a> on how to specify keys.
             This transformation returns a KeyedStream.</p>
 {% highlight scala %}
-dataStream.keyBy("someKey") // Key by field "someKey"
-dataStream.keyBy(0) // Key by the first element of a Tuple
+dataStream.keyBy(_.someKey) // Key by field "someKey"
+dataStream.keyBy(_._1) // Key by the first element of a Tuple
 {% endhighlight %}
           </td>
         </tr>
@@ -566,7 +566,7 @@ keyedStream.maxBy("key")
             key according to some characteristic (e.g., the data that arrived within the last 5 seconds).
             See <a href="windows.html">windows</a> for a description of windows.
 {% highlight scala %}
-dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
+dataStream.keyBy(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
 {% endhighlight %}
         </p>
           </td>
diff --git a/docs/dev/stream/operators/index.zh.md b/docs/dev/stream/operators/index.zh.md
index 29d61b5..415eb5f 100644
--- a/docs/dev/stream/operators/index.zh.md
+++ b/docs/dev/stream/operators/index.zh.md
@@ -104,8 +104,8 @@ dataStream.filter(new FilterFunction<Integer>() {
             <p>
             This transformation returns a <em>KeyedStream</em>, which is, among other things, required to use <a href="{{ site.baseurl }}/dev/stream/state/state.html#keyed-state">keyed state</a>. </p>
 {% highlight java %}
-dataStream.keyBy("someKey") // Key by field "someKey"
-dataStream.keyBy(0) // Key by the first element of a Tuple
+dataStream.keyBy(value -> value.getSomeKey()) // Key by field "someKey"
+dataStream.keyBy(value -> value.f0) // Key by the first element of a Tuple
 {% endhighlight %}
             <p>
             <span class="label label-danger">Attention</span>
@@ -187,7 +187,7 @@ keyedStream.maxBy("key");
             key according to some characteristic (e.g., the data that arrived within the last 5 seconds).
             See <a href="windows.html">windows</a> for a complete description of windows.
 {% highlight java %}
-dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
+dataStream.keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
 {% endhighlight %}
         </p>
           </td>
@@ -503,8 +503,8 @@ dataStream.filter { _ != 0 }
             Internally, this is implemented with hash partitioning. See <a href="{{ site.baseurl }}/dev/stream/state/state.html#keyed-state">keys</a> on how to specify keys.
             This transformation returns a KeyedStream.</p>
 {% highlight scala %}
-dataStream.keyBy("someKey") // Key by field "someKey"
-dataStream.keyBy(0) // Key by the first element of a Tuple
+dataStream.keyBy(_.someKey) // Key by field "someKey"
+dataStream.keyBy(_._1) // Key by the first element of a Tuple
 {% endhighlight %}
           </td>
         </tr>
@@ -566,7 +566,7 @@ keyedStream.maxBy("key")
             key according to some characteristic (e.g., the data that arrived within the last 5 seconds).
             See <a href="windows.html">windows</a> for a description of windows.
 {% highlight scala %}
-dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
+dataStream.keyBy(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
 {% endhighlight %}
         </p>
           </td>
diff --git a/docs/dev/stream/operators/process_function.md b/docs/dev/stream/operators/process_function.md
index ef51b84..6fd7661 100644
--- a/docs/dev/stream/operators/process_function.md
+++ b/docs/dev/stream/operators/process_function.md
@@ -107,7 +107,7 @@ DataStream<Tuple2<String, String>> stream = ...;
 
 // apply the process function onto a keyed stream
 DataStream<Tuple2<String, Long>> result = stream
-    .keyBy(0)
+    .keyBy(value -> value.f0)
     .process(new CountWithTimeoutFunction());
 
 /**
@@ -192,7 +192,7 @@ val stream: DataStream[Tuple2[String, String]] = ...
 
 // apply the process function onto a keyed stream
 val result: DataStream[Tuple2[String, Long]] = stream
-  .keyBy(0)
+  .keyBy(_._1)
   .process(new CountWithTimeoutFunction())
 
 /**
diff --git a/docs/dev/stream/operators/process_function.zh.md b/docs/dev/stream/operators/process_function.zh.md
index 240da52..ca0658d 100644
--- a/docs/dev/stream/operators/process_function.zh.md
+++ b/docs/dev/stream/operators/process_function.zh.md
@@ -107,7 +107,7 @@ DataStream<Tuple2<String, String>> stream = ...;
 
 // apply the process function onto a keyed stream
 DataStream<Tuple2<String, Long>> result = stream
-    .keyBy(0)
+    .keyBy(value -> value.f0)
     .process(new CountWithTimeoutFunction());
 
 /**
@@ -192,7 +192,7 @@ val stream: DataStream[Tuple2[String, String]] = ...
 
 // apply the process function onto a keyed stream
 val result: DataStream[Tuple2[String, Long]] = stream
-  .keyBy(0)
+  .keyBy(_._1)
   .process(new CountWithTimeoutFunction())
 
 /**
diff --git a/docs/dev/stream/state/queryable_state.md b/docs/dev/stream/state/queryable_state.md
index 03a2472..6e711b0 100644
--- a/docs/dev/stream/state/queryable_state.md
+++ b/docs/dev/stream/state/queryable_state.md
@@ -130,7 +130,7 @@ In a program like the following, all records of the keyed stream will be used to
 `ValueState.update(value)`:
 
 {% highlight java %}
-stream.keyBy(0).asQueryableState("query-name")
+stream.keyBy(value -> value.f0).asQueryableState("query-name")
 {% endhighlight %}
 
 This acts like the Scala API's `flatMapWithState`.
diff --git a/docs/dev/stream/state/queryable_state.zh.md b/docs/dev/stream/state/queryable_state.zh.md
index 4b76b4d..d3e1a8e 100644
--- a/docs/dev/stream/state/queryable_state.zh.md
+++ b/docs/dev/stream/state/queryable_state.zh.md
@@ -102,7 +102,7 @@ QueryableStateStream asQueryableState(
 返回的 `QueryableStateStream` 可以被视作一个sink,而且**不能再**被进一步转换。在内部实现上,一个 `QueryableStateStream` 被转换成一个 operator,使用输入的数据来更新 queryable state。state 如何更新是由 `asQueryableState` 提供的 `StateDescriptor` 来决定的。在下面的代码中, keyed stream 的所有数据将会通过 `ValueState.update(value)` 来更新状态:
 
 {% highlight java %}
-stream.keyBy(0).asQueryableState("query-name")
+stream.keyBy(value -> value.f0).asQueryableState("query-name")
 {% endhighlight %}
 
 这个行为类似于 Scala API 中的 `flatMapWithState`。
diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
index 389a03e..2723ed9 100644
--- a/docs/dev/stream/state/state.md
+++ b/docs/dev/stream/state/state.md
@@ -195,7 +195,7 @@ public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>,
 
 // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
 env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
-        .keyBy(0)
+        .keyBy(value -> value.f0)
         .flatMap(new CountWindowAverage())
         .print();
 
diff --git a/docs/dev/stream/state/state.zh.md b/docs/dev/stream/state/state.zh.md
index 2cdbe5c..cf1a5b9 100644
--- a/docs/dev/stream/state/state.zh.md
+++ b/docs/dev/stream/state/state.zh.md
@@ -175,7 +175,7 @@ public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>,
 
 // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
 env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
-        .keyBy(0)
+        .keyBy(value -> value.f0)
         .flatMap(new CountWindowAverage())
         .print();
 
diff --git a/docs/dev/types_serialization.md b/docs/dev/types_serialization.md
index 0956372..dbae121 100644
--- a/docs/dev/types_serialization.md
+++ b/docs/dev/types_serialization.md
@@ -70,7 +70,7 @@ wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
     }
 });
 
-wordCounts.keyBy(0); // also valid .keyBy("f0")
+wordCounts.keyBy(value -> value.f0);
 
 
 {% endhighlight %}
@@ -86,11 +86,11 @@ val input = env.fromElements(
     WordCount("hello", 1),
     WordCount("world", 2)) // Case Class Data Set
 
-input.keyBy("word")// key by field expression "word"
+input.keyBy(_.word)
 
 val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
 
-input2.keyBy(0, 1) // key by field positions 0 and 1
+input2.keyBy(value => (value._1, value._2))
 {% endhighlight %}
 
 </div>
@@ -137,7 +137,7 @@ DataStream<WordWithCount> wordCounts = env.fromElements(
     new WordWithCount("hello", 1),
     new WordWithCount("world", 2));
 
-wordCounts.keyBy("word"); // key by field expression "word"
+wordCounts.keyBy(value -> value.word);
 
 {% endhighlight %}
 </div>
@@ -153,7 +153,7 @@ val input = env.fromElements(
     new WordWithCount("hello", 1),
     new WordWithCount("world", 2)) // Case Class Data Set
 
-input.keyBy("word")// key by field expression "word"
+input.keyBy(_.word)
 
 {% endhighlight %}
 </div>
@@ -238,9 +238,6 @@ Flink tries to infer a lot of information about the data types that are exchange
 Think about it like a database that infers the schema of tables. In most cases, Flink infers all necessary information seamlessly
 by itself. Having the type information allows Flink to do some cool things:
 
-* Using POJOs types and grouping / joining / aggregating them by referring to field names (like `dataSet.keyBy("username")`).
-  The type information allows Flink to check (for typos and type compatibility) early rather than failing later at runtime.
-
 * The more Flink knows about data types, the better the serialization and data layout schemes are.
   That is quite important for the memory usage paradigm in Flink (work on serialized data inside/outside the heap where ever possible
   and make serialization very cheap).
diff --git a/docs/dev/types_serialization.zh.md b/docs/dev/types_serialization.zh.md
index 0d27894..e6806b9 100644
--- a/docs/dev/types_serialization.zh.md
+++ b/docs/dev/types_serialization.zh.md
@@ -70,7 +70,7 @@ wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
     }
 });
 
-wordCounts.keyBy(0); // also valid .keyBy("f0")
+wordCounts.keyBy(value -> value.f0);
 
 
 {% endhighlight %}
@@ -86,11 +86,11 @@ val input = env.fromElements(
     WordCount("hello", 1),
     WordCount("world", 2)) // Case Class Data Set
 
-input.keyBy("word")// key by field expression "word"
+input.keyBy(_.word)
 
 val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
 
-input2.keyBy(0, 1) // key by field positions 0 and 1
+input2.keyBy(value => (value._1, value._2))
 {% endhighlight %}
 
 </div>
@@ -137,7 +137,7 @@ DataStream<WordWithCount> wordCounts = env.fromElements(
     new WordWithCount("hello", 1),
     new WordWithCount("world", 2));
 
-wordCounts.keyBy("word"); // key by field expression "word"
+wordCounts.keyBy(value -> value.word);
 
 {% endhighlight %}
 </div>
@@ -153,7 +153,7 @@ val input = env.fromElements(
     new WordWithCount("hello", 1),
     new WordWithCount("world", 2)) // Case Class Data Set
 
-input.keyBy("word")// key by field expression "word"
+input.keyBy(_.word)
 
 {% endhighlight %}
 </div>
@@ -237,9 +237,6 @@ Flink 会尽力推断有关数据类型的大量信息,这些数据会在分
 可以把它想象成一个推断表结构的数据库。在大多数情况下,Flink 可以依赖自身透明的推断出所有需要的类型信息。
 掌握这些类型信息可以帮助 Flink 实现很多意想不到的特性:
 
-* 对于使用 POJOs 类型的数据,可以通过指定字段名(比如 `dataSet.keyBy("username")` )进行 grouping 、joining、aggregating 操作。
-  类型信息可以帮助 Flink 在运行前做一些拼写错误以及类型兼容方面的检查,而不是等到运行时才暴露这些问题。
-
 * Flink 对数据类型了解的越多,序列化和数据布局方案就越好。
   这对 Flink 中的内存使用范式尤为重要(可以尽可能处理堆上或者堆外的序列化数据并且使序列化操作很廉价)。
 
diff --git a/docs/learn-flink/etl.md b/docs/learn-flink/etl.md
index 29cbdbf..7f16162 100644
--- a/docs/learn-flink/etl.md
+++ b/docs/learn-flink/etl.md
@@ -149,7 +149,7 @@ this would mean doing some sort of GROUP BY with the `startCell`, while in Flink
 {% highlight java %}
 rides
     .flatMap(new NYCEnrichment())
-    .keyBy("startCell")
+    .keyBy(value -> value.startCell)
 {% endhighlight %}
 
 Every `keyBy` causes a network shuffle that repartitions the stream. In general this is pretty
@@ -241,7 +241,7 @@ specify the key.
 
 {% highlight java %}
 minutesByStartCell
-  .keyBy(0) // startCell
+  .keyBy(value -> value.f0) // .keyBy(value -> value.startCell)
   .maxBy(1) // duration
   .print();
 {% endhighlight %}
diff --git a/docs/learn-flink/etl.zh.md b/docs/learn-flink/etl.zh.md
index 881d0af..789101b 100644
--- a/docs/learn-flink/etl.zh.md
+++ b/docs/learn-flink/etl.zh.md
@@ -130,7 +130,7 @@ public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedR
 {% highlight java %}
 rides
     .flatMap(new NYCEnrichment())
-    .keyBy("startCell")
+    .keyBy(value -> value.startCell)
 {% endhighlight %}
 
 每个 `keyBy` 会通过 shuffle 来为数据流进行重新分区。总体来说这个开销是很大的,它涉及网络通信、序列化和反序列化。
@@ -206,7 +206,7 @@ DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
 
 {% highlight java %}
 minutesByStartCell
-  .keyBy(0) // startCell
+  .keyBy(value -> value.f0) // .keyBy(value -> value.startCell)
   .maxBy(1) // duration
   .print();
 {% endhighlight %}
diff --git a/docs/ops/scala_shell.md b/docs/ops/scala_shell.md
index e8171ab..cb6c409 100644
--- a/docs/ops/scala_shell.md
+++ b/docs/ops/scala_shell.md
@@ -77,7 +77,7 @@ Scala-Flink> val textStreaming = senv.fromElements(
   "Or to take arms against a sea of troubles,")
 Scala-Flink> val countsStreaming = textStreaming
     .flatMap { _.toLowerCase.split("\\W+") }
-    .map { (_, 1) }.keyBy(0).sum(1)
+    .map { (_, 1) }.keyBy(_._1).sum(1)
 Scala-Flink> countsStreaming.print()
 Scala-Flink> senv.execute("Streaming Wordcount")
 {% endhighlight %}
diff --git a/docs/ops/scala_shell.zh.md b/docs/ops/scala_shell.zh.md
index e8171ab..cb6c409 100644
--- a/docs/ops/scala_shell.zh.md
+++ b/docs/ops/scala_shell.zh.md
@@ -77,7 +77,7 @@ Scala-Flink> val textStreaming = senv.fromElements(
   "Or to take arms against a sea of troubles,")
 Scala-Flink> val countsStreaming = textStreaming
     .flatMap { _.toLowerCase.split("\\W+") }
-    .map { (_, 1) }.keyBy(0).sum(1)
+    .map { (_, 1) }.keyBy(_._1).sum(1)
 Scala-Flink> countsStreaming.print()
 Scala-Flink> senv.execute("Streaming Wordcount")
 {% endhighlight %}