Posted to commits@flink.apache.org by se...@apache.org on 2017/01/31 13:03:25 UTC

[1/6] flink git commit: [FLINK-4820] [core] Make Flink code independent of log4j 1.x

Repository: flink
Updated Branches:
  refs/heads/master 294c082e8 -> f3930510c


[FLINK-4820] [core] Make Flink code independent of log4j 1.x


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/478f5d18
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/478f5d18
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/478f5d18

Branch: refs/heads/master
Commit: 478f5d1893e292ad68c2702cdae95771678dbdc2
Parents: 294c082
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 16 22:13:17 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 31 14:02:43 2017 +0100

----------------------------------------------------------------------
 .../flink/util/MavenForkNumberPrefixLayout.java | 72 --------------------
 .../src/test/resources/log4j-test.properties    |  2 +-
 .../src/test/resources/log4j-test.properties    |  2 +-
 .../src/test/resources/log4j-test.properties    |  2 +-
 .../src/test/resources/log4j-test.properties    |  2 +-
 tools/log4j-travis.properties                   |  2 +-
 6 files changed, 5 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/478f5d18/flink-core/src/main/java/org/apache/flink/util/MavenForkNumberPrefixLayout.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MavenForkNumberPrefixLayout.java b/flink-core/src/main/java/org/apache/flink/util/MavenForkNumberPrefixLayout.java
deleted file mode 100644
index c6f9012..0000000
--- a/flink-core/src/main/java/org/apache/flink/util/MavenForkNumberPrefixLayout.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.util;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.log4j.PatternLayout;
-import org.apache.log4j.spi.LoggingEvent;
-
-/**
- * The logging layout used to prefix each log event with the Maven fork number.
- * <p>
- * Use this layout when running tests via Maven in parallel and logging to the Console. When logging
- * to a file, you can use a separate file for each fork.
- */
-@Internal
-public class MavenForkNumberPrefixLayout extends PatternLayout {
-
-	/** Property name used to set fork number of the forked JVM. */
-	private static final String PROPERTY = "mvn.forkNumber";
-
-	private final int prefixLength;
-
-	private final StringBuilder stringBuilder;
-
-	public MavenForkNumberPrefixLayout() {
-		super();
-
-		String prefix = System.getProperty(PROPERTY);
-
-		if (prefix != null) {
-			prefix += " > ";
-
-			prefixLength = prefix.length();
-
-			stringBuilder = new StringBuilder(512);
-			stringBuilder.append(prefix);
-		}
-		else {
-			prefixLength = 0;
-			stringBuilder = null;
-		}
-	}
-
-	@Override
-	public String format(LoggingEvent event) {
-		if (prefixLength == 0) {
-			return super.format(event);
-		}
-
-		stringBuilder.setLength(prefixLength);
-
-		stringBuilder.append(super.format(event));
-
-		return stringBuilder.toString();
-	}
-}
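The class above was flink-core's only compile-time dependency on log4j 1.x classes (`PatternLayout`, `LoggingEvent`), which is what tied the module to that backend. As a rough illustration of the backend-independent style [FLINK-4820] moves toward, code that logs through the SLF4J facade compiles against `slf4j-api` alone, leaving the concrete backend a runtime choice; the class below is a hypothetical sketch, not part of this commit:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical example: depends only on the SLF4J API, so it builds and runs
// whether the deployed backend is log4j 1.x, log4j 2, or logback.
public class BackendAgnosticComponent {

	private static final Logger LOG = LoggerFactory.getLogger(BackendAgnosticComponent.class);

	public void doWork() {
		// The fork number formerly injected by MavenForkNumberPrefixLayout can
		// still be logged explicitly via the system property that class read.
		LOG.info("running in Maven fork {}", System.getProperty("mvn.forkNumber", "n/a"));
	}
}
```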

http://git-wip-us.apache.org/repos/asf/flink/blob/478f5d18/flink-core/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-core/src/test/resources/log4j-test.properties b/flink-core/src/test/resources/log4j-test.properties
index 4c74d85..2be3589 100644
--- a/flink-core/src/test/resources/log4j-test.properties
+++ b/flink-core/src/test/resources/log4j-test.properties
@@ -20,7 +20,7 @@
 # set manually to INFO for debugging purposes
 log4j.rootLogger=OFF, testlogger
 
-# A1 is set to be a ConsoleAppender.
+# testlogger is set to be a ConsoleAppender.
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target = System.err
 log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout

http://git-wip-us.apache.org/repos/asf/flink/blob/478f5d18/flink-libraries/flink-ml/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/resources/log4j-test.properties b/flink-libraries/flink-ml/src/test/resources/log4j-test.properties
index 023b23a..e2ec1a5 100644
--- a/flink-libraries/flink-ml/src/test/resources/log4j-test.properties
+++ b/flink-libraries/flink-ml/src/test/resources/log4j-test.properties
@@ -22,7 +22,7 @@ log4j.rootLogger=OFF, console
 # Console (use 'console')
 # -----------------------------------------------------------------------------
 log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.flink.util.MavenForkNumberPrefixLayout
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
 # -----------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/478f5d18/flink-runtime-web/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/resources/log4j-test.properties b/flink-runtime-web/src/test/resources/log4j-test.properties
index 7575897..18b51cc 100644
--- a/flink-runtime-web/src/test/resources/log4j-test.properties
+++ b/flink-runtime-web/src/test/resources/log4j-test.properties
@@ -22,7 +22,7 @@ log4j.rootLogger=OFF, console
 # Console (use 'console')
 # -----------------------------------------------------------------------------
 log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.flink.util.MavenForkNumberPrefixLayout
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
 # -----------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/478f5d18/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 662f112..7ba1633 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -22,7 +22,7 @@ log4j.rootLogger=OFF, console
 # Console (use 'console')
 # -----------------------------------------------------------------------------
 log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.flink.util.MavenForkNumberPrefixLayout
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
 # -----------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/478f5d18/tools/log4j-travis.properties
----------------------------------------------------------------------
diff --git a/tools/log4j-travis.properties b/tools/log4j-travis.properties
index 476cee3..4bc8ad0 100644
--- a/tools/log4j-travis.properties
+++ b/tools/log4j-travis.properties
@@ -22,7 +22,7 @@ log4j.rootLogger=INFO, file
 # Console (use 'console')
 # -----------------------------------------------------------------------------
 log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.flink.util.MavenForkNumberPrefixLayout
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
 # -----------------------------------------------------------------------------
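With the custom layout gone, the plain `PatternLayout` configurations above no longer prefix console output with the Surefire fork number. If that prefix is still wanted, one possible alternative (a sketch, not part of this commit) is log4j 1.x's property substitution: `${...}` references in a properties file are resolved against JVM system properties when the configuration is parsed, so each forked JVM would render its own `mvn.forkNumber` (or an empty prefix if the property is unset):

```
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=${mvn.forkNumber} > %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```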


[4/6] flink git commit: [FLINK-5684] [web frontend] Add MacOS section to README.md

Posted by se...@apache.org.
[FLINK-5684] [web frontend] Add MacOS section to README.md

This closes #3234


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a2a11ddf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a2a11ddf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a2a11ddf

Branch: refs/heads/master
Commit: a2a11ddf5b219221606b7f1a2e65020a9b146600
Parents: 478f5d1
Author: Ivan Mushketyk <iv...@gmail.com>
Authored: Sun Jan 29 17:03:02 2017 +0000
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 31 14:02:44 2017 +0100

----------------------------------------------------------------------
 flink-runtime-web/README.md | 34 ++++++++++++++++++++++++++++++++--
 1 file changed, 32 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a2a11ddf/flink-runtime-web/README.md
----------------------------------------------------------------------
diff --git a/flink-runtime-web/README.md b/flink-runtime-web/README.md
index 5f27077..d29a5e2 100644
--- a/flink-runtime-web/README.md
+++ b/flink-runtime-web/README.md
@@ -60,18 +60,48 @@ Install *node.js* by following [these instructions](https://github.com/joyent/no
 
 Verify that the installed version is at least *2.11.3*, via `npm -version`.
 
+Install *bower* via:
 
-Install *bower* via
 ```
 sudo npm install -g bower
 ```
+
+Verify that the installed version is at least *1.4.1*, via `bower -version`.
+
+
+Install *gulp* via:
+
+```
+sudo npm install -g gulp
+```
+
+Verify that the installed version is at least *3.9.0*, via `gulp -version`.
+
+
+#### MacOS
+
+First install *brew* by following [these instructions](http://brew.sh/).
+
+Install *node.js* via:
+
+```
+brew install node
+```
+
+Install *bower* via:
+
+```
+sudo npm install -g bower
+```
+
 Verify that the installed version is at least *1.4.1*, via `bower -version`.
 
+Install *gulp* via:
 
-Install *gulp* via
 ```
 sudo npm install -g gulp
 ```
+
 Verify that the installed version is at least *3.9.0*, via `gulp -version`.
 
 


[2/6] flink git commit: [FLINK-5644] [runtime] Remove metric: Task#lastCheckpointSize

Posted by se...@apache.org.
[FLINK-5644] [runtime] Remove metric: Task#lastCheckpointSize

This closes #3214


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3930510
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3930510
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3930510

Branch: refs/heads/master
Commit: f3930510c6f54a23f532b8f79aed16ee140e3ce5
Parents: 99aed0b
Author: zentol <ch...@apache.org>
Authored: Wed Jan 25 14:59:07 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 31 14:02:44 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/streaming/runtime/tasks/StreamTask.java  | 10 ----------
 1 file changed, 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f3930510/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 705dfca..63843bb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.execution.CancelTaskException;
@@ -174,8 +173,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	/** Flag to mark this task as canceled */
 	private volatile boolean canceled;
 
-	private long lastCheckpointSize = 0;
-
 	/** Thread pool for async snapshot workers */
 	private ExecutorService asyncOperationsThreadPool;
 
@@ -236,13 +233,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			operatorChain = new OperatorChain<>(this);
 			headOperator = operatorChain.getHeadOperator();
 
-			getEnvironment().getMetricGroup().gauge("lastCheckpointSize", new Gauge<Long>() {
-				@Override
-				public Long getValue() {
-					return StreamTask.this.lastCheckpointSize;
-				}
-			});
-
 			// task specific initialization
 			init();
 
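With the built-in `lastCheckpointSize` gauge removed from `StreamTask`, a job that still needs a comparable per-task value can register its own gauge through the user-facing metrics API. A hedged sketch against the Flink 1.2-era API follows; the function, metric name, and bookkeeping are illustrative, not a reconstruction of the removed metric:

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

// Illustrative user function exposing a custom gauge named "lastRecordBytes".
public class InstrumentedMapper extends RichMapFunction<String, String> {

	private transient long lastRecordBytes;

	@Override
	public void open(Configuration parameters) {
		getRuntimeContext().getMetricGroup().gauge("lastRecordBytes", new Gauge<Long>() {
			@Override
			public Long getValue() {
				return lastRecordBytes;
			}
		});
	}

	@Override
	public String map(String value) {
		lastRecordBytes = value.length();
		return value;
	}
}
```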


[5/6] flink git commit: [hotfix] [rat] Add exclusion for rolling-sink snapshot

Posted by se...@apache.org.
[hotfix] [rat] Add exclusion for rolling-sink snapshot

This closes #3208


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99aed0b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99aed0b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99aed0b5

Branch: refs/heads/master
Commit: 99aed0b5547dd2315b2e53c7b8405980701a787c
Parents: a459b0b
Author: zentol <ch...@apache.org>
Authored: Wed Jan 25 13:50:17 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 31 14:02:44 2017 +0100

----------------------------------------------------------------------
 pom.xml | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/99aed0b5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2279b0b..5487d70 100644
--- a/pom.xml
+++ b/pom.xml
@@ -887,6 +887,7 @@ under the License.
 						<exclude>flink-libraries/flink-table/src/test/scala/resources/*.out</exclude>
 
 						<!-- snapshots -->
+						<exclude>flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot</exclude>
 						<exclude>flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot</exclude>
 						<exclude>flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.1-snapshot-empty-state</exclude>
 						<exclude>flink-fs-tests/src/test/resources/monitoring-function-migration-test-1482144479339-flink1.1-snapshot</exclude>


[6/6] flink git commit: [FLINK-5563] [gelly] Add density to vertex metrics

Posted by se...@apache.org.
[FLINK-5563] [gelly] Add density to vertex metrics

This closes #3167


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b2620f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b2620f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b2620f7

Branch: refs/heads/master
Commit: 8b2620f70d3b73c82b95a8a3dd73605a0d9cf29e
Parents: a2a11dd
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Jan 18 13:23:13 2017 -0500
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 31 14:02:44 2017 +0100

----------------------------------------------------------------------
 .../library/metric/directed/VertexMetrics.java  | 20 ++++++++++++++--
 .../metric/undirected/VertexMetrics.java        | 24 +++++++++++++++++---
 .../org/apache/flink/graph/asm/AsmTestBase.java | 11 +++++++++
 .../metric/directed/VertexMetricsTest.java      |  8 ++++++-
 .../metric/undirected/VertexMetricsTest.java    |  8 ++++++-
 5 files changed, 64 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8b2620f7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
index c90423b..231631b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
@@ -251,12 +251,27 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		/**
-		 * Get the average degree.
+		 * Get the average degree, the average number of in- plus out-edges per vertex.
+		 *
+		 * A result of {@code Float.NaN} is returned for an empty graph for
+		 * which both the number of edges and the number of vertices are zero.
 		 *
 		 * @return average degree
 		 */
 		public float getAverageDegree() {
-			return getNumberOfEdges() / (float)vertexCount;
+			return vertexCount == 0 ? Float.NaN : getNumberOfEdges() / (float)vertexCount;
+		}
+
+		/**
+		 * Get the density, the ratio of actual to potential edges between vertices.
+		 *
+		 * A result of {@code Float.NaN} is returned for a graph with fewer than
+		 * two vertices for which the number of edges is zero.
+		 *
+		 * @return density
+		 */
+		public float getDensity() {
+			return vertexCount <= 1 ? Float.NaN : getNumberOfEdges() / (float)(vertexCount*(vertexCount-1));
 		}
 
 		/**
@@ -313,6 +328,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 				+ "; unidirectional edge count: " + nf.format(unidirectionalEdgeCount)
 				+ "; bidirectional edge count: " + nf.format(bidirectionalEdgeCount)
 				+ "; average degree: " + nf.format(getAverageDegree())
+				+ "; density: " + nf.format(getDensity())
 				+ "; triplet count: " + nf.format(tripletCount)
 				+ "; maximum degree: " + nf.format(maximumDegree)
 				+ "; maximum out degree: " + nf.format(maximumOutDegree)

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2620f7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
index 5af7cf6..0fd1428 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
@@ -208,7 +208,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		/**
-		 * Get the number of edges.
+		 * Get the number of edges. Each edge is counted once even though Gelly
+		 * stores undirected edges twice, once in each direction.
 		 *
 		 * @return number of edges
 		 */
@@ -217,12 +218,28 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		/**
-		 * Get the average degree.
+		 * Get the average degree, the average number of edges per vertex.
+		 *
+		 * A result of {@code Float.NaN} is returned for an empty graph for
+		 * which both the number of edges and the number of vertices are zero.
 		 *
 		 * @return average degree
 		 */
 		public float getAverageDegree() {
-			return edgeCount / (float)vertexCount;
+			// each edge is incident on two vertices
+			return vertexCount == 0 ? Float.NaN : 2 * edgeCount / (float)vertexCount;
+		}
+
+		/**
+		 * Get the density, the ratio of actual to potential edges between vertices.
+		 *
+		 * A result of {@code Float.NaN} is returned for a graph with fewer than
+		 * two vertices for which the number of edges is zero.
+		 *
+		 * @return density
+		 */
+		public float getDensity() {
+			return vertexCount <= 1 ? Float.NaN : edgeCount / (float)(vertexCount*(vertexCount-1)/2);
 		}
 
 		/**
@@ -259,6 +276,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 			return "vertex count: " + nf.format(vertexCount)
 				+ "; edge count: " + nf.format(edgeCount)
 				+ "; average degree: " + nf.format(getAverageDegree())
+				+ "; density: " + nf.format(getDensity())
 				+ "; triplet count: " + nf.format(tripletCount)
 				+ "; maximum degree: " + nf.format(maximumDegree)
 				+ "; maximum triplets: " + nf.format(maximumTriplets);

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2620f7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
index 14c2da4..b057121 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
@@ -37,6 +37,8 @@ public class AsmTestBase {
 
 	protected ExecutionEnvironment env;
 
+	protected final double ACCURACY = 0.000001;
+
 	// simple graph
 	protected Graph<IntValue, NullValue, NullValue> directedSimpleGraph;
 
@@ -61,6 +63,7 @@ public class AsmTestBase {
 	public void setup()
 			throws Exception {
 		env = ExecutionEnvironment.createCollectionsEnvironment();
+		env.getConfig().enableObjectReuse();
 
 		// the "fish" graph
 		Object[][] edges = new Object[][] {
@@ -98,9 +101,17 @@ public class AsmTestBase {
 		Graph<LongValue, NullValue, NullValue> rmatGraph = new RMatGraph<>(env, new JDKRandomGeneratorFactory(), rmatVertexCount, rmatEdgeCount)
 			.generate();
 
+		/*
+			./bin/flink run -c org.apache.flink.graph.drivers.Graph500 flink-gelly-examples_2.10-1.2-SNAPSHOT.jar \
+				--directed true --simplify true --scale 10 --edge_factor 16 --output csv --filename directedRMatGraph.csv
+		 */
 		directedRMatGraph = rmatGraph
 			.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
 
+		/*
+			./bin/flink run -c org.apache.flink.graph.drivers.Graph500 flink-gelly-examples_2.10-1.2-SNAPSHOT.jar \
+				--directed false --simplify true --scale 10 --edge_factor 16 --output csv --filename undirectedRMatGraph.csv
+		 */
 		undirectedRMatGraph = rmatGraph
 			.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2620f7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
index c4ec8f8..eb25816 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
@@ -59,6 +59,8 @@ extends AsmTestBase {
 			.execute();
 
 		assertEquals(expectedResult, vertexMetrics);
+		assertEquals(expectedDegree, vertexMetrics.getAverageDegree(), ACCURACY);
+		assertEquals(1.0f, vertexMetrics.getDensity(), ACCURACY);
 	}
 
 	@Test
@@ -73,7 +75,9 @@ extends AsmTestBase {
 			.run(emptyGraph)
 			.execute();
 
-		assertEquals(withoutZeroDegreeVertices, expectedResult);
+		assertEquals(expectedResult, withoutZeroDegreeVertices);
+		assertEquals(Float.NaN, withoutZeroDegreeVertices.getAverageDegree(), ACCURACY);
+		assertEquals(Float.NaN, withoutZeroDegreeVertices.getDensity(), ACCURACY);
 
 		expectedResult = new Result(3, 0, 0, 0, 0, 0, 0, 0);
 
@@ -83,6 +87,8 @@ extends AsmTestBase {
 			.execute();
 
 		assertEquals(expectedResult, withZeroDegreeVertices);
+		assertEquals(0.0f, withZeroDegreeVertices.getAverageDegree(), ACCURACY);
+		assertEquals(0.0f, withZeroDegreeVertices.getDensity(), ACCURACY);
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2620f7/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
index 8f7e1da..6eecc42 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
@@ -59,6 +59,8 @@ extends AsmTestBase {
 			.execute();
 
 		assertEquals(expectedResult, vertexMetrics);
+		assertEquals(expectedDegree, vertexMetrics.getAverageDegree(), ACCURACY);
+		assertEquals(1.0f, vertexMetrics.getDensity(), ACCURACY);
 	}
 
 	@Test
@@ -73,7 +75,9 @@ extends AsmTestBase {
 			.run(emptyGraph)
 			.execute();
 
-		assertEquals(withoutZeroDegreeVertices, expectedResult);
+		assertEquals(expectedResult, withoutZeroDegreeVertices);
+		assertEquals(Float.NaN, withoutZeroDegreeVertices.getAverageDegree(), ACCURACY);
+		assertEquals(Float.NaN, withoutZeroDegreeVertices.getDensity(), ACCURACY);
 
 		expectedResult = new Result(3, 0, 0, 0, 0);
 
@@ -83,6 +87,8 @@ extends AsmTestBase {
 			.execute();
 
 		assertEquals(expectedResult, withZeroDegreeVertices);
+		assertEquals(0.0f, withZeroDegreeVertices.getAverageDegree(), ACCURACY);
+		assertEquals(0.0f, withZeroDegreeVertices.getDensity(), ACCURACY);
 	}
 
 	@Test

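In the code added above, density is |E| / (|V| * (|V| - 1)) for directed graphs and |E| / (|V| * (|V| - 1) / 2) for undirected graphs, with `Float.NaN` guarding the empty and single-vertex cases. For example, an undirected triangle (3 vertices, 3 edges) yields 3 / (3 * 2 / 2) = 1.0. A hedged usage sketch against the Gelly API of this era (the graph construction details are illustrative):

```java
import java.util.Arrays;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.library.metric.undirected.VertexMetrics;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;

// Computes the new density metric for an undirected triangle; prints 1.0.
public class DensityExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		Graph<LongValue, NullValue, NullValue> triangle = Graph.fromCollection(
			Arrays.asList(
				new Edge<>(new LongValue(0), new LongValue(1), NullValue.getInstance()),
				new Edge<>(new LongValue(1), new LongValue(2), NullValue.getInstance()),
				new Edge<>(new LongValue(2), new LongValue(0), NullValue.getInstance())),
			env).getUndirected();

		VertexMetrics.Result result = new VertexMetrics<LongValue, NullValue, NullValue>()
			.run(triangle)
			.execute();

		System.out.println("density: " + result.getDensity());
	}
}
```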

[3/6] flink git commit: [FLINK-5581] [doc] Improve user accessibility for Kerberos-related documentation

Posted by se...@apache.org.
[FLINK-5581] [doc] Improve user accessibility for Kerberos-related documentation

This closes #3181


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a459b0b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a459b0b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a459b0b3

Branch: refs/heads/master
Commit: a459b0b3bde62daabb56939d7ba6f2c8a69a2226
Parents: 8b2620f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Jan 20 16:05:45 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 31 14:02:44 2017 +0100

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                  | 21 ++++++
 docs/ops/security-kerberos.md                 | 52 ++++---------
 docs/setup/config.md                          | 85 ++++++++++++----------
 docs/setup/jobmanager_high_availability.md    | 14 ++++
 flink-dist/src/main/resources/flink-conf.yaml | 19 +++--
 5 files changed, 107 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a459b0b3/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index cc51071..6a58b7a 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -353,3 +353,24 @@ The offsets committed to ZK or the broker can also be used to track the read pro
 the committed offset and the most recent offset in each partition is called the *consumer lag*. If the Flink topology is consuming
 the data slower from the topic than new data is added, the lag will increase and the consumer will fall behind.
 For large production deployments we recommend monitoring that metric to avoid increasing latency.
+
+### Enabling Kerberos Authentication (for versions 0.9 and above only)
+
+Flink provides first-class support through the Kafka connector to authenticate to a Kafka installation
+configured for Kerberos. Simply configure Flink in `flink-conf.yaml` to enable Kerberos authentication for Kafka like so:
+
+1. Configure Kerberos credentials by setting the following -
+ - `security.kerberos.login.use-ticket-cache`: By default, this is `true` and Flink will attempt to use Kerberos credentials in ticket caches managed by `kinit`. 
+ Note that when using the Kafka connector in Flink jobs deployed on YARN, Kerberos authentication using ticket caches will not work. This is also the case when deploying using Mesos, as authentication using a ticket cache is not supported for Mesos deployments. 
+ - `security.kerberos.login.keytab` and `security.kerberos.login.principal`: To use Kerberos keytabs instead, set values for both of these properties.
+ 
+2. Append `KafkaClient` to `security.kerberos.login.contexts`: This tells Flink to provide the configured Kerberos credentials to the Kafka login context to be used for Kafka authentication.
+
+Once Kerberos-based Flink security is enabled, you can authenticate to Kafka with either the Flink Kafka Consumer or Producer by including the following two settings in the properties configuration passed to the internal Kafka client:
+
+- Set `security.protocol` to `SASL_PLAINTEXT` (default `PLAINTEXT`): The protocol used to communicate with Kafka brokers.
+When using standalone Flink deployment, you can also use `SASL_SSL`; please see how to configure the Kafka client for SSL [here](https://kafka.apache.org/documentation/#security_configclients). 
+- Set `sasl.kerberos.service.name` to `kafka` (default `kafka`): The value for this should match the `sasl.kerberos.service.name` used for Kafka broker configurations. A mismatch in service name between client and server configuration will cause the authentication to fail.
+
+For more information on Flink configuration for Kerberos security, please see [here]({{ site.baseurl}}/setup/config.html).
+You can also find [here]({{ site.baseurl}}/ops/security-kerberos.html) further details on how Flink internally sets up Kerberos-based security.
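Putting the two client settings together, here is a hedged end-to-end sketch against the Flink 1.2-era consumer API (the broker address, group id, and topic name are placeholders), assuming `KafkaClient` has been appended to `security.kerberos.login.contexts` as described above:

```java
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Reads from a Kerberos-secured Kafka 0.9 cluster; addresses and names are placeholders.
public class SecureKafkaRead {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "broker1:9092");
		props.setProperty("group.id", "my-group");
		// The two settings described above, passed through to the internal Kafka client:
		props.setProperty("security.protocol", "SASL_PLAINTEXT");
		props.setProperty("sasl.kerberos.service.name", "kafka");

		DataStream<String> stream = env.addSource(
			new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props));

		stream.print();
		env.execute("secure-kafka-read");
	}
}
```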

http://git-wip-us.apache.org/repos/asf/flink/blob/a459b0b3/docs/ops/security-kerberos.md
----------------------------------------------------------------------
diff --git a/docs/ops/security-kerberos.md b/docs/ops/security-kerberos.md
index 2afe760..3e5cad9 100644
--- a/docs/ops/security-kerberos.md
+++ b/docs/ops/security-kerberos.md
@@ -28,6 +28,7 @@ filesystems, connectors, and state backends.
 
 ## Objective
 The primary goals of the Flink Kerberos security infrastructure are:
+
 1. to enable secure data access for jobs within a cluster via connectors (e.g. Kafka)
 2. to authenticate to ZooKeeper (if configured to use SASL)
 3. to authenticate to Hadoop components (e.g. HDFS, HBase) 
@@ -36,14 +37,14 @@ In a production deployment scenario, streaming jobs are understood to run for lo
 data sources throughout the life of the job.  Kerberos keytabs do not expire in that timeframe, unlike a Hadoop delegation token
 or ticket cache entry.
 
-The current implementation supports running Flink clusters (Job Manager/Task Manager/jobs) with either a configured keytab credential
+The current implementation supports running Flink clusters (JobManager / TaskManager / jobs) with either a configured keytab credential
 or with Hadoop delegation tokens.   Keep in mind that all jobs share the credential configured for a given cluster.   To use a different keytab
 for a certain job, simply launch a separate Flink cluster with a different configuration.   Numerous Flink clusters may run side-by-side in a YARN
 or Mesos environment.
 
 ## How Flink Security works
 In concept, a Flink program may use first- or third-party connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.) necessitating arbitrary authentication methods (Kerberos, SSL/TLS, username/password, etc.).  While satisfying the security requirements for all connectors is an ongoing effort,
-Flink provides first-class support for Kerberos authentication only.  The following services and connectors are tested for Kerberos authentication:
+Flink provides first-class support for Kerberos authentication only.  The following services and connectors are supported for Kerberos authentication:
 
 - Kafka (0.9+)
 - HDFS
@@ -55,7 +56,7 @@ Hadoop security without necessitating the use of Kerberos for ZooKeeper, or vice
 Kerberos credentials, which is then explicitly used by each component.
 
 The internal architecture is based on security modules (implementing `org.apache.flink.runtime.security.modules.SecurityModule`) which
-are installed at startup.  The next section describes each security module.
+are installed at startup.  The following sections describe each security module.
 
 ### Hadoop Security Module
 This module uses the Hadoop `UserGroupInformation` (UGI) class to establish a process-wide *login user* context.   The login user is
@@ -75,51 +76,22 @@ dynamic entries provided by this module.
 This module configures certain process-wide ZooKeeper security-related settings, namely the ZooKeeper service name (default: `zookeeper`)
 and the JAAS login context name (default: `Client`).
 
-## Security Configuration
-
-### Flink Configuration
-The user's Kerberos ticket cache (managed with `kinit`) is used automatically, based on the following configuration option:
-
-- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from the user's Kerberos ticket cache (default: `true`).
-
-A Kerberos keytab can be supplied by adding below configuration elements to the Flink configuration file:
-
-- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file that contains the user credentials.
-
-- `security.kerberos.login.principal`: Kerberos principal name associated with the keytab.
-
-These configuration options establish a cluster-wide credential to be used in a Hadoop and/or JAAS context.  Whether the credential is used in a Hadoop context is based on the Hadoop configuration (see next section).   To be used in a JAAS context, the configuration specifies which JAAS *login contexts* (or *applications*) are enabled with the following configuration option:
-
-- `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client` to use the credentials for ZooKeeper authentication).
-
-ZooKeeper-related configuration overrides:
-
-- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper cluster is configured to use (default: `zookeeper`). Facilitates mutual-authentication between the client (Flink) and server.
-
-- `zookeeper.sasl.login-context-name`: The JAAS login context name that the ZooKeeper client uses to request the login context (default: `Client`). Should match
-one of the values specified in `security.kerberos.login.contexts`.
-
-### Hadoop Configuration
-
-The Hadoop configuration is located via the `HADOOP_CONF_DIR` environment variable and by other means (see `org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils`).   The Kerberos credential (configured above) is used automatically if Hadoop security is enabled.
-
-Note that Kerberos credentials found in the ticket cache aren't transferrable to other hosts.   In this scenario, the Flink CLI acquires Hadoop
-delegation tokens (for HDFS and for HBase).
-
 ## Deployment Modes
 Here is some information specific to each deployment mode.
 
 ### Standalone Mode
 
 Steps to run a secure Flink cluster in standalone/cluster mode:
-1. Add security-related configuration options to the Flink configuration file (on all cluster nodes).
+
+1. Add security-related configuration options to the Flink configuration file (on all cluster nodes) (see [here]({{site.baseurl}}/setup/config.html#kerberos-based-security)).
 2. Ensure that the keytab file exists at the path indicated by `security.kerberos.login.keytab` on all cluster nodes.
 3. Deploy Flink cluster as normal.
 
 ### YARN/Mesos Mode
 
 Steps to run a secure Flink cluster in YARN/Mesos mode:
-1. Add security-related configuration options to the Flink configuration file on the client.
+
+1. Add security-related configuration options to the Flink configuration file on the client (see [here]({{site.baseurl}}/setup/config.html#kerberos-based-security)).
 2. Ensure that the keytab file exists at the path as indicated by `security.kerberos.login.keytab` on the client node.
 3. Deploy Flink cluster as normal.
 
@@ -130,15 +102,17 @@ For more information, see <a href="https://github.com/apache/hadoop/blob/trunk/h
 #### Using `kinit` (YARN only)
 
 In YARN mode, it is possible to deploy a secure Flink cluster without a keytab, using only the ticket cache (as managed by `kinit`).
-This avoids the complexity of generating a keytab and avoids entrusting the cluster manager with it.  The main drawback is
-that the cluster is necessarily short-lived since the generated delegation tokens will expire (typically within a week).
+This avoids the complexity of generating a keytab and avoids entrusting the cluster manager with it.  In this scenario, the Flink CLI acquires Hadoop delegation tokens (for HDFS and for HBase).
+The main drawback is that the cluster is necessarily short-lived since the generated delegation tokens will expire (typically within a week).
 
 Steps to run a secure Flink cluster using `kinit`:
-1. Add security-related configuration options to the Flink configuration file on the client.
+
+1. Add security-related configuration options to the Flink configuration file on the client (see [here]({{site.baseurl}}/setup/config.html#kerberos-based-security)).
 2. Login using the `kinit` command.
 3. Deploy Flink cluster as normal.
 
 ## Further Details
+
 ### Ticket Renewal
 Each component that uses Kerberos is independently responsible for renewing the Kerberos ticket-granting-ticket (TGT).
 Hadoop, ZooKeeper, and Kafka all renew the TGT automatically when provided a keytab.  In the delegation token scenario,

http://git-wip-us.apache.org/repos/asf/flink/blob/a459b0b3/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 1a72e27..269633c 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -100,25 +100,15 @@ These options are useful for debugging a Flink application for memory and garbag
 
 Flink supports Kerberos authentication for the following services:
 
-+ Hadoop Components (such as HDFS, YARN, or HBase)
-+ Kafka Connectors (version 0.9+)
++ Hadoop Components, such as HDFS, YARN, or HBase *(version 2.6.1 and above; all other versions have critical bugs which may cause the Flink job to fail unexpectedly)*.
++ Kafka Connectors *(version 0.9 and above)*.
 + Zookeeper
 
-**Kerberos is supported only in Hadoop version 2.6.1 and above. All
-  other versions have critical bugs which might fail the Flink job
-  unexpectedly.**
+Configuring Flink for Kerberos security involves three aspects, explained separately in the following sub-sections.
 
-Configuring Flink for Kerberos security involves three aspects:
+##### 1. Providing the cluster with a Kerberos credential (i.e. a keytab or a ticket via `kinit`)
 
-1. Providing the cluster with a Kerberos credential (i.e. a keytab or a ticket via `kinit`)
-2. Making the Kerberos credential available to components and connectors as needed
-3. Configuring the component and/or connector to use Kerberos authentication
-
-To provide the cluster with a Kerberos credential, either configure the login keytab using the below configuration options,
-or login using `kinit` before starting the cluster.
-
-It is preferable to use keytabs for long-running jobs, to avoid ticket expiration issues.   If you prefer to use the ticket cache,
-talk to your administrator about increasing the Hadoop delegation token lifetime.
+To provide the cluster with a Kerberos credential, Flink supports using a Kerberos keytab file or ticket caches managed by `kinit`.
 
 - `security.kerberos.login.use-ticket-cache`: Indicates whether to read from your Kerberos ticket cache (default: `true`).
 
@@ -126,28 +116,35 @@ talk to your administrator about increasing the Hadoop delegation token lifetime
 
 - `security.kerberos.login.principal`: Kerberos principal name associated with the keytab.
 
-If Hadoop security is enabled (in `core-site.xml`), Flink will automatically use the configured Kerberos credentials when connecting to HDFS, HBase, and other Hadoop components.
+If both `security.kerberos.login.keytab` and `security.kerberos.login.principal` are set, the keytab is used for authentication.
+It is preferable to use keytabs for long-running jobs, to avoid ticket expiration issues.   If you prefer to use the ticket cache,
+talk to your administrator about increasing the Hadoop delegation token lifetime.
+
+Note that authentication using ticket caches is only supported when deploying Flink as a standalone cluster or on YARN.
+
+##### 2. Making the Kerberos credential available to components and connectors as needed
+
+For Hadoop components, Flink automatically uses the configured Kerberos credentials when connecting to HDFS, HBase, and other Hadoop services, provided that Hadoop security is enabled (in `core-site.xml`).
 
-Make the Kerberos credentials available to any connector or component that uses a JAAS configuration file by configuring JAAS login contexts.
+For any connector or component that uses a JAAS configuration file, make the Kerberos credentials available to it by configuring a JAAS login context for each one, using the following option:
 
 - `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for Kafka authentication).
 
-You may also provide a static JAAS configuration file, whose entries override those produced by the above configuration option.
+This allows enabling Kerberos authentication for different connectors or components independently. For example, you can enable Hadoop security without necessitating the use of Kerberos for ZooKeeper, or vice versa.
 
-Be sure to configure the connector within your Flink program as necessary to use Kerberos authentication.  For the Kafka connector,
-use the following properties:
+You may also provide a static JAAS configuration file using the mechanisms described in the [Java SE Documentation](http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html), whose entries will override those produced by the above configuration option.
 
-```
-security.protocol=SASL_PLAINTEXT (or SASL_SSL)
-sasl.kerberos.service.name=kafka
-```
+##### 3. Configuring the component and/or connector to use Kerberos authentication
+
+Finally, be sure to configure the connector within your Flink program or component as necessary to use Kerberos authentication.
 
-Flink provides some additional options to configure ZooKeeper security:
+Below is a list of the connectors and components for which Flink currently provides first-class Kerberos authentication support:
 
-- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper cluster is configured to use (default: `zookeeper`).
+- Kafka: see [here]({{site.baseurl}}/dev/connectors/kafka.html#enabling-kerberos-authentication-for-versions-09-and-above-only) for details on configuring the Kafka connector to use Kerberos authentication.
 
-- `zookeeper.sasl.login-context-name`: The JAAS login context name that the ZooKeeper client uses to request the login context (default: `Client`). Should match
-one of the values specified in `security.kerberos.login.contexts`.
+- ZooKeeper (for HA): see [here]({{site.baseurl}}/setup/jobmanager_high_availability.html#configuring-for-zookeeper-security) for details on configuring ZooKeeper security to work with the Kerberos-based settings described here.
+
+For more information on how Flink internally sets up Kerberos-based security, please see [here]({{site.baseurl}}/ops/security-kerberos.html).
 
 ### Other
 
@@ -401,7 +398,7 @@ The configuration keys in this section are independent of the used resource mana
 of the JobManager, because the same ActorSystem is used. It's not possible to use this configuration key to define port ranges.
 
 
-## YARN
+### YARN
 
 - `yarn.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space to remove from containers started by YARN. When a user requests a certain amount of memory for each TaskManager container (for example 4 GB), we cannot pass this amount as the maximum heap space for the JVM (`-Xmx` argument) because the JVM is also allocating memory outside the heap. YARN is very strict with killing containers which are using more memory than requested. Therefore, we remove this percentage of the memory from the requested heap as a safety margin.
 
@@ -435,7 +432,7 @@ use the `env.java.opts` setting, which is the `%jvmopts%` variable in the String
   For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports.
 
 
-## High Availability (HA)
+### High Availability (HA)
 
 - `high-availability`: Defines the high availability mode used for the cluster execution. Currently, Flink supports the following modes:
   - `none` (default): No high availability. A single JobManager runs and no JobManager state is checkpointed.
@@ -443,9 +440,9 @@ use the `env.java.opts` setting, which is the `%jvmopts%` variable in the String
 
 Previously this key was named `recovery.mode` and the default value was `standalone`.
 
-### ZooKeeper-based HA Mode
+#### ZooKeeper-based HA Mode
 
-- `high-availability.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connect to the ZooKeeper cluster when the 'zookeeper' HA mode is selected. Previously this key was name `recovery.zookeeper.quorum`.
+- `high-availability.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connect to the ZooKeeper cluster when the 'zookeeper' HA mode is selected. Previously this key was named `recovery.zookeeper.quorum`.
 
 - `high-availability.zookeeper.path.root`: (Default `/flink`) Defines the root dir under which the ZooKeeper HA mode will create namespace directories. Previously this key was named `recovery.zookeeper.path.root`.
 
@@ -469,19 +466,29 @@ Previously this key was named `recovery.mode` and the default value was `standal
 
 - `high-availability.zookeeper.client.acl`: (Default `open`) Defines the ACL (open|creator) to be configured on ZK node. The configuration value can be set to "creator" if the ZooKeeper server configuration has the "authProvider" property mapped to use SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos). The ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
 
-### ZooKeeper-Security
+#### ZooKeeper Security
 
-- `zookeeper.sasl.disable`: (Default: `true`) Defines if SASL based authentication needs to be enabled or disabled. The configuration value can be set to "true" if ZooKeeper cluster is running in secure mode (Kerberos)
+- `zookeeper.sasl.disable`: (Default: `true`) Defines whether SASL-based authentication is disabled. The configuration value should be set to "false" if the ZooKeeper cluster is running in secure mode (Kerberos).
 
 - `zookeeper.sasl.service-name`: (Default: `zookeeper`) If the ZooKeeper server is configured with a different service name (default:"zookeeper") then it can be supplied using this configuration. A mismatch in service name between client and server configuration will cause the authentication to fail.
 
-## Environment
+### Kerberos-based Security
+
+- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from your Kerberos ticket cache (default: `true`).
+
+- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file that contains the user credentials.
+
+- `security.kerberos.login.principal`: Kerberos principal name associated with the keytab.
+
+- `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for Kafka authentication).
+
+### Environment
 
 - `env.log.dir`: (Defaults to the `log` directory under Flink's home) Defines the directory where the Flink logs are saved. It has to be an absolute path.
 
-## Queryable State
+### Queryable State
 
-### Server
+#### Server
 
 - `query.server.enable`: Enable queryable state (Default: `true`).
 
@@ -491,7 +498,7 @@ Previously this key was named `recovery.mode` and the default value was `standal
 
 - `query.server.query-threads`: Number of query Threads for queryable state server (Default: `0`, picks number of slots).
 
-### Client
+#### Client
 
 - `query.client.network-threads`: Number of network (Netty's event loop) Threads for queryable state client (Default: `0`, picks number of available cores as returned by `Runtime.getRuntime().availableProcessors()`).
 
@@ -499,7 +506,7 @@ Previously this key was named `recovery.mode` and the default value was `standal
 
 - `query.client.lookup.retry-delay`: Retry delay in milliseconds on KvState lookup failure due to unavailable JobManager (Default: `1000`).
 
-## Metrics
+### Metrics
 
 - `metrics.reporters`: The list of named reporters, i.e. "foo,bar".
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a459b0b3/docs/setup/jobmanager_high_availability.md
----------------------------------------------------------------------
diff --git a/docs/setup/jobmanager_high_availability.md b/docs/setup/jobmanager_high_availability.md
index 9dcc7cc..aa18a4b 100644
--- a/docs/setup/jobmanager_high_availability.md
+++ b/docs/setup/jobmanager_high_availability.md
@@ -211,6 +211,20 @@ Starting zookeeper daemon on host localhost.</pre>
    <pre>
 $ bin/yarn-session.sh -n 2</pre>
 
+## Configuring for ZooKeeper Security
+
+If ZooKeeper is running in secure mode with Kerberos, you can override the following configurations in `flink-conf.yaml` as necessary:
+
+<pre>
+zookeeper.sasl.service-name: zookeeper     # default is "zookeeper". If the ZooKeeper quorum is configured
+                                           # with a different service name then it can be supplied here.
+zookeeper.sasl.login-context-name: Client  # default is "Client". The value needs to match one of the values
+                                           # configured in "security.kerberos.login.contexts".
+</pre>
+
+For more information on Flink configuration for Kerberos security, please see [here]({{ site.baseurl}}/setup/config.html).
+You can also find [here]({{ site.baseurl}}/ops/security-kerberos.html) further details on how Flink internally sets up Kerberos-based security.
+
 ## Bootstrap ZooKeeper
 
 If you don't have a running ZooKeeper installation, you can use the helper scripts, which ship with Flink.

http://git-wip-us.apache.org/repos/asf/flink/blob/a459b0b3/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 108bd58..0f30595 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -89,7 +89,7 @@ jobmanager.web.port: 8081
 #
 # Supported backends: jobmanager, filesystem, rocksdb, <class-name-of-factory>
 #
-#state.backend: filesystem
+# state.backend: filesystem
 
 
 # Directory for storing checkpoints in a Flink-supported filesystem
@@ -169,11 +169,16 @@ jobmanager.web.port: 8081
 # 3. make the credentials available to various JAAS login contexts
 # 4. configure the connector to use JAAS/SASL
 
-#security.kerberos.login.keytab: /path/to/kerberos/keytab
-#security.kerberos.login.principal: flink-user
-#security.kerberos.login.use-ticket-cache: true
+# The settings below configure how Kerberos credentials are provided. A keytab will be used instead of
+# a ticket cache if the keytab path and principal are set.
 
-#security.kerberos.login.contexts: Client,KafkaClient
+# security.kerberos.login.use-ticket-cache: true
+# security.kerberos.login.keytab: /path/to/kerberos/keytab
+# security.kerberos.login.principal: flink-user
+
+# The configuration below defines which JAAS login contexts the Kerberos credentials are provided to.
+
+# security.kerberos.login.contexts: Client,KafkaClient
 
 #==============================================================================
 # ZK Security Configuration (optional configuration)
@@ -182,5 +187,7 @@ jobmanager.web.port: 8081
 # Below configurations are applicable if ZK ensemble is configured for security
 
 # Override below configuration to provide custom ZK service name if configured
-#
 # zookeeper.sasl.service-name: zookeeper
+
+# The configuration below must match one of the values set in "security.kerberos.login.contexts"
+# zookeeper.sasl.login-context-name: Client