You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/10/20 09:59:24 UTC

[33/47] flink git commit: [FLINK-2354] [runtime] Replace old StateHandleProvider by StateStorageHelper in ZooKeeperStateHandleStore

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobmanager/config.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobmanager/config.html b/flink-runtime-web/web-dashboard/web/partials/jobmanager/config.html
new file mode 100644
index 0000000..da6b75b
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/jobmanager/config.html
@@ -0,0 +1,33 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<table class="table table-properties">
+  <thead>
+    <tr>
+      <th>Key</th>
+      <th>Value</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr ng-repeat="entry in jobmanager.config | orderBy: 'key'">
+      <td>{{entry.key}}</td>
+      <td>{{entry.value}}</td>
+    </tr>
+  </tbody>
+</table>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobmanager/index.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobmanager/index.html b/flink-runtime-web/web-dashboard/web/partials/jobmanager/index.html
new file mode 100644
index 0000000..02c2f47
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/jobmanager/index.html
@@ -0,0 +1,33 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<nav class="navbar navbar-default navbar-fixed-top navbar-main">
+  <div id="fold-button" ng-click="showSidebar()" class="btn btn-default navbar-btn pull-left"><i class="fa fa-navicon"></i></div>
+  <div class="navbar-title">Job Manager</div>
+</nav>
+<nav class="navbar navbar-default navbar-fixed-top navbar-main-additional">
+  <ul class="nav nav-tabs">
+    <li ui-sref-active="active"><a ui-sref=".config">Configuration</a></li>
+    <li ui-sref-active="active"><a ui-sref=".log">Logs</a></li>
+    <li ui-sref-active="active"><a ui-sref=".stdout">Stdout</a></li>
+  </ul>
+</nav>
+<div id="content-inner" class="has-navbar-main-additional">
+  <div ui-view="details"></div>
+</div>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobmanager/stdout.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobmanager/stdout.html b/flink-runtime-web/web-dashboard/web/partials/jobmanager/stdout.html
new file mode 100644
index 0000000..df6a817
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/jobmanager/stdout.html
@@ -0,0 +1,40 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
+-->
+<table class="table table-properties">
+  <thead>
+    <tr>
+      <th colspan="2">
+        <div class="row">
+          <div class="col-xs-10">Job Manager Output</div>
+          <div class="col-xs-1 text-right"><a ng-click="reloadData()" class="show-pointer"><i class="fa fa-refresh"></i></a></div>
+          <div class="col-xs-1 text-left"><a href="jobmanager/stdout"><i class="fa fa-download"></i></a></div>
+        </div>
+      </th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td colspan="2">
+        <pre>{{jobmanager.stdout}}</pre>
+      </td>
+    </tr>
+  </tbody>
+</table>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/completed-jobs.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/completed-jobs.html b/flink-runtime-web/web-dashboard/web/partials/jobs/completed-jobs.html
new file mode 100644
index 0000000..b76278d
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/completed-jobs.html
@@ -0,0 +1,53 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<nav class="navbar navbar-default navbar-fixed-top navbar-main">
+  <div id="fold-button" ng-click="showSidebar()" class="btn btn-default navbar-btn pull-left"><i class="fa fa-navicon"></i></div>
+  <div class="navbar-title">Completed Jobs</div>
+</nav>
+<div id="content-inner">
+  <table class="table table-hover table-clickable">
+    <thead>
+      <tr>
+        <th>Start Time</th>
+        <th>End Time</th>
+        <th>Duration</th>
+        <th>Job Name</th>
+        <th>Job ID</th>
+        <th>Tasks</th>
+        <th>Status</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr ng-repeat="job in jobs" ui-sref="single-job.plan.overview({ jobid: job.jid })">
+        <td>{{job['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}</td>
+        <td>{{job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}</td>
+        <td>{{job.duration}} ms</td>
+        <td>{{job.name}}</td>
+        <td>{{job.jid}}</td>
+        <td class="label-group">
+          <bs-label status="{{status}}" ng-repeat="(status, value) in job.tasks">{{value}}</bs-label>
+        </td>
+        <td> 
+          <bs-label status="{{job.state}}">{{job.state}}</bs-label>
+        </td>
+      </tr>
+    </tbody>
+  </table>
+</div>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.config.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.config.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.config.html
new file mode 100644
index 0000000..a7a5d9d
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.config.html
@@ -0,0 +1,57 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<table ng-if="job['execution-config']" class="table table-properties">
+  <thead>
+    <tr>
+      <th colspan="2">Execution configuration</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>Execution mode</td>
+      <td>{{ job['execution-config']['execution-mode'] }}</td>
+    </tr>
+    <tr>
+      <td>Max. number of execution retries</td>
+      <td>{{ job['execution-config']['max-execution-retries'] === -1 ? 'deactivated' : job['execution-config']['max-execution-retries'] }}</td>
+    </tr>
+    <tr>
+      <td>Job parallelism</td>
+      <td>{{ job['execution-config']['job-parallelism'] === -1 ? 'auto' : job['execution-config']['job-parallelism'] }}</td>
+    </tr>
+    <tr>
+      <td>Object reuse mode</td>
+      <td>{{ job['execution-config']['object-reuse-mode'] }}</td>
+    </tr>
+  </tbody>
+</table>
+<table ng-if="job['execution-config']['user-config']" class="table table-properties">
+  <thead>
+    <tr>
+      <th colspan="2">User configuration</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr ng-repeat="property in job['execution-config']['user-config']">
+      <td>{{property.name}}</td>
+      <td table-property="table-property" value="property.value"></td>
+    </tr>
+  </tbody>
+</table>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.exceptions.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.exceptions.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.exceptions.html
new file mode 100644
index 0000000..a5f6676
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.exceptions.html
@@ -0,0 +1,38 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<div ng-if="exceptions['root-exception']" class="panel panel-default panel-multi">
+  <div class="panel-heading clearfix">
+    <div class="panel-title">Root exception</div>
+  </div>
+  <div class="panel-body">
+    <pre class="exception">{{ exceptions['root-exception'] }}</pre>
+  </div>
+</div>
+<div ng-repeat="exception in exceptions['all-exceptions']" class="panel panel-default panel-multi">
+  <div class="panel-heading clearfix">
+    <div class="panel-title">{{ exception.task }}</div>
+  </div>
+  <div class="panel-heading clearfix">
+    <div class="panel-info thin last"><span>{{ exception.location }}</span></div>
+  </div>
+  <div class="panel-body">
+    <pre class="exception">{{ exception.exception }}</pre>
+  </div>
+</div>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
new file mode 100644
index 0000000..9d3e171
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
@@ -0,0 +1,48 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<nav ng-if="job" class="navbar navbar-default navbar-fixed-top navbar-main">
+  <div id="fold-button" ng-click="showSidebar()" class="btn btn-default navbar-btn pull-left"><i class="fa fa-navicon"></i></div>
+  <div class="navbar-title">
+    <indicator-primary status="{{job.state}}"></indicator-primary>{{ job.name }}
+  </div>
+  <div class="navbar-info first last hidden-xs hidden-sm">{{ job.jid }}</div>
+  <div class="navbar-info first last">
+    <div class="label-group">
+      <bs-label status="{{status}}" ng-repeat="(status, value) in job['status-counts']">{{value}}</bs-label>
+    </div>
+  </div>
+  <div class="navbar-info first last hidden-xs hidden-sm">{{ job['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}<span ng-if="job['end-time'] &gt; -1">
+      - 
+      {{ job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></div>
+  <div ng-if="job.duration &gt; -1" class="navbar-info last first">{{job.duration}} ms</div>
+</nav>
+<nav ng-if="job" class="navbar navbar-default navbar-fixed-top navbar-main-additional">
+  <ul class="nav nav-tabs">
+    <li ui-sref-active="active"><a ui-sref=".plan.overview">Plan</a></li>
+    <li ui-sref-active="active"><a ui-sref=".statistics">Job Accumulators / Statistics</a></li>
+    <li ui-sref-active="active"><a ui-sref=".timeline">Timeline</a></li>
+    <li ui-sref-active="active"><a ui-sref=".exceptions">Exceptions</a></li>
+    <li ui-sref-active="active"><a ui-sref=".properties">Properties</a></li>
+    <li ui-sref-active="active"><a ui-sref=".config">Configuration</a></li>
+  </ul>
+</nav>
+<div id="content-inner" class="has-navbar-main-additional">
+  <div ui-view="details"></div>
+</div>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.html
new file mode 100644
index 0000000..f2c4143
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.html
@@ -0,0 +1,31 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<div class="canvas-wrapper">
+  <div job-plan="job-plan" plan="plan" jobid="{{jobid}}" set-node="changeNode(nodeid)" class="main-canvas"></div>
+</div>
+<div ng-if="plan" class="panel panel-default panel-multi">
+  <nav class="navbar navbar-default navbar-secondary-additional">
+    <ul class="nav nav-tabs">
+      <li ui-sref-active="active"><a ui-sref=".overview({nodeid: nodeid})">Overview</a></li>
+      <li ui-sref-active="active"><a ui-sref=".accumulators({nodeid: nodeid})">Accumulators</a></li>
+    </ul>
+  </nav>
+  <div ui-view="node-details" class="panel-body clean"></div>
+</div>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.accumulators.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.accumulators.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.accumulators.html
new file mode 100644
index 0000000..8de3921
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.accumulators.html
@@ -0,0 +1,40 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<table class="table table-body-hover table-clickable table-activable">
+  <thead>
+    <tr>
+      <th>Name</th>
+      <th>Status</th>
+    </tr>
+  </thead>
+  <tbody ng-repeat="v in job.vertices" ng-class="{ active: v.id == nodeid }" ng-click="v.id == nodeid || changeNode(v.id)">
+    <tr ng-if="v.type == 'regular'">
+      <td>{{ v.name | humanizeText }}</td>
+      <td> 
+        <bs-label status="{{v.status}}">{{v.status}}</bs-label>
+      </td>
+    </tr>
+    <tr ng-if="nodeid &amp;&amp; v.id == nodeid">
+      <td colspan="2">
+        <div ng-include=" 'partials/jobs/job.plan.node.accumulators.html' "></div>
+      </td>
+    </tr>
+  </tbody>
+</table>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.overview.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.overview.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.overview.html
new file mode 100644
index 0000000..1706d3e
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.overview.html
@@ -0,0 +1,60 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<table class="table table-body-hover table-clickable table-activable">
+  <thead>
+    <tr>
+      <th>Start Time</th>
+      <th>End Time</th>
+      <th>Duration</th>
+      <th>Name</th>
+      <th>Bytes read</th>
+      <th>Records read</th>
+      <th>Bytes written</th>
+      <th>Records written</th>
+      <th>Tasks</th>
+      <th>Status</th>
+    </tr>
+  </thead>
+  <tbody ng-repeat="v in job.vertices" ng-class="{ active: v.id == nodeid }" ng-click="changeNode(v.id)">
+    <tr ng-if="v.type == 'regular'">
+      <td><span ng-if="v['start-time'] &gt; -1">{{ v['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></td>
+      <td><span ng-if="v['end-time'] &gt; -1">{{ v['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></td>
+      <td><span ng-if="v.duration &gt; -1">{{ v.duration }} ms</span></td>
+      <td class="td-long">{{ v.name | humanizeText }}</td>
+      <td>{{ v.metrics['read-bytes'] }}</td>
+      <td>{{ v.metrics['read-records'] }}</td>
+      <td>{{ v.metrics['write-bytes'] }}</td>
+      <td>{{ v.metrics['write-records'] }}</td>
+      <td>
+        <div class="label-group">
+          <bs-label status="{{status}}" ng-repeat="(index, status) in stateList">{{v.tasks[status]}}</bs-label>
+        </div>
+      </td>
+      <td> 
+        <bs-label status="{{v.status}}">{{v.status}}</bs-label>
+      </td>
+    </tr>
+    <tr ng-if="nodeid &amp;&amp; v.id == nodeid">
+      <td colspan="10">
+        <div ng-include=" 'partials/jobs/job.plan.node.subtasks.html' "></div>
+      </td>
+    </tr>
+  </tbody>
+</table>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.accumulators.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.accumulators.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.accumulators.html
new file mode 100644
index 0000000..e7dcf2c
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.accumulators.html
@@ -0,0 +1,68 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<div ng-if="accumulators.length == 0">
+  <p><i>No accumulators</i></p>
+</div>
+<div ng-if="accumulators &amp;&amp; accumulators.length &gt; 0">
+  <table class="table table-hover table-clickable table-activable table-inner">
+    <thead>
+      <tr>
+        <th>Name</th>
+        <th>Type</th>
+        <th>Value</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr ng-repeat="accumulator in accumulators">
+        <td width="30%">{{ accumulator.name }}</td>
+        <td width="30%">{{ accumulator.type }}</td>
+        <td width="30%">{{ accumulator.value }}</td>
+      </tr>
+    </tbody>
+  </table>
+  <div ng-if="!nodeUnfolded"><a ng-click="toggleFold()" class="btn btn-default">
+      Show subtasks
+       <i class="fa fa-chevron-down"></i></a><a ng-click="deactivateNode(); $event.stopPropagation()" title="Fold" class="btn btn-default pull-right"><i class="fa fa-chevron-up"></i></a></div>
+  <div ng-if="nodeUnfolded &amp;&amp; subtaskAccumulators &amp;&amp; subtaskAccumulators.length &gt; 0"><a ng-click="toggleFold()" class="btn btn-default">
+      Hide subtasks
+       <i class="fa fa-chevron-up"></i></a>
+    <table class="table table-hover table-clickable table-activable table-inner">
+      <thead>
+        <tr>
+          <th>Name</th>
+          <th>Type</th>
+          <th>Value</th>
+        </tr>
+      </thead>
+      <tbody ng-if="subtask['user-accumulators'] &amp;&amp; subtask['user-accumulators'].length &gt; 0" ng-repeat="subtask in subtaskAccumulators">
+        <tr>
+          <td colspan="3">
+            <div class="small-label">({{ subtask.subtask }}) {{ subtask.host }}, attempt: {{ subtask.attempt + 1 }}</div>
+          </td>
+        </tr>
+        <tr ng-repeat="accumulator in subtask['user-accumulators']">
+          <td width="30%">{{ accumulator.name }}</td>
+          <td width="30%">{{ accumulator.type }}</td>
+          <td width="30%">{{ accumulator.value }}</td>
+        </tr>
+      </tbody>
+    </table>
+  </div>
+</div>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html
new file mode 100644
index 0000000..40b16bc
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html
@@ -0,0 +1,52 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
+-->
+<table ng-if="subtasks" class="table table-hover table-clickable table-activable table-inner">
+  <thead>
+    <tr>
+      <th>Start Time</th>
+      <th>End Time</th>
+      <th>Duration</th>
+      <th>Bytes read</th>
+      <th>Records read</th>
+      <th>Bytes written</th>
+      <th>Records written</th>
+      <th>Attempt</th>
+      <th>Host</th>
+      <th>Status</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr ng-repeat="subtask in subtasks">
+      <td><span ng-if="subtask['start-time'] &gt; -1">{{ subtask['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></td>
+      <td><span ng-if="subtask['end-time'] &gt; -1">{{ subtask['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></td>
+      <td><span ng-if="subtask.duration &gt; -1">{{ subtask.duration }} ms</span></td>
+      <td><span ng-if="subtask.metrics['read-bytes'] &gt; -1">{{ subtask.metrics['read-bytes'] }}</span></td>
+      <td><span ng-if="subtask.metrics['read-records'] &gt; -1">{{ subtask.metrics['read-records'] }}</span></td>
+      <td><span ng-if="subtask.metrics['write-bytes'] &gt; -1">{{ subtask.metrics['write-bytes'] }}</span></td>
+      <td><span ng-if="subtask.metrics['write-records'] &gt; -1">{{ subtask.metrics['write-records'] }}</span></td>
+      <td>{{ subtask.attempt + 1 }}</td>
+      <td>{{ subtask.host }}</td>
+      <td> 
+        <bs-label status="{{subtask.status}}">{{subtask.status}}</bs-label>
+      </td>
+    </tr>
+  </tbody>
+</table>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.properties.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.properties.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.properties.html
new file mode 100644
index 0000000..907afd3
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.properties.html
@@ -0,0 +1,140 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<div class="canvas-wrapper">
+  <div job-plan="job-plan" plan="plan" jobid="{{jobid}}" set-node="changeNode(nodeid)" class="main-canvas"></div>
+</div>
+<div ng-if="node" class="panel panel-default">
+  <div class="panel-heading clearfix">
+    <div class="panel-title">{{ node.description | humanizeText }}</div>
+  </div>
+  <div class="panel-body clean">
+    <div class="row">
+      <div class="col-sm-6 col-md-4">
+        <table ng-if="node.optimizer_properties.global_properties" class="table table-properties">
+          <thead>
+            <tr>
+              <th colspan="2">Global Data Properties</th>
+            </tr>
+          </thead>
+          <tbody>
+            <tr ng-repeat="property in node.optimizer_properties.global_properties">
+              <td>{{property.name}}</td>
+              <td table-property="table-property" value="property.value"></td>
+            </tr>
+          </tbody>
+        </table>
+        <table ng-if="node.optimizer_properties.local_properties" class="table table-properties">
+          <thead>
+            <tr>
+              <th colspan="2">Local Data Properties</th>
+            </tr>
+          </thead>
+          <tbody>
+            <tr ng-repeat="property in node.optimizer_properties.local_properties">
+              <td>{{property.name}}</td>
+              <td table-property="table-property" value="property.value"></td>
+            </tr>
+          </tbody>
+        </table>
+        <div class="visible-xs visible-sm">
+          <table class="table table-properties">
+            <thead>
+              <tr>
+                <th colspan="2">Properties</th>
+              </tr>
+            </thead>
+            <tbody>
+              <tr>
+                <td>Operator</td>
+                <td table-property="table-property" value="node.operator_strategy"></td>
+              </tr>
+              <tr>
+                <td>Parallelism</td>
+                <td table-property="table-property" value="node.parallelism"></td>
+              </tr>
+            </tbody>
+          </table>
+        </div>
+      </div>
+      <div class="hidden-sm col-md-4">
+        <table class="table table-properties">
+          <thead>
+            <tr>
+              <th colspan="2">Properties</th>
+            </tr>
+          </thead>
+          <tbody>
+            <tr>
+              <td>Operator</td>
+              <td table-property="table-property" value="node.operator_strategy"></td>
+            </tr>
+            <tr>
+              <td>Parallelism</td>
+              <td table-property="table-property" value="node.parallelism"></td>
+            </tr>
+          </tbody>
+        </table>
+        <table ng-if="node.optimizer_properties.estimates" class="table table-properties">
+          <thead>
+            <tr>
+              <th colspan="2">Size Estimates</th>
+            </tr>
+          </thead>
+          <tbody>
+            <tr ng-repeat="property in node.optimizer_properties.estimates">
+              <td>{{property.name}}</td>
+              <td table-property="table-property" value="property.value"></td>
+            </tr>
+          </tbody>
+        </table>
+      </div>
+      <div class="col-sm-6 col-md-4">
+        <div class="visible-xs visible-sm">
+          <table ng-if="node.optimizer_properties.estimates" class="table table-properties">
+            <thead>
+              <tr>
+                <th colspan="2">Size Estimates</th>
+              </tr>
+            </thead>
+            <tbody>
+              <tr ng-repeat="property in node.optimizer_properties.estimates">
+                <td>{{property.name}}</td>
+                <td table-property="table-property" value="property.value"></td>
+              </tr>
+            </tbody>
+          </table>
+        </div>
+        <table ng-if="node.optimizer_properties.costs" class="table table-properties">
+          <thead>
+            <tr>
+              <th colspan="2">Cost Estimates</th>
+            </tr>
+          </thead>
+          <tbody>
+            <tr ng-repeat="property in node.optimizer_properties.costs">
+              <td>{{property.name}}</td>
+              <td table-property="table-property" value="property.value"></td>
+            </tr>
+          </tbody>
+        </table>
+      </div>
+    </div>
+  </div>
+</div>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.statistics.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.statistics.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.statistics.html
new file mode 100644
index 0000000..951cc1c
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.statistics.html
@@ -0,0 +1,40 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<table class="table table-properties">
+  <thead>
+    <tr>
+      <th colspan="2">Some statistics</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>Operator</td>
+      <td>1</td>
+    </tr>
+    <tr>
+      <td>Parallelism</td>
+      <td>2</td>
+    </tr>
+    <tr>
+      <td>Subtasks-per-instance</td>
+      <td>3</td>
+    </tr>
+  </tbody>
+</table>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.timeline.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.timeline.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.timeline.html
new file mode 100644
index 0000000..2f22576
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.timeline.html
@@ -0,0 +1,23 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<div class="canvas-wrapper">
+  <div timeline="timeline" vertices="vertices" jobid="jobid" class="timeline-canvas"></div>
+</div>
+<div ui-view="vertex"></div>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.timeline.vertex.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.timeline.vertex.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.timeline.vertex.html
new file mode 100644
index 0000000..1a4bd06
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.timeline.vertex.html
@@ -0,0 +1,30 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<div ng-if="vertex" class="panel panel-default panel-multi">
+  <div class="panel-heading clearfix">
+    <div class="panel-title">{{ vertex.groupvertex.groupvertexname | humanizeText }}</div>
+  </div>
+  <div class="panel-body">
+    <div class="canvas-wrapper">
+      <div vertex="vertex" data="vertex" class="timeline-canvas"></div>
+    </div>
+    <div id="timeline1"></div>
+  </div>
+</div>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/running-jobs.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/running-jobs.html b/flink-runtime-web/web-dashboard/web/partials/jobs/running-jobs.html
new file mode 100644
index 0000000..e175d07
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/running-jobs.html
@@ -0,0 +1,53 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<nav class="navbar navbar-default navbar-fixed-top navbar-main">
+  <div id="fold-button" ng-click="showSidebar()" class="btn btn-default navbar-btn pull-left"><i class="fa fa-navicon"></i></div>
+  <div class="navbar-title">Running Jobs</div>
+</nav>
+<div id="content-inner">
+  <table class="table table-hover table-clickable">
+    <thead>
+      <tr>
+        <th>Start Time</th>
+        <th>End Time</th>
+        <th>Duration</th>
+        <th>Job Name</th>
+        <th>Job ID</th>
+        <th>Tasks</th>
+        <th>Status</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr ng-repeat="job in jobs" ui-sref="single-job.plan.overview({ jobid: job.jid })">
+        <td>{{job['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}</td>
+        <td>{{job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}</td>
+        <td>{{job.duration}} ms</td>
+        <td>{{job.name}}</td>
+        <td>{{job.jid}}</td>
+        <td class="label-group">
+          <bs-label status="{{status}}" ng-repeat="(status, value) in job.tasks">{{value}}</bs-label>
+        </td>
+        <td> 
+          <bs-label status="{{job.state}}">{{job.state}}</bs-label>
+        </td>
+      </tr>
+    </tbody>
+  </table>
+</div>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/overview.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/overview.html b/flink-runtime-web/web-dashboard/web/partials/overview.html
new file mode 100644
index 0000000..ec3c580
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/overview.html
@@ -0,0 +1,147 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<nav class="navbar navbar-default navbar-fixed-top navbar-main">
+  <div id="fold-button" ng-click="showSidebar()" class="btn btn-default navbar-btn pull-left"><i class="fa fa-navicon"></i></div>
+  <div class="navbar-title">Overview</div>
+</nav>
+<div id="content-inner">
+  <div class="row">
+    <div class="col-md-6">
+      <div class="panel panel-default panel-dashboard">
+        <div class="panel-heading">
+          <div class="row">
+            <div class="col-xs-3"><i class="fa fa-tasks fa-3x"></i></div>
+            <div class="col-xs-9 text-right">
+              <div class="huge">{{overview.taskmanagers}}</div>
+              <div>Task Managers</div>
+            </div>
+          </div>
+        </div>
+        <div class="panel-heading">
+          <div class="row">
+            <div class="col-xs-3"><i class="fa fa-folder fa-3x"></i></div>
+            <div class="col-xs-9 text-right">
+              <div class="huge">{{overview["slots-total"]}}</div>
+              <div>Task Slots</div>
+            </div>
+          </div>
+        </div>
+        <div class="panel-heading">
+          <div class="row">
+            <div class="col-xs-3"><i class="fa fa-folder-o fa-3x"></i></div>
+            <div class="col-xs-9 text-right">
+              <div class="huge">{{overview["slots-available"]}}</div>
+              <div>Available Task Slots</div>
+            </div>
+          </div>
+        </div>
+      </div>
+    </div>
+    <div class="col-md-6">
+      <div class="panel panel-default panel-lg">
+        <div class="panel-heading">Total Jobs</div>
+        <div class="list-group">
+          <div class="list-group-item">
+            <div class="badge badge-primary">{{overview["jobs-running"]}}</div>Running
+          </div>
+          <div class="list-group-item">
+            <div class="badge badge-success">{{overview["jobs-finished"]}}</div>Finished
+          </div>
+          <div class="list-group-item">
+            <div class="badge badge-info">{{overview["jobs-cancelled"]}}</div>Canceled
+          </div>
+          <div class="list-group-item">
+            <div class="badge badge-danger">{{overview["jobs-failed"]}}</div>Failed
+          </div>
+        </div>
+      </div>
+    </div>
+  </div>
+  <div class="panel panel-default">
+    <div class="panel-heading">
+      <h3 class="panel-title">Running Jobs</h3>
+    </div>
+    <div class="panel-body">
+      <table class="table table-hover table-clickable">
+        <thead>
+          <tr>
+            <th>Start Time</th>
+            <th>End Time</th>
+            <th>Duration</th>
+            <th>Job Name</th>
+            <th>Job ID</th>
+            <th>Tasks</th>
+            <th>Status</th>
+          </tr>
+        </thead>
+        <tbody>
+          <tr ng-repeat="job in runningJobs" ui-sref="single-job.plan.overview({ jobid: job.jid })">
+            <td>{{job['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}</td>
+            <td>{{job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}</td>
+            <td>{{job.duration}} ms</td>
+            <td>{{job.name}}</td>
+            <td>{{job.jid}}</td>
+            <td class="label-group">
+              <bs-label status="{{status}}" ng-repeat="(status, value) in job.tasks">{{value}}</bs-label>
+            </td>
+            <td> 
+              <bs-label status="{{job.state}}">{{job.state}}</bs-label>
+            </td>
+          </tr>
+        </tbody>
+      </table>
+    </div>
+  </div>
+  <div class="panel panel-default">
+    <div class="panel-heading">
+      <h3 class="panel-title">Completed Jobs</h3>
+    </div>
+    <div class="panel-body">
+      <table class="table table-hover table-clickable">
+        <thead>
+          <tr>
+            <th>Start Time</th>
+            <th>End Time</th>
+            <th>Duration</th>
+            <th>Job Name</th>
+            <th>Job ID</th>
+            <th>Tasks</th>
+            <th>Status</th>
+          </tr>
+        </thead>
+        <tbody>
+          <tr ng-repeat="job in finishedJobs" ui-sref="single-job.plan.overview({ jobid: job.jid })">
+            <td>{{job['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}</td>
+            <td>{{job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}</td>
+            <td>{{job.duration}} ms</td>
+            <td>{{job.name}}</td>
+            <td>{{job.jid}}</td>
+            <td class="label-group">
+              <bs-label status="{{status}}" ng-repeat="(status, value) in job.tasks">{{value}}</bs-label>
+            </td>
+            <td> 
+              <bs-label status="{{job.state}}">{{job.state}}</bs-label>
+            </td>
+          </tr>
+        </tbody>
+      </table>
+    </div>
+  </div>
+</div>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/taskmanager/index.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/taskmanager/index.html b/flink-runtime-web/web-dashboard/web/partials/taskmanager/index.html
new file mode 100644
index 0000000..bf37409
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/web/partials/taskmanager/index.html
@@ -0,0 +1,57 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<nav class="navbar navbar-default navbar-fixed-top navbar-main">
+  <div id="fold-button" ng-click="showSidebar()" class="btn btn-default navbar-btn pull-left"><i class="fa fa-navicon"></i></div>
+  <div class="navbar-title">Task Managers</div>
+</nav>
+<div id="content-inner">
+  <table class="table table-clickable table-hover">
+    <thead>
+      <tr>
+        <th>Path, ID</th>
+        <th>Data Port</th>
+        <th>Last Heartbeat</th>
+        <th>All Slots</th>
+        <th>Free Slots</th>
+        <th>CPU Cores</th>
+        <th>Physical Memory</th>
+        <th>Free Memory</th>
+        <th>Flink Managed Memory</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr ng-repeat="manager in managers" ui-sref="single-manager.metrics({taskmanagerid: manager.id})">
+        <td>
+          {{ manager.path }}
+           
+          <div class="small-label">{{ manager.id }}</div>
+        </td>
+        <td>{{ manager.dataPort }}</td>
+        <td>{{ manager.timeSinceLastHeartbeat | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</td>
+        <td>{{ manager.slotsNumber }}</td>
+        <td>{{ manager.freeSlots }}</td>
+        <td>{{ manager.cpuCores }}</td>
+        <td>{{ manager.physicalMemory | bytes:MB }}</td>
+        <td>{{ manager.freeMemory | bytes:MB }}</td>
+        <td>{{ manager.managedMemory | bytes:MB }}</td>
+      </tr>
+    </tbody>
+  </table>
+</div>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index 8a037ad..4351eb1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.util.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,40 +50,27 @@ class FileSystemBlobStore implements BlobStore {
 	private final String basePath;
 
 	FileSystemBlobStore(Configuration config) throws IOException {
-		StateBackend stateBackend = StateBackend.fromConfig(config);
+		String stateBackendBasePath = config.getString(
+				ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "");
 
-		if (stateBackend == StateBackend.FILESYSTEM) {
-			String stateBackendBasePath = config.getString(
-					ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "");
-
-			if (stateBackendBasePath.equals("")) {
-				throw new IllegalConfigurationException(String.format("Missing configuration for " +
-						"file system state backend recovery path. Please specify via " +
-						"'%s' key.", ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
-			}
+		if (stateBackendBasePath.equals("")) {
+			throw new IllegalConfigurationException(String.format("Missing configuration for " +
+				"file system state backend recovery path. Please specify via " +
+				"'%s' key.", ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
+		}
 
-			stateBackendBasePath += "/blob";
+		stateBackendBasePath += "/blob";
 
-			this.basePath = stateBackendBasePath;
+		this.basePath = stateBackendBasePath;
 
-			try {
-				FileSystem.get(new URI(basePath)).mkdirs(new Path(basePath));
-			}
-			catch (URISyntaxException e) {
-				throw new IOException(e);
-			}
-
-			LOG.info("Created blob directory {}.", basePath);
+		try {
+			FileSystem.get(new URI(basePath)).mkdirs(new Path(basePath));
 		}
-		else {
-			// Nothing else support at the moment
-			throw new IllegalConfigurationException(
-					String.format("Illegal state backend " +
-									"configuration '%s'. Please configure '%s' as state " +
-									"backend and specify the recovery path via '%s' key.",
-							stateBackend, StateBackend.FILESYSTEM,
-							ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
+		catch (URISyntaxException e) {
+			throw new IOException(e);
 		}
+
+		LOG.info("Created blob directory {}.", basePath);
 	}
 
 	// - Put ------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 62ab440..cb2be64 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -26,7 +26,7 @@ import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,7 +92,8 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	 * @param client                         The Curator ZooKeeper client
 	 * @param checkpointsPath                The ZooKeeper path for the checkpoints (needs to
 	 *                                       start with a '/')
-	 * @param stateHandleProvider            The state handle provider for checkpoints
+	 * @param stateStorage                   State storage to be used to persist the completed
+	 *                                       checkpoint
 	 * @throws Exception
 	 */
 	public ZooKeeperCompletedCheckpointStore(
@@ -100,16 +101,16 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 			ClassLoader userClassLoader,
 			CuratorFramework client,
 			String checkpointsPath,
-			StateHandleProvider<CompletedCheckpoint> stateHandleProvider) throws Exception {
+			StateStorageHelper<CompletedCheckpoint> stateStorage) throws Exception {
 
 		checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
+		checkNotNull(stateStorage, "State storage");
 
 		this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
 		this.userClassLoader = checkNotNull(userClassLoader, "User class loader");
 
 		checkNotNull(client, "Curator client");
 		checkNotNull(checkpointsPath, "Checkpoints path");
-		checkNotNull(stateHandleProvider, "State handle provider");
 
 		// Ensure that the checkpoints path exists
 		client.newNamespaceAwareEnsurePath(checkpointsPath)
@@ -118,8 +119,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 		// All operations will have the path as root
 		this.client = client.usingNamespace(client.getNamespace() + checkpointsPath);
 
-		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(
-				this.client, stateHandleProvider);
+		this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage);
 
 		this.checkpointStateHandles = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 660f8bc..a9ac77a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -26,7 +26,7 @@ import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -87,13 +87,21 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	/** Flag indicating whether this instance is running. */
 	private boolean isRunning;
 
+	/**
+	 * Creates a submitted job graph store backed by ZooKeeper.
+	 *
+	 * @param client ZooKeeper client
+	 * @param currentJobsPath ZooKeeper path for current job graphs
+	 * @param stateStorage State storage used to persist the submitted jobs
+	 * @throws Exception
+	 */
 	public ZooKeeperSubmittedJobGraphStore(
 			CuratorFramework client,
 			String currentJobsPath,
-			StateHandleProvider<SubmittedJobGraph> stateHandleProvider) throws Exception {
+			StateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
 
 		checkNotNull(currentJobsPath, "Current jobs path");
-		checkNotNull(stateHandleProvider, "State handle provider");
+		checkNotNull(stateStorage, "State storage");
 
 		// Keep a reference to the original client and not the namespace facade. The namespace
 		// facade cannot be closed.
@@ -104,11 +112,11 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 				.ensure(client.getZookeeperClient());
 
 		// All operations will have the path as root
-		client = client.usingNamespace(client.getNamespace() + currentJobsPath);
+		CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
 
-		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(client, stateHandleProvider);
+		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage);
 
-		this.pathCache = new PathChildrenCache(client, "/", false);
+		this.pathCache = new PathChildrenCache(facade, "/", false);
 		pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
new file mode 100644
index 0000000..12250b9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Base class for key/value state implementations that are backed by a regular heap hash map. The
+ * concrete implementations define how the state is checkpointed.
+ * 
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ * @param <Backend> The type of the backend that snapshots this key/value state.
+ */
+public abstract class AbstractHeapKvState<K, V, Backend extends StateBackend<Backend>> implements KvState<K, V, Backend> {
+
+	/** Map containing the actual key/value pairs */
+	private final HashMap<K, V> state;
+	
+	/** The serializer for the keys */
+	private final TypeSerializer<K> keySerializer;
+
+	/** The serializer for the values */
+	private final TypeSerializer<V> valueSerializer;
+	
+	/** The value that is returned when no other value has been associated with a key, yet */
+	private final V defaultValue;
+	
+	/** The current key, which the next value methods will refer to */
+	private K currentKey;
+	
+	/**
+	 * Creates a new empty key/value state.
+	 * 
+	 * @param keySerializer The serializer for the keys.
+	 * @param valueSerializer The serializer for the values.
+	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+	 */
+	protected AbstractHeapKvState(TypeSerializer<K> keySerializer,
+									TypeSerializer<V> valueSerializer,
+									V defaultValue) {
+		this(keySerializer, valueSerializer, defaultValue, new HashMap<K, V>());
+	}
+
+	/**
+	 * Creates a new key/value state for the given hash map of key/value pairs.
+	 * 
+	 * @param keySerializer The serializer for the keys.
+	 * @param valueSerializer The serializer for the values.
+	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+	 * @param state The state map to use in this key/value state. May contain initial state.
+	 */
+	protected AbstractHeapKvState(TypeSerializer<K> keySerializer,
+									TypeSerializer<V> valueSerializer,
+									V defaultValue,
+									HashMap<K, V> state) {
+		this.state = requireNonNull(state);
+		this.keySerializer = requireNonNull(keySerializer);
+		this.valueSerializer = requireNonNull(valueSerializer);
+		this.defaultValue = defaultValue;
+	}
+
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public V value() {
+		V value = state.get(currentKey);
+		return value != null ? value : defaultValue;
+	}
+
+	@Override
+	public void update(V value) {
+		if (value != null) {
+			state.put(currentKey, value);
+		}
+		else {
+			state.remove(currentKey);
+		}
+	}
+
+	@Override
+	public void setCurrentKey(K currentKey) {
+		this.currentKey = currentKey;
+	}
+
+	@Override
+	public int size() {
+		return state.size();
+	}
+
+	@Override
+	public void dispose() {
+		state.clear();
+	}
+
+	/**
+	 * Gets the serializer for the keys.
+	 * @return The serializer for the keys.
+	 */
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	/**
+	 * Gets the serializer for the values.
+	 * @return The serializer for the values.
+	 */
+	public TypeSerializer<V> getValueSerializer() {
+		return valueSerializer;
+	}
+
+	// ------------------------------------------------------------------------
+	//  checkpointing utilities
+	// ------------------------------------------------------------------------
+	
+	protected void writeStateToOutputView(final DataOutputView out) throws IOException {
+		for (Map.Entry<K, V> entry : state.entrySet()) {
+			keySerializer.serialize(entry.getKey(), out);
+			valueSerializer.serialize(entry.getValue(), out);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
new file mode 100644
index 0000000..5cc16a7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.OperatorState;
+
+/**
+ * Key/Value state implementation for user-defined state. The state is backed by a state
+ * backend, which typically follows one of the following patterns: Either the state is stored
+ * in the key/value state object directly (meaning in the executing JVM) and snapshotted by the
+ * state backend into some store (during checkpoints), or the key/value state is in fact backed
+ * by an external key/value store as the state backend, and checkpoints merely record the
+ * metadata of what is considered part of the checkpoint.
+ * 
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public interface KvState<K, V, Backend extends StateBackend<Backend>> extends OperatorState<V> {
+
+	/**
+	 * Sets the current key, which will be used to retrieve values for the next calls to
+	 * {@link #value()} and {@link #update(Object)}.
+	 * 
+	 * @param key The key.
+	 */
+	void setCurrentKey(K key);
+
+	/**
+	 * Creates a snapshot of this state.
+	 * 
+	 * @param checkpointId The ID of the checkpoint for which the snapshot should be created.
+	 * @param timestamp The timestamp of the checkpoint.
+	 * @return A snapshot handle for this key/value state.
+	 * 
+	 * @throws Exception Exceptions during snapshotting the state should be forwarded, so the system
+	 *                   can react to failed snapshots.
+	 */
+	KvStateSnapshot<K, V, Backend> shapshot(long checkpointId, long timestamp) throws Exception;
+
+	/**
+	 * Gets the number of key/value pairs currently stored in the state. Note that if a key
+	 * has been associated with "null", the key is removed from the state and will not
+	 * be counted here.
+	 *
+	 * @return The number of key/value pairs currently stored in the state.
+	 */
+	int size();
+
+	/**
+	 * Disposes the key/value state, releasing all occupied resources.
+	 */
+	void dispose();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
new file mode 100644
index 0000000..3d6c56c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * This class represents a snapshot of the {@link KvState}, taken for a checkpoint. Where exactly
+ * the snapshot stores the snapshot data (in this object, in an external data store, etc) depends
+ * on the actual implementation. This snapshot defines merely how to restore the state and
+ * how to discard the state.
+ *
+ * <p>One possible implementation is that this snapshot simply contains a copy of the key/value map.
+ * 
+ * <p>Another possible implementation for this snapshot is that the key/value map is serialized into
+ * a file and this snapshot object contains a pointer to that file.
+ *
+ * @param <K> The type of the key
+ * @param <V> The type of the value
+ * @param <Backend> The type of the backend that can restore the state from this snapshot.
+ */
+public interface KvStateSnapshot<K, V, Backend extends StateBackend<Backend>> extends java.io.Serializable {
+
+	/**
+	 * Loads the key/value state back from this snapshot.
+	 * 
+	 * 
+	 * @param stateBackend The state backend that created this snapshot and can restore the key/value state
+	 *                     from this snapshot.
+	 * @param keySerializer The serializer for the keys.
+	 * @param valueSerializer The serializer for the values.
+	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.   
+	 * @param classLoader The class loader for user-defined types.
+	 * 
+	 * @return An instance of the key/value state loaded from this snapshot.
+	 * 
+	 * @throws Exception Exceptions can occur during the state loading and are forwarded. 
+	 */
+	KvState<K, V, Backend> restoreState(
+			Backend stateBackend,
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<V> valueSerializer,
+			V defaultValue,
+			ClassLoader classLoader) throws Exception;
+
+
+	/**
+	 * Discards the state snapshot, removing any resources occupied by it.
+	 * 
+	 * @throws Exception Exceptions occurring during the state disposal should be forwarded.
+	 */
+	void discardState() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java
new file mode 100644
index 0000000..16ad3fd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class represents serialized checkpoint data for a collection of elements.
+ */
+public class SerializedCheckpointData implements java.io.Serializable {
+
+	private static final long serialVersionUID = -8783744683896503488L;
+	
+	/** ID of the checkpoint for which the IDs are stored */
+	private final long checkpointId;
+
+	/** The serialized elements */
+	private final byte[] serializedData;
+
+	/** The number of elements in the checkpoint */
+	private final int numIds;
+
+	/**
+	 * Creates a SerializedCheckpointData object for the given serialized data.
+	 * 
+	 * @param checkpointId The checkpointId of the checkpoint.
+	 * @param serializedData The serialized IDs in this checkpoint.
+	 * @param numIds The number of IDs in the checkpoint.
+	 */
+	public SerializedCheckpointData(long checkpointId, byte[] serializedData, int numIds) {
+		this.checkpointId = checkpointId;
+		this.serializedData = serializedData;
+		this.numIds = numIds;
+	}
+
+	/**
+	 * Gets the checkpointId of the checkpoint.
+	 * @return The checkpointId of the checkpoint.
+	 */
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+
+	/**
+	 * Gets the binary data for the serialized elements.
+	 * @return The binary data for the serialized elements.
+	 */
+	public byte[] getSerializedData() {
+		return serializedData;
+	}
+
+	/**
+	 * Gets the number of IDs in the checkpoint.
+	 * @return The number of IDs in the checkpoint.
+	 */
+	public int getNumIds() {
+		return numIds;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Serialize to Checkpoint
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Converts a list of checkpoints with elements into an array of SerializedCheckpointData.
+	 * 
+	 * @param checkpoints The checkpoints to be converted into SerializedCheckpointData.
+	 * @param serializer The serializer to serialize the IDs.
+	 * @param <T> The type of the ID.
+	 * @return An array of serializable SerializedCheckpointData, one per entry in the given deque.
+	 * 
+	 * @throws IOException Thrown, if the serialization fails.
+	 */
+	public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, List<T>>> checkpoints,
+												TypeSerializer<T> serializer) throws IOException {
+		return fromDeque(checkpoints, serializer, new DataOutputSerializer(128));
+	}
+
+	/**
+	 * Converts a list of checkpoints into an array of SerializedCheckpointData.
+	 *
+	 * @param checkpoints The checkpoints to be converted into SerializedCheckpointData.
+	 * @param serializer The serializer to serialize the IDs.
+	 * @param outputBuffer The reusable serialization buffer.
+	 * @param <T> The type of the ID.
+	 * @return An array of serializable SerializedCheckpointData, one per entry in the given deque.
+	 *
+	 * @throws IOException Thrown, if the serialization fails.
+	 */
+	public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, List<T>>> checkpoints,
+												TypeSerializer<T> serializer,
+												DataOutputSerializer outputBuffer) throws IOException {
+		SerializedCheckpointData[] serializedCheckpoints = new SerializedCheckpointData[checkpoints.size()];
+		
+		int pos = 0;
+		for (Tuple2<Long, List<T>> checkpoint : checkpoints) {
+			outputBuffer.clear();
+			List<T> checkpointIds = checkpoint.f1;
+			
+			for (T id : checkpointIds) {
+				serializer.serialize(id, outputBuffer);
+			}
+
+			serializedCheckpoints[pos++] = new SerializedCheckpointData(
+					checkpoint.f0, outputBuffer.getCopyOfBuffer(), checkpointIds.size());
+		}
+		
+		return serializedCheckpoints;
+	}
+
+	// ------------------------------------------------------------------------
+	//  De-Serialize from Checkpoint
+	// ------------------------------------------------------------------------
+
+	/**
+	 * De-serializes an array of SerializedCheckpointData back into an ArrayDeque of element checkpoints.
+	 * 
+	 * @param data The data to be deserialized.
+	 * @param serializer The serializer used to deserialize the data.
+	 * @param <T> The type of the elements.
+	 * @return An ArrayDeque of element checkpoints.
+	 * 
+	 * @throws IOException Thrown, if the deserialization fails.
+	 */
+	public static <T> ArrayDeque<Tuple2<Long, List<T>>> toDeque(
+			SerializedCheckpointData[] data, TypeSerializer<T> serializer) throws IOException
+	{
+		ArrayDeque<Tuple2<Long, List<T>>> deque = new ArrayDeque<>(data.length);
+		DataInputDeserializer deser = null;
+		
+		for (SerializedCheckpointData checkpoint : data) {
+			byte[] serializedData = checkpoint.getSerializedData();
+			if (deser == null) {
+				deser = new DataInputDeserializer(serializedData, 0, serializedData.length);
+			}
+			else {
+				deser.setBuffer(serializedData, 0, serializedData.length);
+			}
+			
+			final List<T> ids = new ArrayList<>(checkpoint.getNumIds());
+			final int numIds = checkpoint.getNumIds();
+			
+			for (int i = 0; i < numIds; i++) {
+				ids.add(serializer.deserialize(deser));
+			}
+
+			deque.addLast(new Tuple2<Long, List<T>>(checkpoint.checkpointId, ids));
+		}
+		
+		return deque;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index 7aa1ccf..f8b1cfd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -18,22 +18,196 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 
-public enum StateBackend {
-	JOBMANAGER, FILESYSTEM;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * A state backend defines how state is stored and snapshotted during checkpoints.
+ * 
+ * @param <Backend> The type of backend itself. This generic parameter is used to refer to the
+ *                  type of backend when creating state backed by this backend.
+ */
+public abstract class StateBackend<Backend extends StateBackend<Backend>> implements java.io.Serializable {
+	
+	private static final long serialVersionUID = 4620413814639220247L;
+	
+	// ------------------------------------------------------------------------
+	//  initialization and cleanup
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * This method is called by the task upon deployment to initialize the state backend for
+	 * data for a specific job.
+	 * 
+	 * @param job The ID of the job for which the state backend instance checkpoints data.
+	 * @throws Exception Overwritten versions of this method may throw exceptions, in which
+	 *                   case the job that uses the state backend is considered failed during
+	 *                   deployment.
+	 */
+	public abstract void initializeForJob(JobID job) throws Exception;
+
+	/**
+	 * Disposes all state associated with the current job.
+	 * 
+	 * @throws Exception Exceptions may occur during disposal of the state and should be forwarded.
+	 */
+	public abstract void disposeAllStateForCurrentJob() throws Exception;
+
+	/**
+	 * Closes the state backend, releasing all internal resources, but does not delete any persistent
+	 * checkpoint data.
+	 * 
+	 * @throws Exception Exceptions can be forwarded and will be logged by the system
+	 */
+	public abstract void close() throws Exception;
+	
+	// ------------------------------------------------------------------------
+	//  key/value state
+	// ------------------------------------------------------------------------
 
 	/**
-	 * Returns the configured {@link StateBackend}.
+	 * Creates a key/value state backed by this state backend.
+	 * 
+	 * @param keySerializer The serializer for the key.
+	 * @param valueSerializer The serializer for the value.
+	 * @param defaultValue The value that is returned when no other value has been associated with a key, yet.
+	 * @param <K> The type of the key.
+	 * @param <V> The type of the value.
+	 * 
+	 * @return A new key/value state backed by this backend.
+	 * 
+	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
+	 */
+	public abstract <K, V> KvState<K, V, Backend> createKvState(
+			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
+			V defaultValue) throws Exception;
+	
+	
+	// ------------------------------------------------------------------------
+	//  storing state for a checkpoint
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates an output stream that writes into the state of the given checkpoint. When the stream
+	 * is closed, it returns a state handle that can retrieve the state back.
+	 * 
+	 * @param checkpointID The ID of the checkpoint.
+	 * @param timestamp The timestamp of the checkpoint.
+	 * @return An output stream that writes state for the given checkpoint.
+	 * 
+	 * @throws Exception Exceptions may occur while creating the stream and should be forwarded.
+	 */
+	public abstract CheckpointStateOutputStream createCheckpointStateOutputStream(
+			long checkpointID, long timestamp) throws Exception;
+	
+	/**
+	 * Creates a {@link DataOutputView} stream that writes into the state of the given checkpoint.
+	 * When the stream is closed, it returns a state handle that can retrieve the state back.
 	 *
-	 * @param config The config to parse
-	 * @return Configured state backend or {@link ConfigConstants#DEFAULT_RECOVERY_MODE} if not
-	 * configured.
-	 */
-	public static StateBackend fromConfig(Configuration config) {
-		return StateBackend.valueOf(config.getString(
-				ConfigConstants.STATE_BACKEND,
-				ConfigConstants.DEFAULT_STATE_BACKEND).toUpperCase());
+	 * @param checkpointID The ID of the checkpoint.
+	 * @param timestamp The timestamp of the checkpoint.
+	 * @return A DataOutputView stream that writes state for the given checkpoint.
+	 *
+	 * @throws Exception Exceptions may occur while creating the stream and should be forwarded.
+	 */
+	public CheckpointStateOutputView createCheckpointStateOutputView(
+			long checkpointID, long timestamp) throws Exception {
+		return new CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, timestamp));
+	}
+
+	/**
+	 * Writes the given state into the checkpoint, and returns a handle that can retrieve the state back.
+	 * 
+	 * @param state The state to be checkpointed.
+	 * @param checkpointID The ID of the checkpoint.
+	 * @param timestamp The timestamp of the checkpoint.
+	 * @param <S> The type of the state.
+	 * 
+	 * @return A state handle that can retrieve the checkpointed state.
+	 * 
+	 * @throws Exception Exceptions may occur during serialization / storing the state and should be forwarded.
+	 */
+	public abstract <S extends Serializable> StateHandle<S> checkpointStateSerializable(
+			S state, long checkpointID, long timestamp) throws Exception;
+	
+	
+	// ------------------------------------------------------------------------
+	//  Checkpoint state output stream
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A dedicated output stream that produces a {@link StreamStateHandle} when closed.
+	 */
+	public static abstract class CheckpointStateOutputStream extends OutputStream {
+
+		/**
+		 * Closes the stream and gets a state handle that can create an input stream
+		 * producing the data written to this stream.
+		 * 
+		 * @return A state handle that can create an input stream producing the data written to this stream.
+		 * @throws IOException Thrown, if the stream cannot be closed.
+		 */
+		public abstract StreamStateHandle closeAndGetHandle() throws IOException;
+	}
+
+	/**
+	 * A dedicated DataOutputView stream that produces a {@code StateHandle<DataInputView>} when closed.
+	 */
+	public static final class CheckpointStateOutputView extends DataOutputViewStreamWrapper {
+		
+		private final CheckpointStateOutputStream out;
+		
+		public CheckpointStateOutputView(CheckpointStateOutputStream out) {
+			super(out);
+			this.out = out;
+		}
+
+		/**
+		 * Closes the stream and gets a state handle that can create a DataInputView
+		 * producing the data written to this stream.
+		 *
+		 * @return A state handle that can create an input stream producing the data written to this stream.
+		 * @throws IOException Thrown, if the stream cannot be closed.
+		 */
+		public StateHandle<DataInputView> closeAndGetHandle() throws IOException {
+			return new DataInputViewHandle(out.closeAndGetHandle());
+		}
+
+		@Override
+		public void close() throws IOException {
+			out.close();
+		}
+	}
+
+	/**
+	 * Simple state handle that resolves a {@link DataInputView} from a StreamStateHandle.
+	 */
+	private static final class DataInputViewHandle implements StateHandle<DataInputView> {
+
+		private static final long serialVersionUID = 2891559813513532079L;
+		
+		private final StreamStateHandle stream;
+
+		private DataInputViewHandle(StreamStateHandle stream) {
+			this.stream = stream;
+		}
+
+		@Override
+		public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception {
+			return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader)); 
+		}
+
+		@Override
+		public void discardState() throws Exception {
+			stream.discardState();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
new file mode 100644
index 0000000..5b622eb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * A factory to create a specific state backend. The state backend creation gets a Configuration
+ * object that can be used to read further config values.
+ * 
+ * @param <T> The type of the state backend created.
+ */
+public interface StateBackendFactory<T extends StateBackend<T>> {
+
+	/**
+	 * Creates the state backend, optionally using the given configuration.
+	 * 
+	 * @param config The Flink configuration (loaded by the TaskManager).
+	 * @return The created state backend. 
+	 * 
+	 * @throws Exception Exceptions during instantiation can be forwarded.
+	 */
+	StateBackend<T> createFromConfig(Configuration config) throws Exception;
+}