Posted to commits@flink.apache.org by se...@apache.org on 2017/02/15 17:57:27 UTC
[1/2] flink git commit: [FLINK-5805] [docs] Improvements to docs for ProcessFunction
Repository: flink
Updated Branches:
refs/heads/master 7477c5b57 -> 5fb267de6
[FLINK-5805] [docs] Improvements to docs for ProcessFunction
This closes #3317
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5fb267de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5fb267de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5fb267de
Branch: refs/heads/master
Commit: 5fb267de68b68bc47c469f95b3bde8eebcd42007
Parents: 33ea78e
Author: David Anderson <da...@alpinegizmo.com>
Authored: Wed Feb 15 10:58:55 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 15 18:45:46 2017 +0100
----------------------------------------------------------------------
docs/dev/stream/process_function.md | 23 ++++++++++++++---------
1 file changed, 14 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5fb267de/docs/dev/stream/process_function.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/process_function.md b/docs/dev/stream/process_function.md
index 99a3bf6..22295be 100644
--- a/docs/dev/stream/process_function.md
+++ b/docs/dev/stream/process_function.md
@@ -47,7 +47,7 @@ stream.keyBy("id").process(new MyProcessFunction())
The timers allow applications to react to changes in processing time and in [event time](../event_time.html).
Every call to the function `processElement(...)` gets a `Context` object with gives access to the element's
-event time timestamp, and the *TimerService*. The `TimerService` can be used to register callbacks for future
+event time timestamp, and to the *TimerService*. The `TimerService` can be used to register callbacks for future
event-/processing- time instants. When a timer's particular time is reached, the `onTimer(...)` method is
called. During that call, all states are again scoped to the key with which the timer was created, allowing
timers to perform keyed state manipulation as well.
@@ -55,30 +55,35 @@ timers to perform keyed state manipulation as well.
## Low-level Joins
-To realize low-level operations on two inputs, applications can use the `CoProcessFunction`. It relates to the `ProcessFunction`
-in the same way as a `CoFlatMapFunction` relates to the `FlatMapFunction`: The function is typed to two different inputs and
+To realize low-level operations on two inputs, applications can use `CoProcessFunction`. It relates to `ProcessFunction`
+in the same way that `CoFlatMapFunction` relates to `FlatMapFunction`: the function is bound to two different inputs and
gets individual calls to `processElement1(...)` and `processElement2(...)` for records from the two different inputs.
-Implementing a low level join follows typically the pattern:
+Implementing a low level join typically follows this pattern:
- Create a state object for one input (or both)
- Update the state upon receiving elements from its input
- Upon receiving elements from the other input, probe the state and produce the joined result
+For example, you might be joining customer data to financial trades,
+while keeping state for the customer data. If you care about having
+complete and deterministic joins in the face of out-of-order events,
+you can use a timer to evaluate and emit the join for a trade when the
+watermark for the customer data stream has passed the time of that
+trade.
## Example
-The following example maintains counts per key, and emits the key/count pair if no update happened to the key for one minute
-(in event time):
+The following example maintains counts per key, and emits a key/count pair whenever a minute passes (in event time) without an update for that key:
- The count, key, and last-modification-timestamp are stored in a `ValueState`, which is implicitly scoped by key.
- For each record, the `ProcessFunction` increments the counter and sets the last-modification timestamp
- The function also schedules a callback one minute into the future (in event time)
- Upon each callback, it checks the callback's event time timestamp against the last-modification time of the stored count
- and emits the key/count if the match (no further update happened in that minute)
+ and emits the key/count if they match (i.e., no further update occurred during that minute)
-*Note:* This simple example could also have been implemented on top of session windows, we simple use it to illustrate
-the basic pattern of how to use the `ProcessFunction`.
+*Note:* This simple example could have been implemented with session windows. We use `ProcessFunction` here to illustrate
+the basic pattern it provides.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
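
As a rough illustration of the callback pattern described in the Example section of the patched docs, the logic can be simulated in plain Java without Flink. Everything below is a hypothetical stand-in, not Flink's `ProcessFunction` or `TimerService` API: the class `CountWithTimeoutSketch`, the method `advanceWatermark`, and the in-memory timer map are assumptions made for this sketch. It keeps a count and last-modification timestamp per key, schedules a callback one minute later for every element, and emits the key/count pair only when the callback's timestamp matches the last modification plus one minute.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Plain-Java simulation of the count-with-timeout pattern; NOT Flink API. */
final class CountWithTimeoutSketch {
    static final long TIMEOUT_MS = 60_000L;

    /** Per-key state: running count and timestamp of the last update. */
    static final class CountState {
        long count;
        long lastModified;
    }

    // Hypothetical stand-in for keyed state.
    private final Map<String, CountState> stateByKey = new HashMap<>();
    // Hypothetical stand-in for a timer service: timer timestamp -> keys to call back.
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();
    // Collected output, formatted as "key=count".
    final List<String> emitted = new ArrayList<>();

    /** Counts the element, records its timestamp, and schedules a callback one minute later. */
    void processElement(String key, long timestamp) {
        CountState state = stateByKey.computeIfAbsent(key, k -> new CountState());
        state.count++;
        state.lastModified = timestamp;
        timers.computeIfAbsent(timestamp + TIMEOUT_MS, t -> new LinkedHashSet<>()).add(key);
    }

    /** Fires every timer at or before the given watermark, in timestamp order. */
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, Set<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(key, due.getKey());
            }
        }
    }

    /** Emits only if no further update arrived for this key since the timer was set. */
    private void onTimer(String key, long timerTimestamp) {
        CountState state = stateByKey.get(key);
        if (state != null && timerTimestamp == state.lastModified + TIMEOUT_MS) {
            emitted.add(key + "=" + state.count);
        }
    }

    public static void main(String[] args) {
        CountWithTimeoutSketch sketch = new CountWithTimeoutSketch();
        sketch.processElement("a", 1_000L);
        sketch.processElement("a", 30_000L);
        sketch.processElement("b", 40_000L);
        sketch.advanceWatermark(61_000L);  // stale timer for "a": updated again at 30_000
        sketch.advanceWatermark(100_000L); // timers at 90_000 ("a") and 100_000 ("b") match
        System.out.println(sketch.emitted);
    }
}
```

The stale timer set by the first element of key `a` fires but emits nothing, because a later element moved the last-modification timestamp. That comparison is what lets the pattern avoid cancelling timers explicitly.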
[2/2] flink git commit: [FLINK-5793] [runtime] fix running slot may not be add to AllocatedMap in SlotPool bug
Posted by se...@apache.org.
[FLINK-5793] [runtime] fix running slot may not be add to AllocatedMap in SlotPool bug
This closes #3306
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/33ea78ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/33ea78ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/33ea78ea
Branch: refs/heads/master
Commit: 33ea78ea37fab819a329b09aa213e61c16252067
Parents: 7477c5b
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Tue Feb 14 14:56:41 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 15 18:45:46 2017 +0100
----------------------------------------------------------------------
.../org/apache/flink/runtime/instance/SlotPool.java | 13 ++++++++++++-
.../apache/flink/runtime/instance/SlotPoolTest.java | 2 ++
2 files changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/33ea78ea/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 672431e..4da6c7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -436,7 +436,9 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
pendingRequest.allocationID(), taskManagerSlot.getSlotAllocationId());
- pendingRequest.future().complete(createSimpleSlot(taskManagerSlot, Locality.UNKNOWN));
+ SimpleSlot newSlot = createSimpleSlot(taskManagerSlot, Locality.UNKNOWN);
+ allocatedSlots.add(newSlot);
+ pendingRequest.future().complete(newSlot);
}
else {
LOG.debug("Adding returned slot [{}] to available slots", taskManagerSlot.getSlotAllocationId());
@@ -627,6 +629,15 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
}
// ------------------------------------------------------------------------
+ // Methods for tests
+ // ------------------------------------------------------------------------
+
+ @VisibleForTesting
+ AllocatedSlots getAllocatedSlots() {
+ return allocatedSlots;
+ }
+
+ // ------------------------------------------------------------------------
// Helper classes
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/33ea78ea/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 97457e1..538e286 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -115,6 +115,7 @@ public class SlotPoolTest extends TestLogger {
assertEquals(resourceID, slot.getTaskManagerID());
assertEquals(jobId, slot.getJobID());
assertEquals(slotPool.getSlotOwner(), slot.getOwner());
+ assertEquals(slotPool.getAllocatedSlots().get(slot.getAllocatedSlot().getSlotAllocationId()), slot);
}
@Test
@@ -153,6 +154,7 @@ public class SlotPoolTest extends TestLogger {
assertTrue(slot2.isAlive());
assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+ assertEquals(slotPool.getAllocatedSlots().get(slot1.getAllocatedSlot().getSlotAllocationId()), slot2);
}
@Test
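
The bookkeeping change in the `SlotPool.java` hunk above (create the slot, record it in `allocatedSlots`, then complete the pending future) can be sketched in isolation. This is a minimal stand-alone model under stated assumptions, not Flink code: `SlotBookkeepingSketch`, its `String` slot type, and the `fulfillPendingRequest` method are hypothetical names invented for illustration. It relies only on `java.util.concurrent.CompletableFuture`, whose non-async dependent actions run when `complete(...)` is called.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Stand-alone model of "record the slot, then complete the request"; NOT Flink code. */
final class SlotBookkeepingSketch {
    // Hypothetical stand-in for the pool's allocated-slot registry: allocation id -> slot.
    private final Map<String, String> allocatedSlots = new HashMap<>();

    /** Creates a slot for the request, records it, and only then completes the future. */
    void fulfillPendingRequest(String allocationId, CompletableFuture<String> pendingRequest) {
        String newSlot = "slot-for-" + allocationId; // placeholder for a real slot object
        allocatedSlots.put(allocationId, newSlot);   // bookkeeping happens first
        pendingRequest.complete(newSlot);            // callbacks now see a registered slot
    }

    /** Test-style accessor, mirroring the idea of exposing the registry for assertions. */
    String getAllocatedSlot(String allocationId) {
        return allocatedSlots.get(allocationId);
    }

    public static void main(String[] args) {
        SlotBookkeepingSketch pool = new SlotBookkeepingSketch();
        CompletableFuture<String> request = new CompletableFuture<>();
        // The callback checks that the slot is already registered when it is handed out.
        request.thenAccept(slot ->
            System.out.println("registered at completion: " + slot.equals(pool.getAllocatedSlot("alloc-1"))));
        pool.fulfillPendingRequest("alloc-1", request);
    }
}
```

If the `put` call were omitted, as in the line removed by the patch, the future would still complete, but the registry lookup would return `null` for a slot that is already in use.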