You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/31 18:17:12 UTC

[4/4] flink git commit: [FLINK-4932] [distributed coordination] Failing in state RESTARTING only fails terminally if no more restarts are possible

[FLINK-4932] [distributed coordination] Failing in state RESTARTING only fails terminally if no more restarts are possible

If a failure occurs while the job is in state RESTARTING, a new restart attempt is started.
A job can go from RESTARTING to FAILED only if the restart strategy no longer allows
further restarts or if the thrown exception is of type SuppressRestartsException.

This closes #2710


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/18507de3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/18507de3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/18507de3

Branch: refs/heads/master
Commit: 18507de3c068795c93c5c689388a22857f2f817c
Parents: 59a5551
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Oct 27 18:32:08 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Oct 31 19:16:41 2016 +0100

----------------------------------------------------------------------
 docs/fig/job_status.svg                         | 263 ++++++++++++-------
 .../runtime/executiongraph/ExecutionGraph.java  |  60 ++++-
 .../ExecutionGraphMetricsTest.java              |   4 +-
 .../ExecutionGraphRestartTest.java              |   7 +-
 .../ExecutionGraphSignalsTest.java              | 106 ++++++--
 .../restart/InfiniteDelayRestartStrategy.java   |  22 +-
 6 files changed, 338 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/18507de3/docs/fig/job_status.svg
----------------------------------------------------------------------
diff --git a/docs/fig/job_status.svg b/docs/fig/job_status.svg
index c259db4..488f883 100644
--- a/docs/fig/job_status.svg
+++ b/docs/fig/job_status.svg
@@ -38,6 +38,50 @@ under the License.
   <defs
      id="defs4">
     <marker
+       inkscape:isstock="true"
+       style="overflow:visible"
+       id="marker4737"
+       refX="0"
+       refY="0"
+       orient="auto"
+       inkscape:stockid="Arrow2Mend">
+      <path
+         inkscape:connector-curvature="0"
+         transform="scale(-0.6,-0.6)"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         id="path4739" />
+    </marker>
+    <marker
+       inkscape:stockid="Arrow1Lend"
+       orient="auto"
+       refY="0.0"
+       refX="0.0"
+       id="marker4552"
+       style="overflow:visible;"
+       inkscape:isstock="true">
+      <path
+         id="path4298"
+         d="M 0.0,0.0 L 5.0,-5.0 L -12.5,0.0 L 5.0,5.0 L 0.0,0.0 z "
+         style="fill-rule:evenodd;stroke:#000000;stroke-width:1pt;stroke-opacity:1;fill:#000000;fill-opacity:1"
+         transform="scale(0.8) rotate(180) translate(12.5,0)" />
+    </marker>
+    <marker
+       inkscape:isstock="true"
+       style="overflow:visible"
+       id="marker4551"
+       refX="0"
+       refY="0"
+       orient="auto"
+       inkscape:stockid="Arrow2Mend">
+      <path
+         inkscape:connector-curvature="0"
+         transform="scale(-0.6,-0.6)"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c -1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         id="path4553" />
+    </marker>
+    <marker
        inkscape:stockid="Arrow2Mstart"
        orient="auto"
        refY="0.0"
@@ -343,7 +387,8 @@ under the License.
        refX="0"
        id="Arrow2Mend"
        style="overflow:visible"
-       inkscape:isstock="true">
+       inkscape:isstock="true"
+       inkscape:collect="always">
       <path
          id="path4486"
          style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
@@ -404,17 +449,17 @@ under the License.
      borderopacity="1.0"
      inkscape:pageopacity="0.0"
      inkscape:pageshadow="2"
-     inkscape:zoom="1.4"
-     inkscape:cx="366.44711"
-     inkscape:cy="435.59833"
+     inkscape:zoom="0.98994949"
+     inkscape:cx="333.41527"
+     inkscape:cy="460.79478"
      inkscape:document-units="px"
      inkscape:current-layer="layer1"
      showgrid="true"
-     inkscape:window-width="1402"
-     inkscape:window-height="855"
-     inkscape:window-x="38"
-     inkscape:window-y="1"
-     inkscape:window-maximized="1">
+     inkscape:window-width="1916"
+     inkscape:window-height="1300"
+     inkscape:window-x="1855"
+     inkscape:window-y="21"
+     inkscape:window-maximized="0">
     <inkscape:grid
        type="xygrid"
        id="grid4136" />
@@ -438,13 +483,13 @@ under the License.
      transform="translate(0,-272.83465)">
     <path
        style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-start:url(#Arrow2Mstart);marker-end:url(#marker4407)"
-       d="M 369.28571,490.93361 C 340,572.36218 330,712.36218 340.71429,802.36218"
+       d="M 409.28571,490.93361 C 380,572.36218 370,712.36218 380.71429,802.36218"
        id="path3473"
        inkscape:connector-curvature="0"
        sodipodi:nodetypes="cc" />
     <g
        id="g4324"
-       transform="translate(-30.285714,162.34191)">
+       transform="translate(9.714286,162.34191)">
       <rect
          ry="22.587013"
          rx="21.337021"
@@ -468,7 +513,7 @@ under the License.
     </g>
     <g
        id="g4286"
-       transform="translate(-39.560883,231.66354)">
+       transform="translate(0.439117,231.66354)">
       <rect
          style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:1.5;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:none;stroke-dashoffset:0;stroke-opacity:1"
          id="rect4254"
@@ -492,7 +537,7 @@ under the License.
     </g>
     <g
        id="g4426"
-       transform="translate(38,166)">
+       transform="translate(78,166)">
       <rect
          style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:4;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:24, 4;stroke-dashoffset:0;stroke-opacity:1"
          id="rect4260"
@@ -516,7 +561,7 @@ under the License.
     </g>
     <g
        id="g4276"
-       transform="translate(-8.802002,175.91335)">
+       transform="translate(31.198,175.91335)">
       <rect
          style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:1.5;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:none;stroke-dashoffset:0;stroke-opacity:1"
          id="rect4256"
@@ -540,7 +585,7 @@ under the License.
     </g>
     <g
        id="g4421"
-       transform="translate(40,166)">
+       transform="translate(80,166)">
       <rect
          ry="22.587013"
          rx="21.337021"
@@ -564,7 +609,7 @@ under the License.
     </g>
     <g
        id="g4416"
-       transform="translate(14,166)">
+       transform="translate(54,166)">
       <rect
          ry="22.500114"
          rx="26.670492"
@@ -588,7 +633,7 @@ under the License.
     </g>
     <g
        id="g4431"
-       transform="translate(38,166)">
+       transform="translate(78,166)">
       <rect
          ry="22.551325"
          rx="23.453072"
@@ -612,7 +657,7 @@ under the License.
     </g>
     <g
        id="g4411"
-       transform="translate(14,166)">
+       transform="translate(54,166)">
       <rect
          style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:1.5;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:none;stroke-dashoffset:0;stroke-opacity:1"
          id="rect4250"
@@ -636,7 +681,7 @@ under the License.
     </g>
     <g
        id="g4436"
-       transform="translate(11.142857,169.57143)">
+       transform="translate(51.14286,169.57143)">
       <rect
          style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:4;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:24, 4;stroke-dashoffset:0;stroke-opacity:1"
          id="rect4264"
@@ -660,7 +705,7 @@ under the License.
     </g>
     <path
        style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#Arrow2Mend)"
-       d="m 175.31595,646.72195 122.27415,0.17603"
+       d="m 215.33702,647.73984 122.07008,0"
        id="path4441"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -669,7 +714,7 @@ under the License.
        sodipodi:nodetypes="cc" />
     <path
        style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker4831)"
-       d="m 423.67902,643.73984 146.73144,0"
+       d="m 464.00971,647.73984 146.07006,0"
        id="path4443"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -677,7 +722,7 @@ under the License.
        inkscape:connection-end="#g4426" />
     <path
        style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker5157)"
-       d="M 143.45346,625.15282 C 240.11678,592.2528 289.05237,539.16028 337.31308,485.78223"
+       d="M 184.13444,625.15282 382.19596,485.78223"
        id="path4445"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -686,7 +731,7 @@ under the License.
        sodipodi:nodetypes="cc" />
     <path
        style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker5075)"
-       d="m 437.2653,459.19522 132.84994,10e-6"
+       d="m 477.59599,463.19522 132.18856,10e-6"
        id="path4447"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -694,7 +739,7 @@ under the License.
        inkscape:connection-end="#g4421" />
     <path
        style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker4947)"
-       d="M 140.03639,670.32685 308.21925,805.99316"
+       d="M 179.41675,670.32685 343.87778,805.99316"
        id="path4449"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -702,7 +747,7 @@ under the License.
        inkscape:connection-end="#g4416" />
     <path
        style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker5005)"
-       d="m 409.8612,824.4933 148.36356,3e-5"
+       d="m 450.27779,828.4933 147.58232,3e-5"
        id="path4451"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -710,7 +755,7 @@ under the License.
        inkscape:connection-end="#g4431" />
     <path
        style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker5237)"
-       d="M 119.50359,668.89828 C 120,902.36221 317.44733,915.62541 504.35792,974.20919"
+       d="M 121.48606,668.30654 C 138.78064,811.61536 345.26224,888.30903 548.95582,965.9844"
        id="path4453"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -720,107 +765,107 @@ under the License.
     <text
        xml:space="preserve"
        style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="182.57143"
+       x="222.57141"
        y="643.07654"
        id="text4913"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan4915"
-         x="182.57143"
+         x="222.57141"
          y="643.07654"
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Schedule job</tspan></text>
     <text
        xml:space="preserve"
        style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="432"
+       x="472"
        y="638.36218"
        id="text4929"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan4931"
-         x="432"
+         x="472"
          y="638.36218"
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">All job vertices </tspan><tspan
          sodipodi:role="line"
-         x="432"
+         x="472"
          y="657.11218"
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
          id="tspan4933">in final state</tspan></text>
     <text
        xml:space="preserve"
        style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="424"
+       x="464"
        y="820.36218"
        id="text5063"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan5065"
-         x="424"
+         x="464"
          y="820.36218"
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">All job vertices </tspan><tspan
          sodipodi:role="line"
-         x="424"
+         x="464"
          y="839.11218"
          id="tspan5067"
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">in final state</tspan></text>
     <text
        xml:space="preserve"
        style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="442"
+       x="482"
        y="456.36221"
        id="text5139"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan5141"
-         x="442"
+         x="482"
          y="456.36221"
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">All job vertices </tspan><tspan
          sodipodi:role="line"
-         x="442"
+         x="482"
          y="475.11221"
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
          id="tspan5143">in final state &amp; </tspan><tspan
          sodipodi:role="line"
-         x="442"
+         x="482"
          y="493.86221"
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
          id="tspan5145">not restartable</tspan></text>
     <text
        xml:space="preserve"
        style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="219"
+       x="259"
        y="606.93359"
        id="text5227"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan5229"
-         x="219"
+         x="259"
          y="606.93359"
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Fail job</tspan></text>
     <text
        xml:space="preserve"
        style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="156.57143"
+       x="196.57141"
        y="764.07648"
        id="text5565"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan5567"
-         x="156.57143"
+         x="196.57141"
          y="764.07648"
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Cancel job</tspan></text>
     <path
        style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker5892)"
-       d="m 314.18121,477.05236 c -47.69818,20.03987 -84.94599,9.32911 -116.30849,2.14285"
+       d="m 350.99338,463.19522 -106.2755,-1e-5"
        id="path5569"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
-       inkscape:connection-start="#g4276"
+       sodipodi:nodetypes="cc"
        inkscape:connection-end="#g4411"
-       sodipodi:nodetypes="cc" />
+       inkscape:connection-start="#g4276" />
     <path
        style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6174)"
-       d="M 419.49016,485.78223 C 974.28571,652.36221 835.65722,822.42397 665.47877,968.49491"
+       d="M 453.86766,483.76192 C 1064.7153,696.2824 753.48364,871.76678 689.84395,959.92348"
        id="path5571"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -829,7 +874,7 @@ under the License.
        sodipodi:nodetypes="cc" />
     <path
        style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6280)"
-       d="m 413.71359,666.04114 c 466.9675,42.03536 351.85357,186.4168 228.2627,292.45377"
+       d="M 446.3093,669.3167 C 966.50293,715.92449 749.288,844.81821 673.74388,957.90317"
        id="path5573"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -838,7 +883,7 @@ under the License.
        sodipodi:nodetypes="cc" />
     <path
        style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6392)"
-       d="M 375.26539,850.99339 556.58971,959.92348"
+       d="M 408.60702,850.99339 589.93134,959.92348"
        id="path5575"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -846,7 +891,7 @@ under the License.
        inkscape:connection-end="#g4436" />
     <path
        style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6074)"
-       d="M 173.42792,441.40962 C 790,62.362207 855,633.79078 686.95342,807.37059"
+       d="M 189.43309,439.22831 C 890,72.362179 790,652.36218 702.11815,804.93187"
        id="path5579"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -855,15 +900,16 @@ under the License.
        sodipodi:nodetypes="cc" />
     <path
        style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6764)"
-       d="M 123.688,485.69533 113.6599,625.15282"
+       d="m 148.78827,484.68518 -0.14393,138.44734"
        id="path5581"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
        inkscape:connection-start="#g4411"
-       inkscape:connection-end="#g4324" />
+       inkscape:connection-end="#g4324"
+       sodipodi:nodetypes="cc" />
     <path
        style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6510)"
-       d="m 400.64822,624.43854 c 14.16495,-34.7353 13.47368,-78.92413 6.68911,-136.51345"
+       d="m 437.72662,622.12237 c 22.19839,-49.88375 12.35398,-92.37301 0.15901,-134.31983"
        id="path5585"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -872,7 +918,7 @@ under the License.
        inkscape:connection-start="#g4286" />
     <path
        style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6634)"
-       d="m 394.21832,671.04114 c 18.43152,58.30256 7.7951,100.67644 -12.18276,132.80916"
+       d="m 423.27918,670.32685 c 16.08015,35.64642 30.49507,77.65921 -0.96955,134.65616"
        id="path5587"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -881,125 +927,125 @@ under the License.
        sodipodi:nodetypes="cc" />
     <path
        style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker7718)"
-       d="M 194.71429,444.79077 C 295.26058,393.86555 426.46327,380.03465 584,438.3622"
+       d="M 234.71429,444.79077 C 335.26058,393.86555 466.46327,380.03465 624,438.3622"
        id="path7710"
        inkscape:connector-curvature="0"
        sodipodi:nodetypes="cc" />
     <text
        xml:space="preserve"
        style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="10"
-       y="556.36218"
+       x="74.24366"
+       y="557.37231"
        id="text8166"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan8168"
-         x="10"
-         y="556.36218"
+         x="74.24366"
+         y="557.37231"
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Restarted job</tspan></text>
     <text
        xml:space="preserve"
        style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="156"
-       y="906.36218"
+       x="130.87746"
+       y="919.43469"
        id="text8170"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan8172"
-         x="156"
-         y="906.36218"
+         x="130.87746"
+         y="919.43469"
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Suspend job</tspan></text>
     <text
        sodipodi:linespacing="125%"
        id="text8174"
        y="906.93359"
-       x="468.57144"
+       x="508.57141"
        style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
        xml:space="preserve"><tspan
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
          y="906.93359"
-         x="468.57144"
+         x="508.57141"
          id="tspan8176"
          sodipodi:role="line">Suspend job</tspan></text>
     <text
        xml:space="preserve"
        style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="746.42859"
-       y="906.2193"
+       x="749.28571"
+       y="931.93359"
        id="text8178"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan8180"
-         x="746.42859"
-         y="906.2193"
+         x="749.28571"
+         y="931.93359"
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Suspend job</tspan></text>
     <text
        sodipodi:linespacing="125%"
        id="text8182"
        y="717.64789"
-       x="482.14288"
+       x="522.14288"
        style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
        xml:space="preserve"><tspan
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
          y="717.64789"
-         x="482.14288"
+         x="522.14288"
          id="tspan8184"
          sodipodi:role="line">Suspend job</tspan></text>
     <text
        sodipodi:linespacing="125%"
        id="text8186"
        y="752.64789"
-       x="409.42856"
+       x="449.42859"
        style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
        xml:space="preserve"><tspan
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
          y="752.64789"
-         x="409.42856"
+         x="449.42859"
          id="tspan8188"
          sodipodi:role="line">Cancel job</tspan></text>
     <text
        sodipodi:linespacing="125%"
        id="text8190"
-       y="390.50507"
-       x="361.14285"
+       y="390.08667"
+       x="344.87888"
        style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
        xml:space="preserve"><tspan
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
-         y="390.50507"
-         x="361.14285"
+         y="390.08667"
+         x="344.87888"
          id="tspan8192"
-         sodipodi:role="line">Fail job</tspan></text>
+         sodipodi:role="line">Fail and job not restartable</tspan></text>
     <text
        sodipodi:linespacing="125%"
        id="text8194"
        y="306.21933"
-       x="487.28571"
+       x="527.28571"
        style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
        xml:space="preserve"><tspan
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
          y="306.21933"
-         x="487.28571"
+         x="527.28571"
          id="tspan8196"
          sodipodi:role="line">Cancel job</tspan></text>
     <text
        xml:space="preserve"
        style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="174"
+       x="214"
        y="510.36221"
        id="text8198"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan8200"
-         x="174"
+         x="214"
          y="510.36221"
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">All job vertices</tspan><tspan
          sodipodi:role="line"
-         x="174"
+         x="214"
          y="529.11218"
          id="tspan8202"
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">in final state &amp;</tspan><tspan
          sodipodi:role="line"
-         x="174"
+         x="214"
          y="547.86218"
          id="tspan8204"
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">restartable</tspan></text>
@@ -1007,43 +1053,80 @@ under the License.
        sodipodi:linespacing="125%"
        id="text8206"
        y="566.93372"
-       x="418.28571"
+       x="458.28571"
        style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
        xml:space="preserve"><tspan
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
          y="566.93372"
-         x="418.28571"
+         x="458.28571"
          id="tspan8208"
          sodipodi:role="line">Fail job</tspan></text>
     <path
        style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker4534)"
-       d="M 9.5714286,648.36221 46,648.36221"
+       d="m 49.57143,648.36221 36.42857,0"
        id="path3470"
        inkscape:connector-curvature="0"
        sodipodi:nodetypes="cc" />
     <text
        xml:space="preserve"
        style="font-style:normal;font-weight:normal;font-size:25px;line-height:125%;font-family:Sans;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="250.71428"
+       x="290.71429"
        y="710.93359"
        id="text7267"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan7269"
-         x="250.71428"
+         x="290.71429"
          y="710.93359"
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;font-family:Sans-serif;-inkscape-font-specification:Sans-serif">Cancel job</tspan></text>
     <text
        sodipodi:linespacing="125%"
        id="text7271"
        y="565.505"
-       x="293.28571"
+       x="333.28571"
        style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
        xml:space="preserve"><tspan
          style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif, Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
          y="565.505"
-         x="293.28571"
+         x="333.28571"
          id="tspan7273"
          sodipodi:role="line">Fail job</tspan></text>
+    <path
+       inkscape:connector-curvature="0"
+       inkscape:connector-type="polyline"
+       id="path4525"
+       d="M 89.65484,475.69533 C -64.057054,715.31058 17.022511,901.60666 537.98082,988.00996"
+       style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker4551)"
+       sodipodi:nodetypes="cc" />
+    <text
+       xml:space="preserve"
+       style="font-style:normal;font-weight:normal;font-size:25px;line-height:125%;font-family:Sans;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       x="80"
+       y="792.36218"
+       id="text4597"
+       sodipodi:linespacing="125%"><tspan
+         sodipodi:role="line"
+         id="tspan4599"
+         x="80"
+         y="792.36218"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;font-family:sans-serif;-inkscape-font-specification:sans-serif">Suspend job</tspan></text>
+    <path
+       style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;stroke-miterlimit:4;stroke-dasharray:none;marker-end:url(#marker4737)"
+       d="M 96.56344,444.48401 C -20,312.36218 270,332.36218 156.87312,441.05766"
+       id="path4289"
+       inkscape:connector-curvature="0"
+       sodipodi:nodetypes="cc" />
+    <text
+       xml:space="preserve"
+       style="font-style:normal;font-weight:normal;font-size:25px;line-height:125%;font-family:Sans;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       x="41.817253"
+       y="342.46371"
+       id="text4905"
+       sodipodi:linespacing="125%"><tspan
+         sodipodi:role="line"
+         id="tspan4907"
+         x="41.817253"
+         y="342.46371"
+         style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;font-family:sans-serif;-inkscape-font-specification:sans-serif">Fail and job restartable</tspan></text>
   </g>
 </svg>

http://git-wip-us.apache.org/repos/asf/flink/blob/18507de3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 074a04d..36dba63 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -818,14 +818,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				current == JobStatus.SUSPENDED ||
 				current.isGloballyTerminalState()) {
 				return;
-			} else if (current == JobStatus.RESTARTING && transitionState(current, JobStatus.FAILED, t)) {
-				synchronized (progressLock) {
-					postRunCleanup();
-					progressLock.notifyAll();
+			} else if (current == JobStatus.RESTARTING) {
+				this.failureCause = t;
 
-					LOG.info("Job {} failed during restart.", getJobID());
+				if (tryRestartOrFail()) {
 					return;
 				}
+				// concurrent job status change, let's check again
 			} else if (transitionState(current, JobStatus.FAILING, t)) {
 				this.failureCause = t;
 
@@ -902,6 +901,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			scheduleForExecution(slotProvider);
 		}
 		catch (Throwable t) {
+			LOG.warn("Failed to restart the job.", t);
 			fail(t);
 		}
 	}
@@ -1007,15 +1007,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 						}
 					}
 					else if (current == JobStatus.FAILING) {
-						boolean allowRestart = !(failureCause instanceof SuppressRestartsException);
-
-						if (allowRestart && restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) {
-							restartStrategy.restart(this);
-							break;
-						} else if ((!allowRestart || !restartStrategy.canRestart()) && transitionState(current, JobStatus.FAILED, failureCause)) {
-							postRunCleanup();
+						if (tryRestartOrFail()) {
 							break;
 						}
+						// concurrent job status change, let's check again
 					}
 					else if (current == JobStatus.SUSPENDED) {
 						// we've already cleaned up when entering the SUSPENDED state
@@ -1039,6 +1034,47 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
+	/**
+	 * Try to restart the job. If we cannot restart the job (e.g. no more restarts allowed), then
+	 * try to fail the job. This operation is only permitted if the current state is FAILING or
+	 * RESTARTING.
+	 *
+	 * @return true if the job was restarted or failed; false if the state was not FAILING/RESTARTING or changed concurrently
+	 */
+	private boolean tryRestartOrFail() {
+		JobStatus currentState = state;
+
+		if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) {
+			synchronized (progressLock) {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Try to restart the job or fail it if no longer possible.", failureCause);
+				} else {
+					LOG.info("Try to restart the job or fail it if no longer possible.");
+				}
+
+				boolean isRestartable = !(failureCause instanceof SuppressRestartsException) && restartStrategy.canRestart();
+
+				if (isRestartable && transitionState(currentState, JobStatus.RESTARTING)) {
+					LOG.info("Restarting the job...");
+					restartStrategy.restart(this);
+
+					return true;
+				} else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failureCause)) {
+					LOG.info("Could not restart the job.", failureCause);
+					postRunCleanup();
+
+					return true;
+				} else {
+					// we must have changed the state concurrently, thus we cannot complete this operation
+					return false;
+				}
+			}
+		} else {
+			// this operation is only allowed in the state FAILING or RESTARTING
+			return false;
+		}
+	}
+
 	private void postRunCleanup() {
 		try {
 			CheckpointCoordinator coord = this.checkpointCoordinator;

http://git-wip-us.apache.org/repos/asf/flink/blob/18507de3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 70c2bf9..5b59471 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
@@ -272,7 +273,8 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 			assertTrue(previousRestartingTime > 0);
 	
 			// now lets fail the job while it is in restarting and see whether the restarting time then stops to increase
-			executionGraph.fail(new Exception());
+			// for this to work, we have to use a SuppressRestartsException
+			executionGraph.fail(new SuppressRestartsException(new Exception()));
 	
 			assertEquals(JobStatus.FAILED, executionGraph.getState());
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/18507de3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 3743adb..7b2e20d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -264,9 +264,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		assertEquals(JobStatus.RESTARTING, executionGraph.getState());
 
-		// Canceling needs to abort the restart
+		// The restarting should not fail with an ordinary exception
 		executionGraph.fail(new Exception("Test exception"));
 
+		assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+
+		// but it should fail when sending a SuppressRestartsException
+		executionGraph.fail(new SuppressRestartsException(new Exception("Test exception")));
+
 		assertEquals(JobStatus.FAILED, executionGraph.getState());
 
 		// The restart has been aborted

http://git-wip-us.apache.org/repos/asf/flink/blob/18507de3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index 59f2a9b..72784fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -27,6 +27,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StoppingException;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -34,7 +36,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -43,6 +44,8 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.api.mockito.PowerMockito;
 
 import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.same;
@@ -147,7 +150,7 @@ public class ExecutionGraphSignalsTest {
 
 	@Test
 	public void testCancel() throws Exception {
-		Assert.assertEquals(JobStatus.CREATED, eg.getState());
+		assertEquals(JobStatus.CREATED, eg.getState());
 		eg.cancel();
 
 		verifyCancel(1);
@@ -156,42 +159,42 @@ public class ExecutionGraphSignalsTest {
 		eg.cancel();
 
 		verifyCancel(2);
-		Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
+		assertEquals(JobStatus.CANCELLING, eg.getState());
 
 		eg.cancel();
 
 		verifyCancel(2);
-		Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
+		assertEquals(JobStatus.CANCELLING, eg.getState());
 
 		f.set(eg, JobStatus.CANCELED);
 		eg.cancel();
 
 		verifyCancel(2);
-		Assert.assertEquals(JobStatus.CANCELED, eg.getState());
+		assertEquals(JobStatus.CANCELED, eg.getState());
 
 		f.set(eg, JobStatus.FAILED);
 		eg.cancel();
 
 		verifyCancel(2);
-		Assert.assertEquals(JobStatus.FAILED, eg.getState());
+		assertEquals(JobStatus.FAILED, eg.getState());
 
 		f.set(eg, JobStatus.FAILING);
 		eg.cancel();
 
 		verifyCancel(2);
-		Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
+		assertEquals(JobStatus.CANCELLING, eg.getState());
 
 		f.set(eg, JobStatus.FINISHED);
 		eg.cancel();
 
 		verifyCancel(2);
-		Assert.assertEquals(JobStatus.FINISHED, eg.getState());
+		assertEquals(JobStatus.FINISHED, eg.getState());
 
 		f.set(eg, JobStatus.RESTARTING);
 		eg.cancel();
 
 		verifyCancel(2);
-		Assert.assertEquals(JobStatus.CANCELED, eg.getState());
+		assertEquals(JobStatus.CANCELED, eg.getState());
 	}
 
 	private void verifyCancel(int times) {
@@ -206,65 +209,65 @@ public class ExecutionGraphSignalsTest {
 	 */
 	@Test
 	public void testSuspend() throws Exception {
-		Assert.assertEquals(JobStatus.CREATED, eg.getState());
+		assertEquals(JobStatus.CREATED, eg.getState());
 		Exception testException = new Exception("Test exception");
 
 		eg.suspend(testException);
 
 		verifyCancel(1);
-		Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
 
 		f.set(eg, JobStatus.RUNNING);
 
 		eg.suspend(testException);
 
 		verifyCancel(2);
-		Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
 
 		f.set(eg, JobStatus.FAILING);
 
 		eg.suspend(testException);
 
 		verifyCancel(3);
-		Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
 
 		f.set(eg, JobStatus.CANCELLING);
 
 		eg.suspend(testException);
 
 		verifyCancel(4);
-		Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
 
 		f.set(eg, JobStatus.FAILED);
 
 		eg.suspend(testException);
 
 		verifyCancel(4);
-		Assert.assertEquals(JobStatus.FAILED, eg.getState());
+		assertEquals(JobStatus.FAILED, eg.getState());
 
 		f.set(eg, JobStatus.FINISHED);
 
 		eg.suspend(testException);
 
 		verifyCancel(4);
-		Assert.assertEquals(JobStatus.FINISHED, eg.getState());
+		assertEquals(JobStatus.FINISHED, eg.getState());
 
 		f.set(eg, JobStatus.CANCELED);
 
 		eg.suspend(testException);
 
 		verifyCancel(4);
-		Assert.assertEquals(JobStatus.CANCELED, eg.getState());
+		assertEquals(JobStatus.CANCELED, eg.getState());
 
 		f.set(eg, JobStatus.SUSPENDED);
 
 		eg.fail(testException);
 
-		Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
 
 		eg.cancel();
 
-		Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
 	}
 
 	// test that all source tasks receive STOP signal
@@ -290,6 +293,71 @@ public class ExecutionGraphSignalsTest {
 		}
 	}
 
+	/**
+	 * Test that failing in state restarting will retrigger the restarting logic. This means that
+	 * it only goes into the state FAILED after the restart strategy says the job is no longer
+	 * restartable.
+	 */
+	@Test
+	public void testFailureWhileRestarting() throws IllegalAccessException, NoSuchFieldException, InterruptedException {
+		Field restartStrategyField = eg.getClass().getDeclaredField("restartStrategy");
+		restartStrategyField.setAccessible(true);
+
+		restartStrategyField.set(eg, new InfiniteDelayRestartStrategy(1));
+
+		f.set(eg, JobStatus.RESTARTING);
+
+		eg.fail(new Exception("Test"));
+
+		// we should restart since we have one restart attempt left
+		assertEquals(JobStatus.RESTARTING, eg.getState());
+
+		eg.fail(new Exception("Test"));
+
+		// after depleting all our restart attempts we should go into Failed
+		assertEquals(JobStatus.FAILED, eg.getState());
+	}
+
+	/**
+	 * Tests that a {@link SuppressRestartsException} in state RESTARTING stops the restarting
+	 * immediately and sets the execution graph's state to FAILED.
+	 */
+	@Test
+	public void testSuppressRestartFailureWhileRestarting() throws IllegalAccessException, NoSuchFieldException {
+		Field restartStrategyField = eg.getClass().getDeclaredField("restartStrategy");
+		restartStrategyField.setAccessible(true);
+
+		restartStrategyField.set(eg, new InfiniteDelayRestartStrategy());
+
+		f.set(eg, JobStatus.RESTARTING);
+
+		// suppress a possible restart
+		eg.fail(new SuppressRestartsException(new Exception("Test")));
+
+		assertEquals(JobStatus.FAILED, eg.getState());
+	}
+
+	/**
+	 * Tests that we can suspend a job when in state RESTARTING.
+	 */
+	@Test
+	public void testSuspendWhileRestarting() throws IllegalAccessException, NoSuchFieldException {
+		Field restartStrategyField = eg.getClass().getDeclaredField("restartStrategy");
+		restartStrategyField.setAccessible(true);
+
+		restartStrategyField.set(eg, new InfiniteDelayRestartStrategy());
+
+		f.set(eg, JobStatus.RESTARTING);
+
+		final Exception exception = new Exception("Suspended");
+
+		eg.suspend(exception);
+
+		assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+		assertEquals(exception, eg.getFailureCause());
+	}
+
 	// STOP only supported if all sources are stoppable 
 	@Test(expected = StoppingException.class)
 	public void testStopBatching() throws StoppingException {

http://git-wip-us.apache.org/repos/asf/flink/blob/18507de3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
index 4be0b96..c1cbdd3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
@@ -29,13 +29,33 @@ import org.slf4j.LoggerFactory;
 public class InfiniteDelayRestartStrategy implements RestartStrategy {
 	private static final Logger LOG = LoggerFactory.getLogger(InfiniteDelayRestartStrategy.class);
 
+	private final int maxRestartAttempts;
+	private int restartAttemptCounter;
+
+	public InfiniteDelayRestartStrategy() {
+		this(-1);
+	}
+
+	public InfiniteDelayRestartStrategy(int maxRestartAttempts) {
+		this.maxRestartAttempts = maxRestartAttempts;
+		restartAttemptCounter = 0;
+	}
+
 	@Override
 	public boolean canRestart() {
-		return true;
+		if (maxRestartAttempts >= 0) {
+			return restartAttemptCounter < maxRestartAttempts;
+		} else {
+			return true;
+		}
 	}
 
 	@Override
 	public void restart(ExecutionGraph executionGraph) {
 		LOG.info("Delaying retry of job execution forever");
+
+		if (maxRestartAttempts >= 0) {
+			restartAttemptCounter++;
+		}
 	}
 }