Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/20 15:57:46 UTC

[GitHub] zentol closed pull request #6503: [FLINK-10072] [docs] Syntax and consistency issues in "The Broadcast State Pattern"

URL: https://github.com/apache/flink/pull/6503

This PR was merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request on merge,
it is reproduced below for the sake of provenance:

diff --git a/docs/dev/stream/state/broadcast_state.md b/docs/dev/stream/state/broadcast_state.md
index 593b6a29a87..f336a855a3a 100644
--- a/docs/dev/stream/state/broadcast_state.md
+++ b/docs/dev/stream/state/broadcast_state.md
@@ -76,8 +76,8 @@ BroadcastStream<Rule> ruleBroadcastStream = ruleStream
 {% endhighlight %}
 
 Finally, in order to evaluate the `Rules` against the incoming elements from the `Item` stream, we need to:
-    1) connect the two streams and 
-    2) specify our match detecting logic. 
+ 1. connect the two streams, and
+ 2. specify our match detecting logic.
 
 Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be done by calling `connect()` on the 
 non-broadcasted stream, with the `BroadcastStream` as an argument. This will return a `BroadcastConnectedStream`, on 
@@ -161,7 +161,7 @@ across all tasks. Ignoring this rule would break the consistency guarantees of t
 often difficult to debug results.
 
 <div class="alert alert-info">
-  <strong>Attention:</strong> The logic implemented in `processBroadcast()` must have the same determinstic behavior 
+  <strong>Attention:</strong> The logic implemented in `processBroadcast()` must have the same deterministic behavior
   across all parallel instances!
 </div>
 
@@ -172,8 +172,6 @@ exposes some functionality which is not available to the `BroadcastProcessFuncti
   `OnTimerContext` which exposes the same functionality as the `ReadOnlyContext` plus 
    - the ability to ask if the timer that fired was an event or processing time one and 
    - to query the key associated with the timer.
-
-  This is aligned with the `onTimer()` method of the `KeyedProcessFunction`. 
  2. the `Context` in the `processBroadcastElement()` method contains the method 
  `applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function)`. This allows to 
   register a `KeyedStateFunction` to be **applied to all states of all keys** associated with the provided `stateDescriptor`. 
@@ -192,34 +190,34 @@ new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
     // store partial matches, i.e. first elements of the pair waiting for their second element
     // we keep a list as we may have many first elements waiting
     private final MapStateDescriptor<String, List<Item>> mapStateDesc =
-	    new MapStateDescriptor<>(
-	        "items",
-	        BasicTypeInfo.STRING_TYPE_INFO, 
-	        new ListTypeInfo<>(Item.class));
+        new MapStateDescriptor<>(
+            "items",
+            BasicTypeInfo.STRING_TYPE_INFO,
+            new ListTypeInfo<>(Item.class));
 
     // identical to our ruleStateDescriptor above
     private final MapStateDescriptor<String, Rule> ruleStateDescriptor = 
         new MapStateDescriptor<>(
-    	    "RulesBroadcastState",
-    		BasicTypeInfo.STRING_TYPE_INFO,
-    		TypeInformation.of(new TypeHint<Rule>() {}));
-
-	@Override
-	public void processBroadcastElement(Rule value, 
-	                                    Context ctx, 
-	                                    Collector<String> out) throws Exception {
-	    ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
-	}
-
-	@Override
-	public void processElement(Item value, 
-	                           ReadOnlyContext ctx, 
-	                           Collector<String> out) throws Exception {
+            "RulesBroadcastState",
+            BasicTypeInfo.STRING_TYPE_INFO,
+            TypeInformation.of(new TypeHint<Rule>() {}));
+
+    @Override
+    public void processBroadcastElement(Rule value,
+                                        Context ctx,
+                                        Collector<String> out) throws Exception {
+        ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
+    }
+
+    @Override
+    public void processElement(Item value,
+                               ReadOnlyContext ctx,
+                               Collector<String> out) throws Exception {
 
         final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
         final Shape shape = value.getShape();
     
-        for (Map.Entry<String, Rule> entry: 
+        for (Map.Entry<String, Rule> entry :
                 ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
             final String ruleName = entry.getKey();
             final Rule rule = entry.getValue();
@@ -236,7 +234,7 @@ new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                 stored.clear();
             }
     
-            // there is  no else{} to cover if rule.first == rule.second
+            // there is no else{} to cover if rule.first == rule.second
             if (shape.equals(rule.first)) {
                 stored.add(value);
             }
@@ -247,7 +245,7 @@ new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                 state.put(ruleName, stored);
             }
         }
-	}
+    }
 }
 {% endhighlight %}
 
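For readers who want to follow the matching logic touched by the last hunks without a Flink setup, here is a minimal plain-Java sketch. It is not the code from the docs: `RuleMatchSketch` is a hypothetical name, two `HashMap`s stand in for the broadcast state and the keyed `MapState` of a single key, items are reduced to a name plus a shape string, and the part of the loop that the hunks skip is filled in as an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free stand-in for the KeyedBroadcastProcessFunction in the diff.
class RuleMatchSketch {

    // Simplified stand-in for the docs' Rule type: a named pair of shapes.
    static final class Rule {
        final String name;
        final String first;
        final String second;

        Rule(String name, String first, String second) {
            this.name = name;
            this.first = first;
            this.second = second;
        }
    }

    // Stand-in for the broadcast state: rule name -> rule.
    private final Map<String, Rule> rules = new HashMap<>();

    // Stand-in for the keyed MapState of one key:
    // rule name -> first elements still waiting for their second element.
    private final Map<String, List<String>> partialMatches = new HashMap<>();

    // Mirrors processBroadcastElement(): every incoming rule is stored by name.
    void processBroadcastElement(Rule rule) {
        rules.put(rule.name, rule);
    }

    // Mirrors processElement(): returns the matches instead of using a Collector.
    List<String> processElement(String item, String shape) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Rule> entry : rules.entrySet()) {
            String ruleName = entry.getKey();
            Rule rule = entry.getValue();

            // Assumption: a missing entry means no partial matches yet.
            List<String> stored = partialMatches.getOrDefault(ruleName, new ArrayList<>());

            // Assumption: the item completes every waiting first element of this rule.
            if (shape.equals(rule.second) && !stored.isEmpty()) {
                for (String first : stored) {
                    out.add("MATCH: " + first + " - " + item);
                }
                stored.clear();
            }

            // there is no else{} to cover if rule.first == rule.second
            if (shape.equals(rule.first)) {
                stored.add(item);
            }

            if (stored.isEmpty()) {
                partialMatches.remove(ruleName);
            } else {
                partialMatches.put(ruleName, stored);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        RuleMatchSketch fn = new RuleMatchSketch();
        fn.processBroadcastElement(new Rule("rect-then-tri", "rectangle", "triangle"));
        System.out.println(fn.processElement("item1", "rectangle")); // prints []
        System.out.println(fn.processElement("item2", "triangle"));  // prints [MATCH: item1 - item2]
    }
}
```

The two separate `if` statements matter when a rule uses the same shape for both positions: an element can then close the pending matches and also be stored as the first element of the next pair.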


 
