You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/31 18:26:27 UTC
[1/2] flink git commit: [FLINK-4932] [distributed coordination]
Failing in state RESTARTING only fails terminally if no more restarts are
possible
Repository: flink
Updated Branches:
refs/heads/release-1.1 4b867019d -> d941b50db
[FLINK-4932] [distributed coordination] Failing in state RESTARTING only fails terminally if no more restarts are possible
If a failure occurs while in state RESTARTING, a new restart attempt is started. Only if the
restart strategy no longer allows further restarts, or if the thrown exception is of type
SuppressRestartsException, can a job go from RESTARTING into FAILED.
Fix failing test cases: ExecutionGraphMetricsTest and ExecutionGraphRestartTest
This closes #2711
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac82e3d0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac82e3d0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac82e3d0
Branch: refs/heads/release-1.1
Commit: ac82e3d05e895f74ecf41da489068a2997415d3d
Parents: 4b86701
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Oct 27 18:32:08 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Oct 31 15:27:21 2016 +0100
----------------------------------------------------------------------
docs/internals/fig/job_status.svg | 263 ++++++++++++-------
.../runtime/executiongraph/ExecutionGraph.java | 60 ++++-
.../ExecutionGraphMetricsTest.java | 4 +-
.../ExecutionGraphRestartTest.java | 7 +-
.../ExecutionGraphSignalsTest.java | 106 ++++++--
.../restart/InfiniteDelayRestartStrategy.java | 61 +++++
6 files changed, 378 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ac82e3d0/docs/internals/fig/job_status.svg
----------------------------------------------------------------------
diff --git a/docs/internals/fig/job_status.svg b/docs/internals/fig/job_status.svg
index c259db4..488f883 100644
--- a/docs/internals/fig/job_status.svg
+++ b/docs/internals/fig/job_status.svg
@@ -38,6 +38,50 @@ under the License.
<defs
id="defs4">
<marker
+ inkscape:isstock="true"
+ style="overflow:visible"
+ id="marker4737"
+ refX="0"
+ refY="0"
+ orient="auto"
+ inkscape:stockid="Arrow2Mend">
+ <path
+ inkscape:connector-curvature="0"
+ transform="scale(-0.6,-0.6)"
+ d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+ style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+ id="path4739" />
+ </marker>
+ <marker
+ inkscape:stockid="Arrow1Lend"
+ orient="auto"
+ refY="0.0"
+ refX="0.0"
+ id="marker4552"
+ style="overflow:visible;"
+ inkscape:isstock="true">
+ <path
+ id="path4298"
+ d="M 0.0,0.0 L 5.0,-5.0 L -12.5,0.0 L 5.0,5.0 L 0.0,0.0 z "
+ style="fill-rule:evenodd;stroke:#000000;stroke-width:1pt;stroke-opacity:1;fill:#000000;fill-opacity:1"
+ transform="scale(0.8) rotate(180) translate(12.5,0)" />
+ </marker>
+ <marker
+ inkscape:isstock="true"
+ style="overflow:visible"
+ id="marker4551"
+ refX="0"
+ refY="0"
+ orient="auto"
+ inkscape:stockid="Arrow2Mend">
+ <path
+ inkscape:connector-curvature="0"
+ transform="scale(-0.6,-0.6)"
+ d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+ style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+ id="path4553" />
+ </marker>
+ <marker
inkscape:stockid="Arrow2Mstart"
orient="auto"
refY="0.0"
@@ -343,7 +387,8 @@ under the License.
refX="0"
id="Arrow2Mend"
style="overflow:visible"
- inkscape:isstock="true">
+ inkscape:isstock="true"
+ inkscape:collect="always">
<path
id="path4486"
style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
@@ -404,17 +449,17 @@ under the License.
borderopacity="1.0"
inkscape:pageopacity="0.0"
inkscape:pageshadow="2"
- inkscape:zoom="1.4"
- inkscape:cx="366.44711"
- inkscape:cy="435.59833"
+ inkscape:zoom="0.98994949"
+ inkscape:cx="333.41527"
+ inkscape:cy="460.79478"
inkscape:document-units="px"
inkscape:current-layer="layer1"
showgrid="true"
- inkscape:window-width="1402"
- inkscape:window-height="855"
- inkscape:window-x="38"
- inkscape:window-y="1"
- inkscape:window-maximized="1">
+ inkscape:window-width="1916"
+ inkscape:window-height="1300"
+ inkscape:window-x="1855"
+ inkscape:window-y="21"
+ inkscape:window-maximized="0">
<inkscape:grid
type="xygrid"
id="grid4136" />
@@ -438,13 +483,13 @@ under the License.
transform="translate(0,-272.83465)">
<path
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-start:url(#Arrow2Mstart);marker-end:url(#marker4407)"
- d="M 369.28571,490.93361 C 340,572.36218 330,712.36218 340.71429,802.36218"
+ d="M 409.28571,490.93361 C 380,572.36218 370,712.36218 380.71429,802.36218"
id="path3473"
inkscape:connector-curvature="0"
sodipodi:nodetypes="cc" />
<g
id="g4324"
- transform="translate(-30.285714,162.34191)">
+ transform="translate(9.714286,162.34191)">
<rect
ry="22.587013"
rx="21.337021"
@@ -468,7 +513,7 @@ under the License.
</g>
<g
id="g4286"
- transform="translate(-39.560883,231.66354)">
+ transform="translate(0.439117,231.66354)">
<rect
style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:1.5;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:none;stroke-dashoffset:0;stroke-opacity:1"
id="rect4254"
@@ -492,7 +537,7 @@ under the License.
</g>
<g
id="g4426"
- transform="translate(38,166)">
+ transform="translate(78,166)">
<rect
style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:4;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:24, 4;stroke-dashoffset:0;stroke-opacity:1"
id="rect4260"
@@ -516,7 +561,7 @@ under the License.
</g>
<g
id="g4276"
- transform="translate(-8.802002,175.91335)">
+ transform="translate(31.198,175.91335)">
<rect
style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:1.5;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:none;stroke-dashoffset:0;stroke-opacity:1"
id="rect4256"
@@ -540,7 +585,7 @@ under the License.
</g>
<g
id="g4421"
- transform="translate(40,166)">
+ transform="translate(80,166)">
<rect
ry="22.587013"
rx="21.337021"
@@ -564,7 +609,7 @@ under the License.
</g>
<g
id="g4416"
- transform="translate(14,166)">
+ transform="translate(54,166)">
<rect
ry="22.500114"
rx="26.670492"
@@ -588,7 +633,7 @@ under the License.
</g>
<g
id="g4431"
- transform="translate(38,166)">
+ transform="translate(78,166)">
<rect
ry="22.551325"
rx="23.453072"
@@ -612,7 +657,7 @@ under the License.
</g>
<g
id="g4411"
- transform="translate(14,166)">
+ transform="translate(54,166)">
<rect
style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:1.5;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:none;stroke-dashoffset:0;stroke-opacity:1"
id="rect4250"
@@ -636,7 +681,7 @@ under the License.
</g>
<g
id="g4436"
- transform="translate(11.142857,169.57143)">
+ transform="translate(51.14286,169.57143)">
<rect
style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:4;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:24, 4;stroke-dashoffset:0;stroke-opacity:1"
id="rect4264"
@@ -660,7 +705,7 @@ under the License.
</g>
<path
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#Arrow2Mend)"
- d="m 175.31595,646.72195 122.27415,0.17603"
+ d="m 215.33702,647.73984 122.07008,0"
id="path4441"
inkscape:connector-type="polyline"
inkscape:connector-curvature="0"
@@ -669,7 +714,7 @@ under the License.
sodipodi:nodetypes="cc" />
<path
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker4831)"
- d="m 423.67902,643.73984 146.73144,0"
+ d="m 464.00971,647.73984 146.07006,0"
id="path4443"
inkscape:connector-type="polyline"
inkscape:connector-curvature="0"
@@ -677,7 +722,7 @@ under the License.
inkscape:connection-end="#g4426" />
<path
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker5157)"
- d="M 143.45346,625.15282 C 240.11678,592.2528 289.05237,539.16028 337.31308,485.78223"
+ d="M 184.13444,625.15282 382.19596,485.78223"
id="path4445"
inkscape:connector-type="polyline"
inkscape:connector-curvature="0"
@@ -686,7 +731,7 @@ under the License.
sodipodi:nodetypes="cc" />
<path
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker5075)"
- d="m 437.2653,459.19522 132.84994,10e-6"
+ d="m 477.59599,463.19522 132.18856,10e-6"
id="path4447"
inkscape:connector-type="polyline"
inkscape:connector-curvature="0"
@@ -694,7 +739,7 @@ under the License.
inkscape:connection-end="#g4421" />
<path
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker4947)"
- d="M 140.03639,670.32685 308.21925,805.99316"
+ d="M 179.41675,670.32685 343.87778,805.99316"
id="path4449"
inkscape:connector-type="polyline"
inkscape:connector-curvature="0"
@@ -702,7 +747,7 @@ under the License.
inkscape:connection-end="#g4416" />
<path
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker5005)"
- d="m 409.8612,824.4933 148.36356,3e-5"
+ d="m 450.27779,828.4933 147.58232,3e-5"
id="path4451"
inkscape:connector-type="polyline"
inkscape:connector-curvature="0"
@@ -710,7 +755,7 @@ under the License.
inkscape:connection-end="#g4431" />
<path
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker5237)"
- d="M 119.50359,668.89828 C 120,902.36221 317.44733,915.62541 504.35792,974.20919"
+ d="M 121.48606,668.30654 C 138.78064,811.61536 345.26224,888.30903 548.95582,965.9844"
id="path4453"
inkscape:connector-type="polyline"
inkscape:connector-curvature="0"
@@ -720,107 +765,107 @@ under the License.
<text
xml:space="preserve"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
- x="182.57143"
+ x="222.57141"
y="643.07654"
id="text4913"
sodipodi:linespacing="125%"><tspan
sodipodi:role="line"
id="tspan4915"
- x="182.57143"
+ x="222.57141"
y="643.07654"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Schedule job</tspan></text>
<text
xml:space="preserve"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
- x="432"
+ x="472"
y="638.36218"
id="text4929"
sodipodi:linespacing="125%"><tspan
sodipodi:role="line"
id="tspan4931"
- x="432"
+ x="472"
y="638.36218"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">All job vertices </tspan><tspan
sodipodi:role="line"
- x="432"
+ x="472"
y="657.11218"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
id="tspan4933">in final state</tspan></text>
<text
xml:space="preserve"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
- x="424"
+ x="464"
y="820.36218"
id="text5063"
sodipodi:linespacing="125%"><tspan
sodipodi:role="line"
id="tspan5065"
- x="424"
+ x="464"
y="820.36218"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">All job vertices </tspan><tspan
sodipodi:role="line"
- x="424"
+ x="464"
y="839.11218"
id="tspan5067"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">in final state</tspan></text>
<text
xml:space="preserve"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
- x="442"
+ x="482"
y="456.36221"
id="text5139"
sodipodi:linespacing="125%"><tspan
sodipodi:role="line"
id="tspan5141"
- x="442"
+ x="482"
y="456.36221"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">All job vertices </tspan><tspan
sodipodi:role="line"
- x="442"
+ x="482"
y="475.11221"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
id="tspan5143">in final state & </tspan><tspan
sodipodi:role="line"
- x="442"
+ x="482"
y="493.86221"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
id="tspan5145">not restartable</tspan></text>
<text
xml:space="preserve"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
- x="219"
+ x="259"
y="606.93359"
id="text5227"
sodipodi:linespacing="125%"><tspan
sodipodi:role="line"
id="tspan5229"
- x="219"
+ x="259"
y="606.93359"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Fail job</tspan></text>
<text
xml:space="preserve"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
- x="156.57143"
+ x="196.57141"
y="764.07648"
id="text5565"
sodipodi:linespacing="125%"><tspan
sodipodi:role="line"
id="tspan5567"
- x="156.57143"
+ x="196.57141"
y="764.07648"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Cancel job</tspan></text>
<path
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker5892)"
- d="m 314.18121,477.05236 c -47.69818,20.03987 -84.94599,9.32911 -116.30849,2.14285"
+ d="m 350.99338,463.19522 -106.2755,-1e-5"
id="path5569"
inkscape:connector-type="polyline"
inkscape:connector-curvature="0"
- inkscape:connection-start="#g4276"
+ sodipodi:nodetypes="cc"
inkscape:connection-end="#g4411"
- sodipodi:nodetypes="cc" />
+ inkscape:connection-start="#g4276" />
<path
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6174)"
- d="M 419.49016,485.78223 C 974.28571,652.36221 835.65722,822.42397 665.47877,968.49491"
+ d="M 453.86766,483.76192 C 1064.7153,696.2824 753.48364,871.76678 689.84395,959.92348"
id="path5571"
inkscape:connector-type="polyline"
inkscape:connector-curvature="0"
@@ -829,7 +874,7 @@ under the License.
sodipodi:nodetypes="cc" />
<path
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6280)"
- d="m 413.71359,666.04114 c 466.9675,42.03536 351.85357,186.4168 228.2627,292.45377"
+ d="M 446.3093,669.3167 C 966.50293,715.92449 749.288,844.81821 673.74388,957.90317"
id="path5573"
inkscape:connector-type="polyline"
inkscape:connector-curvature="0"
@@ -838,7 +883,7 @@ under the License.
sodipodi:nodetypes="cc" />
<path
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6392)"
- d="M 375.26539,850.99339 556.58971,959.92348"
+ d="M 408.60702,850.99339 589.93134,959.92348"
id="path5575"
inkscape:connector-type="polyline"
inkscape:connector-curvature="0"
@@ -846,7 +891,7 @@ under the License.
inkscape:connection-end="#g4436" />
<path
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6074)"
- d="M 173.42792,441.40962 C 790,62.362207 855,633.79078 686.95342,807.37059"
+ d="M 189.43309,439.22831 C 890,72.362179 790,652.36218 702.11815,804.93187"
id="path5579"
inkscape:connector-type="polyline"
inkscape:connector-curvature="0"
@@ -855,15 +900,16 @@ under the License.
sodipodi:nodetypes="cc" />
<path
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6764)"
- d="M 123.688,485.69533 113.6599,625.15282"
+ d="m 148.78827,484.68518 -0.14393,138.44734"
id="path5581"
inkscape:connector-type="polyline"
inkscape:connector-curvature="0"
inkscape:connection-start="#g4411"
- inkscape:connection-end="#g4324" />
+ inkscape:connection-end="#g4324"
+ sodipodi:nodetypes="cc" />
<path
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6510)"
- d="m 400.64822,624.43854 c 14.16495,-34.7353 13.47368,-78.92413 6.68911,-136.51345"
+ d="m 437.72662,622.12237 c 22.19839,-49.88375 12.35398,-92.37301 0.15901,-134.31983"
id="path5585"
inkscape:connector-type="polyline"
inkscape:connector-curvature="0"
@@ -872,7 +918,7 @@ under the License.
inkscape:connection-start="#g4286" />
<path
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6634)"
- d="m 394.21832,671.04114 c 18.43152,58.30256 7.7951,100.67644 -12.18276,132.80916"
+ d="m 423.27918,670.32685 c 16.08015,35.64642 30.49507,77.65921 -0.96955,134.65616"
id="path5587"
inkscape:connector-type="polyline"
inkscape:connector-curvature="0"
@@ -881,125 +927,125 @@ under the License.
sodipodi:nodetypes="cc" />
<path
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker7718)"
- d="M 194.71429,444.79077 C 295.26058,393.86555 426.46327,380.03465 584,438.3622"
+ d="M 234.71429,444.79077 C 335.26058,393.86555 466.46327,380.03465 624,438.3622"
id="path7710"
inkscape:connector-curvature="0"
sodipodi:nodetypes="cc" />
<text
xml:space="preserve"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
- x="10"
- y="556.36218"
+ x="74.24366"
+ y="557.37231"
id="text8166"
sodipodi:linespacing="125%"><tspan
sodipodi:role="line"
id="tspan8168"
- x="10"
- y="556.36218"
+ x="74.24366"
+ y="557.37231"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Restarted job</tspan></text>
<text
xml:space="preserve"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
- x="156"
- y="906.36218"
+ x="130.87746"
+ y="919.43469"
id="text8170"
sodipodi:linespacing="125%"><tspan
sodipodi:role="line"
id="tspan8172"
- x="156"
- y="906.36218"
+ x="130.87746"
+ y="919.43469"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Suspend job</tspan></text>
<text
sodipodi:linespacing="125%"
id="text8174"
y="906.93359"
- x="468.57144"
+ x="508.57141"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
xml:space="preserve"><tspan
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
y="906.93359"
- x="468.57144"
+ x="508.57141"
id="tspan8176"
sodipodi:role="line">Suspend job</tspan></text>
<text
xml:space="preserve"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
- x="746.42859"
- y="906.2193"
+ x="749.28571"
+ y="931.93359"
id="text8178"
sodipodi:linespacing="125%"><tspan
sodipodi:role="line"
id="tspan8180"
- x="746.42859"
- y="906.2193"
+ x="749.28571"
+ y="931.93359"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Suspend job</tspan></text>
<text
sodipodi:linespacing="125%"
id="text8182"
y="717.64789"
- x="482.14288"
+ x="522.14288"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
xml:space="preserve"><tspan
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
y="717.64789"
- x="482.14288"
+ x="522.14288"
id="tspan8184"
sodipodi:role="line">Suspend job</tspan></text>
<text
sodipodi:linespacing="125%"
id="text8186"
y="752.64789"
- x="409.42856"
+ x="449.42859"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
xml:space="preserve"><tspan
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
y="752.64789"
- x="409.42856"
+ x="449.42859"
id="tspan8188"
sodipodi:role="line">Cancel job</tspan></text>
<text
sodipodi:linespacing="125%"
id="text8190"
- y="390.50507"
- x="361.14285"
+ y="390.08667"
+ x="344.87888"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
xml:space="preserve"><tspan
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
- y="390.50507"
- x="361.14285"
+ y="390.08667"
+ x="344.87888"
id="tspan8192"
- sodipodi:role="line">Fail job</tspan></text>
+ sodipodi:role="line">Fail and job not restartable</tspan></text>
<text
sodipodi:linespacing="125%"
id="text8194"
y="306.21933"
- x="487.28571"
+ x="527.28571"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
xml:space="preserve"><tspan
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
y="306.21933"
- x="487.28571"
+ x="527.28571"
id="tspan8196"
sodipodi:role="line">Cancel job</tspan></text>
<text
xml:space="preserve"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
- x="174"
+ x="214"
y="510.36221"
id="text8198"
sodipodi:linespacing="125%"><tspan
sodipodi:role="line"
id="tspan8200"
- x="174"
+ x="214"
y="510.36221"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">All job vertices</tspan><tspan
sodipodi:role="line"
- x="174"
+ x="214"
y="529.11218"
id="tspan8202"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">in final state &</tspan><tspan
sodipodi:role="line"
- x="174"
+ x="214"
y="547.86218"
id="tspan8204"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">restartable</tspan></text>
@@ -1007,43 +1053,80 @@ under the License.
sodipodi:linespacing="125%"
id="text8206"
y="566.93372"
- x="418.28571"
+ x="458.28571"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
xml:space="preserve"><tspan
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
y="566.93372"
- x="418.28571"
+ x="458.28571"
id="tspan8208"
sodipodi:role="line">Fail job</tspan></text>
<path
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker4534)"
- d="M 9.5714286,648.36221 46,648.36221"
+ d="m 49.57143,648.36221 36.42857,0"
id="path3470"
inkscape:connector-curvature="0"
sodipodi:nodetypes="cc" />
<text
xml:space="preserve"
style="font-style:normal;font-weight:normal;font-size:25px;line-height:125%;font-family:Sans;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
- x="250.71428"
+ x="290.71429"
y="710.93359"
id="text7267"
sodipodi:linespacing="125%"><tspan
sodipodi:role="line"
id="tspan7269"
- x="250.71428"
+ x="290.71429"
y="710.93359"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;font-family:Sans-serif;-inkscape-font-specification:Sans-serif">Cancel job</tspan></text>
<text
sodipodi:linespacing="125%"
id="text7271"
y="565.505"
- x="293.28571"
+ x="333.28571"
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
xml:space="preserve"><tspan
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
y="565.505"
- x="293.28571"
+ x="333.28571"
id="tspan7273"
sodipodi:role="line">Fail job</tspan></text>
+ <path
+ inkscape:connector-curvature="0"
+ inkscape:connector-type="polyline"
+ id="path4525"
+ d="M 89.65484,475.69533 C -64.057054,715.31058 17.022511,901.60666 537.98082,988.00996"
+ style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker4551)"
+ sodipodi:nodetypes="cc" />
+ <text
+ xml:space="preserve"
+ style="font-style:normal;font-weight:normal;font-size:25px;line-height:125%;font-family:Sans;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+ x="80"
+ y="792.36218"
+ id="text4597"
+ sodipodi:linespacing="125%"><tspan
+ sodipodi:role="line"
+ id="tspan4599"
+ x="80"
+ y="792.36218"
+ style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;font-family:sans-serif;-inkscape-font-specification:sans-serif">Suspend job</tspan></text>
+ <path
+ style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;stroke-miterlimit:4;stroke-dasharray:none;marker-end:url(#marker4737)"
+ d="M 96.56344,444.48401 C -20,312.36218 270,332.36218 156.87312,441.05766"
+ id="path4289"
+ inkscape:connector-curvature="0"
+ sodipodi:nodetypes="cc" />
+ <text
+ xml:space="preserve"
+ style="font-style:normal;font-weight:normal;font-size:25px;line-height:125%;font-family:Sans;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+ x="41.817253"
+ y="342.46371"
+ id="text4905"
+ sodipodi:linespacing="125%"><tspan
+ sodipodi:role="line"
+ id="tspan4907"
+ x="41.817253"
+ y="342.46371"
+ style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;font-family:sans-serif;-inkscape-font-specification:sans-serif">Fail and job restartable</tspan></text>
</g>
</svg>
http://git-wip-us.apache.org/repos/asf/flink/blob/ac82e3d0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index cc8e75d..45124ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -869,14 +869,13 @@ public class ExecutionGraph implements Serializable {
current == JobStatus.SUSPENDED ||
current.isGloballyTerminalState()) {
return;
- } else if (current == JobStatus.RESTARTING && transitionState(current, JobStatus.FAILED, t)) {
- synchronized (progressLock) {
- postRunCleanup();
- progressLock.notifyAll();
+ } else if (current == JobStatus.RESTARTING) {
+ this.failureCause = t;
- LOG.info("Job {} failed during restart.", getJobID());
+ if (tryRestartOrFail()) {
return;
}
+ // concurrent job status change, let's check again
} else if (transitionState(current, JobStatus.FAILING, t)) {
this.failureCause = t;
@@ -963,6 +962,7 @@ public class ExecutionGraph implements Serializable {
scheduleForExecution(scheduler);
}
catch (Throwable t) {
+ LOG.warn("Failed to restart the job.", t);
fail(t);
}
}
@@ -1123,15 +1123,10 @@ public class ExecutionGraph implements Serializable {
}
}
else if (current == JobStatus.FAILING) {
- boolean allowRestart = !(failureCause instanceof SuppressRestartsException);
-
- if (allowRestart && restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) {
- restartStrategy.restart(this);
- break;
- } else if ((!allowRestart || !restartStrategy.canRestart()) && transitionState(current, JobStatus.FAILED, failureCause)) {
- postRunCleanup();
+ if (tryRestartOrFail()) {
break;
}
+ // concurrent job status change, let's check again
}
else if (current == JobStatus.SUSPENDED) {
// we've already cleaned up when entering the SUSPENDED state
@@ -1155,6 +1150,47 @@ public class ExecutionGraph implements Serializable {
}
}
+ /**
+ * Tries to restart the job. If the job is not restartable (the failure cause is a {@link SuppressRestartsException}
+ * or the restart strategy allows no further restarts), transitions the job to FAILED and runs the post-run cleanup
+ * instead. Only permitted if the current state is FAILING or RESTARTING; the transition is done under the progress lock.
+ *
+ * @return true if the job was restarted or failed; false if the state is neither FAILING nor RESTARTING or was changed concurrently
+ */
+ private boolean tryRestartOrFail() {
+ JobStatus currentState = state;
+
+ if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) {
+ synchronized (progressLock) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Try to restart the job or fail it if no longer possible.", failureCause);
+ } else {
+ LOG.info("Try to restart the job or fail it if no longer possible.");
+ }
+
+ boolean isRestartable = !(failureCause instanceof SuppressRestartsException) && restartStrategy.canRestart();
+
+ if (isRestartable && transitionState(currentState, JobStatus.RESTARTING)) {
+ LOG.info("Restarting the job...");
+ restartStrategy.restart(this);
+
+ return true;
+ } else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failureCause)) {
+ LOG.info("Could not restart the job.", failureCause);
+ postRunCleanup();
+
+ return true;
+ } else {
+ // the state transition failed, so the state must have been changed concurrently; the caller should re-check
+ return false;
+ }
+ }
+ } else {
+ // this operation is only allowed in the state FAILING or RESTARTING
+ return false;
+ }
+ }
+
private void postRunCleanup() {
try {
CheckpointCoordinator coord = this.checkpointCoordinator;
http://git-wip-us.apache.org/repos/asf/flink/blob/ac82e3d0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 2b8b867..788f8b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
@@ -257,7 +258,8 @@ public class ExecutionGraphMetricsTest extends TestLogger {
assertTrue(previousRestartingTime > 0);
// now lets fail the job while it is in restarting and see whether the restarting time then stops to increase
- executionGraph.fail(new Exception());
+ // for this to work, we have to use a SuppressRestartsException
+ executionGraph.fail(new SuppressRestartsException(new Exception()));
assertEquals(JobStatus.FAILED, executionGraph.getState());
http://git-wip-us.apache.org/repos/asf/flink/blob/ac82e3d0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 0d09e38..3041ad3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -263,9 +263,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
assertEquals(JobStatus.RESTARTING, executionGraph.getState());
- // Canceling needs to abort the restart
+ // An ordinary exception should not fail the job while it is restarting
executionGraph.fail(new Exception("Test exception"));
+ assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+
+ // but it should fail when sending a SuppressRestartsException
+ executionGraph.fail(new SuppressRestartsException(new Exception("Test exception")));
+
assertEquals(JobStatus.FAILED, executionGraph.getState());
// The restart has been aborted
http://git-wip-us.apache.org/repos/asf/flink/blob/ac82e3d0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index 59f2a9b..72784fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -27,6 +27,8 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -34,7 +36,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -43,6 +44,8 @@ import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.api.mockito.PowerMockito;
import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.same;
@@ -147,7 +150,7 @@ public class ExecutionGraphSignalsTest {
@Test
public void testCancel() throws Exception {
- Assert.assertEquals(JobStatus.CREATED, eg.getState());
+ assertEquals(JobStatus.CREATED, eg.getState());
eg.cancel();
verifyCancel(1);
@@ -156,42 +159,42 @@ public class ExecutionGraphSignalsTest {
eg.cancel();
verifyCancel(2);
- Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
+ assertEquals(JobStatus.CANCELLING, eg.getState());
eg.cancel();
verifyCancel(2);
- Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
+ assertEquals(JobStatus.CANCELLING, eg.getState());
f.set(eg, JobStatus.CANCELED);
eg.cancel();
verifyCancel(2);
- Assert.assertEquals(JobStatus.CANCELED, eg.getState());
+ assertEquals(JobStatus.CANCELED, eg.getState());
f.set(eg, JobStatus.FAILED);
eg.cancel();
verifyCancel(2);
- Assert.assertEquals(JobStatus.FAILED, eg.getState());
+ assertEquals(JobStatus.FAILED, eg.getState());
f.set(eg, JobStatus.FAILING);
eg.cancel();
verifyCancel(2);
- Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
+ assertEquals(JobStatus.CANCELLING, eg.getState());
f.set(eg, JobStatus.FINISHED);
eg.cancel();
verifyCancel(2);
- Assert.assertEquals(JobStatus.FINISHED, eg.getState());
+ assertEquals(JobStatus.FINISHED, eg.getState());
f.set(eg, JobStatus.RESTARTING);
eg.cancel();
verifyCancel(2);
- Assert.assertEquals(JobStatus.CANCELED, eg.getState());
+ assertEquals(JobStatus.CANCELED, eg.getState());
}
private void verifyCancel(int times) {
@@ -206,65 +209,65 @@ public class ExecutionGraphSignalsTest {
*/
@Test
public void testSuspend() throws Exception {
- Assert.assertEquals(JobStatus.CREATED, eg.getState());
+ assertEquals(JobStatus.CREATED, eg.getState());
Exception testException = new Exception("Test exception");
eg.suspend(testException);
verifyCancel(1);
- Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+ assertEquals(JobStatus.SUSPENDED, eg.getState());
f.set(eg, JobStatus.RUNNING);
eg.suspend(testException);
verifyCancel(2);
- Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+ assertEquals(JobStatus.SUSPENDED, eg.getState());
f.set(eg, JobStatus.FAILING);
eg.suspend(testException);
verifyCancel(3);
- Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+ assertEquals(JobStatus.SUSPENDED, eg.getState());
f.set(eg, JobStatus.CANCELLING);
eg.suspend(testException);
verifyCancel(4);
- Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+ assertEquals(JobStatus.SUSPENDED, eg.getState());
f.set(eg, JobStatus.FAILED);
eg.suspend(testException);
verifyCancel(4);
- Assert.assertEquals(JobStatus.FAILED, eg.getState());
+ assertEquals(JobStatus.FAILED, eg.getState());
f.set(eg, JobStatus.FINISHED);
eg.suspend(testException);
verifyCancel(4);
- Assert.assertEquals(JobStatus.FINISHED, eg.getState());
+ assertEquals(JobStatus.FINISHED, eg.getState());
f.set(eg, JobStatus.CANCELED);
eg.suspend(testException);
verifyCancel(4);
- Assert.assertEquals(JobStatus.CANCELED, eg.getState());
+ assertEquals(JobStatus.CANCELED, eg.getState());
f.set(eg, JobStatus.SUSPENDED);
eg.fail(testException);
- Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+ assertEquals(JobStatus.SUSPENDED, eg.getState());
eg.cancel();
- Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+ assertEquals(JobStatus.SUSPENDED, eg.getState());
}
// test that all source tasks receive STOP signal
@@ -290,6 +293,71 @@ public class ExecutionGraphSignalsTest {
}
}
+ /**
+ * Tests that failing in state RESTARTING re-triggers the restart logic. This means that
+ * the job only goes into the state FAILED once the restart strategy says it is no longer
+ * restartable.
+ */
+ @Test
+ public void testFailureWhileRestarting() throws IllegalAccessException, NoSuchFieldException, InterruptedException {
+ Field restartStrategyField = eg.getClass().getDeclaredField("restartStrategy");
+ restartStrategyField.setAccessible(true);
+
+ restartStrategyField.set(eg, new InfiniteDelayRestartStrategy(1));
+
+ f.set(eg, JobStatus.RESTARTING);
+
+ eg.fail(new Exception("Test"));
+
+ // we should restart since the strategy still allows one restart attempt
+ assertEquals(JobStatus.RESTARTING, eg.getState());
+
+ eg.fail(new Exception("Test"));
+
+ // after depleting all our restart attempts we should go into FAILED
+ assertEquals(JobStatus.FAILED, eg.getState());
+ }
+
+ /**
+ * Tests that a {@link SuppressRestartsException} in state RESTARTING stops the restarting
+ * immediately and sets the execution graph's state to FAILED.
+ */
+ @Test
+ public void testSuppressRestartFailureWhileRestarting() throws IllegalAccessException, NoSuchFieldException {
+ Field restartStrategyField = eg.getClass().getDeclaredField("restartStrategy");
+ restartStrategyField.setAccessible(true);
+
+ restartStrategyField.set(eg, new InfiniteDelayRestartStrategy());
+
+ f.set(eg, JobStatus.RESTARTING);
+
+ // suppress the restart even though the strategy (no attempt limit) would allow it
+ eg.fail(new SuppressRestartsException(new Exception("Test")));
+
+ assertEquals(JobStatus.FAILED, eg.getState());
+ }
+
+ /**
+ * Tests that a job in state RESTARTING can be suspended and that the suspension cause becomes its failure cause.
+ */
+ @Test
+ public void testSuspendWhileRestarting() throws IllegalAccessException, NoSuchFieldException {
+ Field restartStrategyField = eg.getClass().getDeclaredField("restartStrategy");
+ restartStrategyField.setAccessible(true);
+
+ restartStrategyField.set(eg, new InfiniteDelayRestartStrategy());
+
+ f.set(eg, JobStatus.RESTARTING);
+
+ final Exception exception = new Exception("Suspended");
+
+ eg.suspend(exception);
+
+ assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+ assertEquals(exception, eg.getFailureCause());
+ }
+
// STOP only supported if all sources are stoppable
@Test(expected = StoppingException.class)
public void testStopBatching() throws StoppingException {
http://git-wip-us.apache.org/repos/asf/flink/blob/ac82e3d0/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
new file mode 100644
index 0000000..c1cbdd3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Testing restart strategy which promises to restart the {@link ExecutionGraph} after an infinite delay, i.e. the
+ * {@link ExecutionGraph} is never actually restarted: {@link #restart} only logs and counts the attempt, no threads are used.
+ */
+public class InfiniteDelayRestartStrategy implements RestartStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(InfiniteDelayRestartStrategy.class);
+
+ private final int maxRestartAttempts; // a negative value means an unlimited number of restart attempts
+ private int restartAttemptCounter; // restart() calls so far; only incremented when attempts are limited
+
+ public InfiniteDelayRestartStrategy() {
+ this(-1); // unlimited restart attempts
+ }
+
+ public InfiniteDelayRestartStrategy(int maxRestartAttempts) {
+ this.maxRestartAttempts = maxRestartAttempts;
+ restartAttemptCounter = 0;
+ }
+
+ @Override
+ public boolean canRestart() {
+ if (maxRestartAttempts >= 0) {
+ return restartAttemptCounter < maxRestartAttempts;
+ } else {
+ return true;
+ }
+ }
+
+ @Override
+ public void restart(ExecutionGraph executionGraph) {
+ LOG.info("Delaying retry of job execution forever");
+
+ if (maxRestartAttempts >= 0) {
+ restartAttemptCounter++;
+ }
+ }
+}
[2/2] flink git commit: [FLINK-4933] [exec graph] Don't let the EG
fail in case of a failing scheduleOrUpdateConsumers call
Posted by se...@apache.org.
[FLINK-4933] [exec graph] Don't let the EG fail in case of a failing scheduleOrUpdateConsumers call
Instead of failing the complete ExecutionGraph, a failing scheduleOrUpdateConsumers call
will be reported back to the caller. The caller can then decide what to do. By default,
it will fail the calling task.
Adapt TaskManagerTest
This closes #2701
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d941b50d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d941b50d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d941b50d
Branch: refs/heads/release-1.1
Commit: d941b50db4c58c7ea3d5c1d888d34ea8975407e0
Parents: ac82e3d
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Oct 27 11:41:29 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Oct 31 16:22:11 2016 +0100
----------------------------------------------------------------------
.../runtime/executiongraph/ExecutionGraph.java | 16 +-
.../executiongraph/ExecutionGraphException.java | 39 +++++
.../flink/runtime/jobmanager/JobManager.scala | 17 ++-
.../flink/runtime/taskmanager/TaskManager.scala | 2 +-
.../ExecutionGraphSignalsTest.java | 27 ++++
.../runtime/taskmanager/TaskManagerTest.java | 148 ++++++++++++++++++-
6 files changed, 231 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d941b50d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 45124ff..ce8e0c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1275,17 +1275,23 @@ public class ExecutionGraph implements Serializable {
}
}
- public void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
+ /**
+ * Schedules or updates the consumers of the given result partition.
+ *
+ * @param partitionId specifying the result partition whose consumers shall be scheduled or updated
+ * @throws ExecutionGraphException if the schedule or update consumers operation could not be executed
+ */
+ public void scheduleOrUpdateConsumers(ResultPartitionID partitionId) throws ExecutionGraphException {
final Execution execution = currentExecutions.get(partitionId.getProducerId());
if (execution == null) {
- fail(new IllegalStateException("Cannot find execution for execution ID " +
- partitionId.getPartitionId()));
+ throw new ExecutionGraphException("Cannot find execution for execution Id " +
+ partitionId.getPartitionId() + '.');
}
else if (execution.getVertex() == null){
- fail(new IllegalStateException("Execution with execution ID " +
- partitionId.getPartitionId() + " has no vertex assigned."));
+ throw new ExecutionGraphException("Execution with execution Id " +
+ partitionId.getPartitionId() + " has no vertex assigned.");
} else {
execution.getVertex().scheduleOrUpdateConsumers(partitionId);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d941b50d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphException.java
new file mode 100644
index 0000000..2de249b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+/**
+ * Base class for checked exceptions thrown by operations on the {@link ExecutionGraph}.
+ */
+public class ExecutionGraphException extends Exception {
+
+ private static final long serialVersionUID = -8253451032797220657L;
+
+ public ExecutionGraphException(String message) {
+ super(message);
+ }
+
+ public ExecutionGraphException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ExecutionGraphException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d941b50d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index f16747a..51cbedc 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.jobmanager
import java.io.{File, IOException}
import java.lang.management.ManagementFactory
-import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket, UnknownHostException}
+import java.net._
import java.util.UUID
import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
import javax.management.ObjectName
@@ -49,7 +49,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.execution.SuppressRestartsException
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex}
+import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionGraphException, ExecutionJobVertex}
import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
@@ -63,7 +63,7 @@ import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
import org.apache.flink.runtime.messages.RegistrationMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace}
import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
-import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, AccumulatorResultStringsFound, AccumulatorResultsErroneous, AccumulatorResultsFound, RequestAccumulatorResults, RequestAccumulatorResultsStringified}
+import org.apache.flink.runtime.messages.accumulators._
import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
@@ -831,8 +831,15 @@ class JobManager(
case ScheduleOrUpdateConsumers(jobId, partitionId) =>
currentJobs.get(jobId) match {
case Some((executionGraph, _)) =>
- sender ! decorateMessage(Acknowledge)
- executionGraph.scheduleOrUpdateConsumers(partitionId)
+ try {
+ executionGraph.scheduleOrUpdateConsumers(partitionId)
+ sender ! decorateMessage(Acknowledge)
+ } catch {
+ case e: ExecutionGraphException =>
+ sender ! decorateMessage(
+ Failure(new Exception("Could not schedule or update consumers.", e))
+ )
+ }
case None =>
log.error(s"Cannot find execution graph for job ID $jobId to schedule or update " +
s"consumers.")
http://git-wip-us.apache.org/repos/asf/flink/blob/d941b50d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 5ae52be..0368c4a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1138,7 +1138,7 @@ class TaskManager(
runningTasks.put(execId, prevTask)
throw new IllegalStateException("TaskManager already contains a task for id " + execId)
}
-
+
// all good, we kick off the task, which performs its own initialization
task.startTaskThread()
http://git-wip-us.apache.org/repos/asf/flink/blob/d941b50d/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index 72784fb..de4a026 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -30,6 +30,8 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -46,6 +48,7 @@ import org.powermock.api.mockito.PowerMockito;
import scala.concurrent.duration.FiniteDuration;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.same;
@@ -364,5 +367,29 @@ public class ExecutionGraphSignalsTest {
eg.stop();
}
+ /**
+ * Tests that scheduleOrUpdateConsumers throws an ExecutionGraphException for a result partition
+ * whose producer execution attempt id is unknown, without failing the execution graph.
+ */
+ @Test
+ public void testFailingScheduleOrUpdateConsumers() throws IllegalAccessException {
+ // the freshly created execution attempt id does not exist in the graph, so the call must fail
+ final ResultPartitionID unknownProducerPartition =
+ new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
+
+ f.set(eg, JobStatus.RUNNING);
+ assertEquals(JobStatus.RUNNING, eg.getState());
+
+ boolean threwExpectedException = false;
+ try {
+ eg.scheduleOrUpdateConsumers(unknownProducerPartition);
+ } catch (ExecutionGraphException expected) {
+ threwExpectedException = true;
+ }
+ if (!threwExpectedException) {
+ fail("Expected ExecutionGraphException.");
+ }
+ assertEquals(JobStatus.RUNNING, eg.getState());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d941b50d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 1c50265..72d6fb1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Kill;
import akka.actor.Props;
+import akka.actor.Status;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
import org.apache.flink.api.common.ExecutionConfig;
@@ -42,6 +43,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -67,6 +69,7 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.StoppableInvokable;
+import org.apache.flink.types.IntValue;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
@@ -78,8 +81,10 @@ import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
+import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
@@ -123,7 +128,7 @@ public class TaskManagerTest extends TestLogger {
}
@Test
- public void testSubmitAndExecuteTask() {
+ public void testSubmitAndExecuteTask() throws IOException {
new JavaTestKit(system){{
ActorGateway taskManager = null;
@@ -221,10 +226,6 @@ public class TaskManagerTest extends TestLogger {
}
};
}
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
finally {
// shut down the actors
TestingUtils.stopActor(taskManager);
@@ -1392,6 +1393,77 @@ public class TaskManagerTest extends TestLogger {
}
}};
}
+
+ /**
+ * Tests that a failing schedule or update consumers call leads to the failing of the respective
+ * task.
+ *
+ * IMPORTANT: We have to make sure that the invokable's cancel method is called, because only
+ * then the future is completed. We do this by not eagerly deploying the consumer tasks and by
+ * requiring the invokable to fill one memory segment. The completed memory segment will trigger
+ * the scheduling of the downstream operator since it is in pipelined mode. After we've filled
+ * the memory segment, we'll block the invokable and wait for the task failure due to the failed
+ * schedule or update consumers call.
+ */
+ @Test(timeout = 10000L)
+ public void testFailingScheduleOrUpdateConsumersMessage() throws Exception {
+ new JavaTestKit(system) {{
+ final Configuration configuration = new Configuration();
+
+ // set the memory segment to the smallest size possible, because we have to fill one
+ // memory buffer to trigger the schedule or update consumers message to the downstream
+ // operators
+ configuration.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096);
+
+ final JobID jid = new JobID();
+ final JobVertexID vid = new JobVertexID();
+ final ExecutionAttemptID eid = new ExecutionAttemptID();
+ final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig());
+
+ final ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor = new ResultPartitionDeploymentDescriptor(
+ new IntermediateDataSetID(),
+ new IntermediateResultPartitionID(),
+ ResultPartitionType.PIPELINED,
+ 1,
+ false // don't deploy eagerly but with the first completed memory buffer
+ );
+
+ final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,
+ "TestTask", 0, 1, 0, new Configuration(), new Configuration(),
+ TestInvokableRecordCancel.class.getName(),
+ Collections.singletonList(resultPartitionDeploymentDescriptor),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
+ new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
+
+ // create the actors inside the try block so that the (named) job manager is also stopped if
+ // starting the task manager fails; assumes TestingUtils.stopActor ignores null, as other tests do
+ ActorGateway jobManager = null;
+ ActorGateway taskManager = null;
+
+ try {
+ ActorRef jmActorRef = system.actorOf(Props.create(FailingScheduleOrUpdateConsumersJobManager.class, leaderSessionID), "jobmanager");
+ jobManager = new AkkaActorGateway(jmActorRef, leaderSessionID);
+ taskManager = TestingUtils.createTaskManager(
+ system, jobManager, configuration, true, true);
+
+ TestInvokableRecordCancel.resetGotCanceledFuture();
+
+ Future<Object> result = taskManager.ask(new SubmitTask(tdd), timeout);
+
+ Await.result(result, timeout);
+
+ // the rejected scheduleOrUpdateConsumers call is expected to fail the task, which cancels
+ // the invokable and thereby completes the future
+ Future<Boolean> cancelFuture = TestInvokableRecordCancel.gotCanceled();
+ boolean cancelResult = Await.result(cancelFuture, timeout);
+
+ assertEquals(true, cancelResult);
+ } finally {
+ TestingUtils.stopActor(taskManager);
+ TestingUtils.stopActor(jobManager);
+ }
+ }};
+ }
// --------------------------------------------------------------------------------------------
@@ -1427,6 +1499,25 @@ public class TaskManagerTest extends TestLogger {
}
}
+ /** Job manager that answers every ScheduleOrUpdateConsumers message with a Status.Failure. */
+ public static class FailingScheduleOrUpdateConsumersJobManager extends SimpleJobManager {
+ public FailingScheduleOrUpdateConsumersJobManager(UUID leaderSessionId) {
+ super(leaderSessionId);
+ }
+
+ @Override
+ public void handleMessage(Object message) throws Exception {
+ // everything except ScheduleOrUpdateConsumers is handled by SimpleJobManager
+ if (!(message instanceof ScheduleOrUpdateConsumers)) {
+ super.handleMessage(message);
+ return;
+ }
+
+ final Exception cause = new Exception("Could not schedule or update consumers.");
+ getSender().tell(decorateMessage(new Status.Failure(cause)), getSelf());
+ }
+ }
+
public static class SimpleLookupJobManager extends SimpleJobManager {
public SimpleLookupJobManager(UUID leaderSessionID) {
@@ -1452,7 +1543,7 @@ public class TaskManagerTest extends TestLogger {
public SimpleLookupFailingUpdateJobManager(UUID leaderSessionID, Set<ExecutionAttemptID> ids) {
super(leaderSessionID);
- this.validIDs = new HashSet<ExecutionAttemptID>(ids);
+ this.validIDs = new HashSet<>(ids);
}
@Override
@@ -1569,7 +1660,7 @@ public class TaskManagerTest extends TestLogger {
public void invoke() {}
}
- public static final class TestInvokableBlockingCancelable extends AbstractInvokable {
+ public static class TestInvokableBlockingCancelable extends AbstractInvokable {
@Override
public void invoke() throws Exception {
@@ -1583,4 +1674,47 @@ public class TaskManagerTest extends TestLogger {
}
}
}
+
+ public static final class TestInvokableRecordCancel extends AbstractInvokable {
+ private static final Object lock = new Object(); // guards gotCanceledFuture
+ private static Promise<Boolean> gotCanceledFuture = new scala.concurrent.impl.Promise.DefaultPromise<>();
+
+ @Override
+ public void invoke() throws Exception {
+ final Object o = new Object();
+ RecordWriter<IntValue> recordWriter = new RecordWriter<>(getEnvironment().getWriter(0));
+ // emit enough records to (presumably) complete one buffer, then block until interrupted
+ for (int i = 0; i < 1024; i++) {
+ recordWriter.emit(new IntValue(42));
+ }
+ synchronized (o) {
+ //noinspection InfiniteLoopStatement
+ while (true) {
+ o.wait();
+ }
+ }
+ }
+
+ /** Completes the cancel future; trySuccess makes repeated cancel() calls harmless. */
+ @Override
+ public void cancel() {
+ synchronized (lock) {
+ gotCanceledFuture.trySuccess(true);
+ }
+ }
+
+ /** Replaces the cancel future with a fresh, uncompleted one. */
+ public static void resetGotCanceledFuture() {
+ synchronized (lock) {
+ gotCanceledFuture = new scala.concurrent.impl.Promise.DefaultPromise<>();
+ }
+ }
+
+ /** Returns the current cancel future, which completes with true once cancel() was called. */
+ public static Future<Boolean> gotCanceled() {
+ synchronized (lock) {
+ return gotCanceledFuture.future();
+ }
+ }
+ }
}